Spark
BOOKS
https://www.amazon.com/Spark-Action-Covers-Apache-Examples-ebook/dp/B09781T1XK
https://www.amazon.com/Learning-Spark-Lightning-Fast-Data-Analytics-ebook/dp/B08F9WVFCT
https://www.amazon.com/Spark-Definitive-Guide-Processing-Simple-ebook/dp/B079P71JHY
https://medium.datadriveninvestor.com/introduction-to-spark-with-python-spark-architecture-and-components-explained-in-detail-54e2ba09d6fe
Accessing Hive metastore from Spark:
https://www.youtube.com/watch?v=6-LtMmNAvIE
Reading HDFS file names
https://www.reddit.com/r/apachespark/comments/mcbq3c/reading_just_filenames_from_hdfs/
Code:
https://runawayhorse001.github.io/LearningApacheSpark/
JSON in Spark:
https://www.reddit.com/r/apachespark/comments/naqeeo/why_isnt_get_json_object_function_consistently/
https://www.reddit.com/r/apachespark/comments/nxxtkc/parsing_nested_json_arrays_in_sql_analytics/
https://habr.com/ru/company/leroy_merlin/blog/563066/
https://analyticshut.com/reading-json-data-in-spark/
https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes
https://subscription.packtpub.com/book/data/9781785880070/4/ch04lvl1sec29/loading-json-into-spark
https://habr.com/ru/company/otus/blog/557812/ - Spark streaming
https://habr.com/ru/company/otus/blog/556734/
https://mungingdata.com/apache-spark/python-pyspark-scala-which-better/
https://habr.com/ru/company/newprolab/blog/530568/ Spark 3.0
https://community.cloud.databricks.com/login.html - playground
https://docs.databricks.com/data/databricks-datasets.html
https://leanpub.com/beautiful-spark/ Book
https://habr.com/ru/company/otus/blog/529684/ . fast spark
https://habr.com/ru/company/sberbank/blog/496310/
https://mungingdata.com/apache-spark/using-the-console/
https://towardsdatascience.com/write-clean-and-solid-scala-spark-jobs-28ac4395424a
https://mungingdata.com/category/apache-spark/
https://luminousmen.com/post/spark-tips-dont-collect-data-on-driver
https://medium.com/@itzikjan/spark-join-optimization-on-skew-data-using-bin-packing-afae73f68662
https://habr.com/ru/company/alfastrah/blog/481924/
https://github.com/ericxiao251/spark-syntax
https://www.reddit.com/r/dataengineering/comments/dixkr8/free_sandbox_for_playing_with_spark/
https://sparkbyexamples.com/
https://dzone.com/articles/pyspark-join-explained-with-examples
https://sujithjay.com/spark/shuffle-hash-sort-merge-joins
https://sujithjay.com/spark/or-within-joins
https://medium.com/@F4Qtech/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44
https://towardsdatascience.com/my-10-recommendations-after-getting-the-databricks-certification-for-apache-spark-53cd3690073
https://medium.com/towards-artificial-intelligence/4-tips-to-write-scalable-apache-spark-code-1c736e4d698e
https://www.waitingforcode.com/apache-spark/memory-apache-spark-classes/read
https://www.qubole.com/blog/enhance-spark-performance-with-dynamic-filtering/
https://dzone.com/search?page=1
https://youtu.be/daXEp4HmS-E
https://www.youtube.com/watch?v=CF5Ewk0GxiQ
https://www.youtube.com/watch?v=5wn5QRGnBh4 . SQOOP, SPARK SQL, MYSQL - EXAMPLE 2
https://www.youtube.com/watch?v=0Xg9wsOCz74 . Spark streaming
https://databricks.com/sparkaisummit/europe/spark-summit-2019-keynotes
https://medium.com/thron-tech/optimising-spark-rdd-pipelines-679b41362a8a
https://open.compscicenter.ru/archive/apache-spark/
https://legacy.gitbook.com/@jaceklaskowski
https://jaceklaskowski.gitbooks.io/mastering-spark-sql
https://blog.usejournal.com/spark-study-notes-core-concepts-visualized-5256c44e4090
https://www.enigma.com/blog/things-i-wish-id-known-about-spark
https://habr.com/post/329838/
https://www.youtube.com/watch?v=4Lngj22dFiA . DataFrames
https://medium.com/@achilleus/a-practical-introduction-to-sparks-column-part-2-1e52f1d29eb1
https://www.mungingdata.com/apache-spark/aggregations . Aggregation
Feature Engineering for Time Series Analysis in Spark: https://www.youtube.com/watch?v=JWjRB8b6rc0
https://github.com/P7h/docker-spark
https://hub.docker.com/
https://habr.com/company/lanit/blog/413137/
https://towardsdatascience.com/sql-at-scale-with-apache-spark-sql-and-dataframes-concepts-architecture-and-examples-c567853a702f
https://www.mungingdata.com/
https://www.mungingdata.com/aws/athena-spark-best-friends
From csv to parquet:
val df = spark.read.csv("/mnt/my-bucket/csv-lake/")
df.write.parquet("/mnt/my-bucket/parquet-lake/")  // write is called on the DataFrame, not on the SparkSession
Links
https://www.mungingdata.com/episodes/4-building-spark-jar-files-with-sbt
https://github.com/databricks/Spark-The-Definitive-Guide
https://databricks.com/try-databricks
https://spark.apache.org/docs/latest/tuning.html
https://legacy.gitbook.com/book/umbertogriffo/apache-spark-best-practices-and-tuning/details
https://legacy.gitbook.com/book/databricks/databricks-spark-knowledge-base/details
https://spoddutur.github.io/spark-notes
https://github.com/jaceklaskowski
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
http://c2fo.io/c2fo/spark/aws/emr/2016/09/01/apache-spark-config-cheatsheet-part2/
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/
Join
https://medium.com/@akhilanand.bv/https-medium-com-joins-in-apache-spark-part-2-5b038bc7455b
https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-3-1d40c1e51e1c
http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/
Broadcast joins
https://www.mungingdata.com/apache-spark/broadcast-joins
https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/
https://sujithjay.com/spark-sql/2018/02/17/Broadcast-Hash-Joins-in-Apache-Spark/
https://mungingdata.com/apache-spark/broadcasting-maps-in-spark/
Use Avro for everything that makes sense to be stored in a row-major manner
and Parquet for everything that needs to be stored in a column-major manner.
Parquet stores data in binary files with column-oriented storage,
and also tracks some statistics about each file that make it possible to quickly skip data not needed for a query.
https://mungingdata.com/apache-spark/partitionby/
https://mungingdata.com/apache-spark/partition-filters-pushed-filters/
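A minimal sketch of the Parquet points above, assuming Spark 3.x running in local mode with spark-sql on the classpath; the `sales` data, its columns and the temp directory are made up. The filter on the partition column lets Spark skip whole `country=...` directories (PartitionFilters in the plan), while the filter on `amount` is pushed into the Parquet reader (PushedFilters), which can use the file and row-group statistics to skip data.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ParquetPruningSketch {
  // Writes a tiny partitioned Parquet dataset, reads it back with two filters,
  // and returns (number of matching rows, physical plan text)
  def run(spark: SparkSession): (Long, String) = {
    import spark.implicits._
    val path = Files.createTempDirectory("parquet-lake").resolve("sales").toString

    Seq(("US", 10), ("US", 25), ("SE", 7), ("DE", 40))
      .toDF("country", "amount")
      .write
      .mode("overwrite")
      .partitionBy("country") // one sub-directory per value: country=US/, country=SE/, ...
      .parquet(path)

    val filtered = spark.read.parquet(path)
      .where(col("country") === "US") // partition filter: only country=US/ is scanned
      .where(col("amount") > 20)      // data filter: pushed down to the Parquet reader

    (filtered.count(), filtered.queryExecution.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ParquetPruningSketch").getOrCreate()
    val (rows, plan) = run(spark)
    println(s"matching rows: $rows")
    println(plan) // the FileScan node lists PartitionFilters and PushedFilters
    spark.stop()
  }
}
```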
Stage Job Task
https://stackoverflow.com/questions/37528047/how-are-stages-split-into-tasks-in-spark
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-DAGScheduler-Stage.html
DAGScheduler splits up a job into a collection of stages. Each stage contains a sequence of narrow transformations that can be completed
without shuffling the entire data set; stage boundaries are placed where a shuffle is needed.
Each executor JVM runs several tasks (1 task per RDD partition).
The number of concurrent task slots is numExecutors * executorCores.
The number of tasks is determined by the number of partitions. Every RDD has a defined number of partitions.
For a source RDD that is read from HDFS (using sc.textFile(...), for example)
the number of partitions is the number of splits generated by the input format.
Some operations on RDD(s) can result in an RDD with a different number of partitions.
Stage: a collection of tasks. The same code runs against different subsets of the data (partitions).
Task: a unit of work on one partition of a distributed dataset. So in each stage, number-of-tasks = number-of-partitions, i.e. "one task per stage per partition".
Each executor runs in one YARN container, and each container resides on one node.
Each stage uses multiple executors, and each executor is allocated multiple vcores.
Each vcore can execute exactly one task at a time.
So at any stage, multiple tasks can be executed in parallel: number-of-tasks running = number-of-vcores being used.
HDFS achieves full write throughput with up to ~5 concurrent tasks per executor,
so it’s good to keep the number of cores per executor at or below that number.
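The slot arithmetic above, as a few lines of plain Scala. The cluster numbers are hypothetical, and the "waves" figure assumes tasks of similar duration; the scheduler actually hands a new task to each core as soon as it frees up rather than running strict rounds.

```scala
object TaskSlots {
  // Concurrent task slots: every executor core runs one task at a time
  def slots(numExecutors: Int, executorCores: Int): Int = numExecutors * executorCores

  // One task per partition; when there are more tasks than slots, they run in successive "waves"
  def waves(numPartitions: Int, slots: Int): Int = (numPartitions + slots - 1) / slots

  def main(args: Array[String]): Unit = {
    val s = slots(numExecutors = 4, executorCores = 5)
    println(s"$s slots, ${waves(200, s)} waves for a 200-partition stage")
  }
}
```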
----
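spark.cores.max - maximum total number of cores the application may request across the cluster (standalone and Mesos modes)
spark.executor.memory - amount of memory to use per executor process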
--------
Shell
./sbin/start-master.sh
./bin/pyspark
./bin/spark-sql
./bin/sparkR
./bin/spark-shell - Scala console
:help
:type spark
:type sc
spark.version
:imports
./bin/spark-submit - run a precompiled application
spark - the variable that holds the SparkSession
https://mungingdata.com/apache-spark/sparksession/
Integration with Hive
hive> set hive.execution.engine=spark;
DataFrames, DataSets, RDD
DataFrames have types,
but Spark maintains them completely and only checks whether those types line up to those specified in the schema at runtime.
Datasets, on the other hand, check whether types conform to the specification at compile time.
Datasets are only available to Java Virtual Machine (JVM)-based languages (Scala and Java)
and we specify types with case classes or Java beans.
DataFrames are simply Datasets of Type Row.
import org.apache.spark.sql.functions._   // col, min, avg, max, ...
df.printSchema()
df.col("somename")
df.first()
df.filter("somename < 2")
.where(col("anothername") =!= "Russia")
.show(2)
auctionDF
.select("auctionid")
.distinct
.count
auctionDF
.groupBy("item", "auctionid")
.count
.agg(min("count"), avg("count"),max("count"))
.show
Show various implementations for the following query in Spark.
The table name is employee and the DataFrame name is dF. Query:
select name, AVG(salary) from employee where country = 'USA' group by name, salary;
(Because salary is also a grouping column, AVG(salary) simply equals salary in each group; group by name alone gives a per-name average.)
Method 1: Using Scala code in Spark:
import org.apache.spark.sql.functions.avg
dF.filter("country = 'USA'")  // filter them
.groupBy("name", "salary")    // group them
.agg(avg("salary"))           // aggregate; the grouping columns are kept in the output
.show()                       // display the first 20 rows of the result
Method 2: Using sqlContext (spark.sql in Spark 2.0+) - DataFrame/Dataset implementation
dF.createOrReplaceTempView("employee")  // register the table
val empQuery = "select name, AVG(salary) from employee where country = 'USA' group by name, salary"
sqlContext.sql(empQuery)  // execute the query using sqlContext
.show()                   // display the first 20 rows of the result
Method 3: Using hiveContext - if the table exists in Hive
dF.createOrReplaceTempView("employee")  // register the table
val empQuery = "select name, AVG(salary) from employee where country = 'USA' group by name, salary"
hiveContext.sql(empQuery)  // execute the query using hiveContext
create DataFrame: val myRange = spark.range(1000).toDF("number")
with DataFrames you do not manipulate partitions manually (but with RDDs you do; RDD is the low-level API)
driver: executes main(), accepts user parameters and schedules work across executors
A DataFrame in Spark is similar to a SQL table, an R data frame, or a pandas DataFrame.
In Spark, a DataFrame is actually a wrapper around RDDs, the basic data structure in Spark.
Dataset: static typing -> compile-time error detection
DataFrames are untyped: the Scala compiler does not check the types in their schema
DataFrame = Dataset[Row]
DataFrames: unlike RDDs, when cached they are stored in memory in a column-based format, which allows for various optimizations
(vectorization, columnar compression, off-heap storage, etc.).
Their schema is fairly robust, allowing for arbitrary nested data structures
(i.e., a column can be a list of structs which are composed of other structs).
The fact that Spark is aware of the structure of the underlying data allows non-JVM languages (Python, R)
to leverage functions written in the faster native JVM implementation. One of the issues with DataFrames,
however, is that they are only runtime and not compile-time type safe, which for a language like Scala is a severe drawback.
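A sketch of the compile-time vs runtime difference, assuming Spark 3.x in local mode; `CountryPop` and its data are hypothetical. The typed `map` is checked by the Scala compiler, whereas a misspelled column name in the DataFrame API only fails when Spark analyzes the query.

```scala
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}

// Hypothetical record type for this sketch
case class CountryPop(name: String, population: Long)

object TypedVsUntyped {
  // Returns (result of a typed map, whether a misspelled column was caught at runtime)
  def run(spark: SparkSession): (Seq[Long], Boolean) = {
    import spark.implicits._
    val ds: Dataset[CountryPop] = Seq(CountryPop("Sweden", 10L), CountryPop("Anguilla", 1L)).toDS()

    // Typed: the compiler checks the field; writing c.populaton would not compile
    val doubled = ds.map(c => c.population * 2).collect().toSeq

    // Untyped: a DataFrame column is just a string, so the typo only fails
    // when Spark analyzes the query at runtime
    val typoCaughtAtRuntime =
      try { ds.toDF().select("populaton"); false }
      catch { case _: AnalysisException => true }

    (doubled, typoCaughtAtRuntime)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("TypedVsUntyped").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```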
Join DF
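// dfA and dfB: DataFrames that both have a "date" column
dfA.join(dfB, dfA("date") === dfB("date"))  // keeps both date columns
dfA.join(dfB, Seq("date"))                  // joins on the shared column name and keeps a single date column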
DataSets
https://blog.codecentric.de/en/2016/07/spark-2-0-datasets-case-classes/
https://www.balabit.com/blog/spark-scala-dataset-tutorial
Because Spark understands the type of the JVM objects in your Dataset,
it maps your type-specific JVM objects to Tungsten’s internal memory representation using Encoders.
As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects
as well as generate compact bytecode that can execute at superior speeds.
import sqlContext.implicits._   // provides the Encoders for .as[...]; use import spark.implicits._ in Spark 2.0+
val lines = sqlContext.read.text("/wikipedia").as[String]
val words = lines
.flatMap(_.split(" "))
.filter(_ != "")
case class University(name: String, numStudents: Long, yearFounded: Long)
val schools = sqlContext.read.json("/schools.json").as[University]
schools.map(s => s"${s.name} is ${2015 - s.yearFounded} years old")
val dsAvgTmp = ds
.filter(d => {d.temp > 25})
.map(d => (d.temp, d.humidity, d.cca3))
.groupBy($"_3")
.avg()
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
To use joinWith you first have to create a Dataset, and most likely two of them.
To create a Dataset, you need to create a case class that matches your schema and call DataFrame.as[T] where T is your case class.
So:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
You could also skip the case class and use a tuple:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
Then if you had another case class / DF, like this say:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
Then, while the syntax of join and joinWith are similar, the results are different:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens out the columns into a single namespace
(which causes problems in the above case because the column name "key" is repeated).
In early Dataset versions (Spark 1.6), the conditions for joining ds and ds2 had to be built from df.col("key") and df2.col("key"):
an unqualified col("key") is ambiguous because both sides have a "key" column, and Dataset had no col method.
Since Spark 2.0, Dataset has a col method, so ds.col("key") === ds2.col("key") works as well.
Accumulators and broadcast variables (à la distributed cache)
Spark supports two types of shared variables:
1) broadcast variables - can be used to cache a read-only value in memory on all nodes;
2) accumulators - variables that are only “added” to through an associative and commutative operation and can therefore
be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
Spark natively supports accumulators of numeric types, and programmers can add support for new types.
val accum = sc.accumulator(0)   // Spark 1.x API, deprecated since Spark 2.0 in favor of sc.longAccumulator
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value
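The same example with the accumulator API introduced in Spark 2.0, plus a broadcast lookup table, as a sketch assuming Spark 3.x in local mode; the country map is invented for illustration.

```scala
import org.apache.spark.sql.SparkSession

object SharedVariablesSketch {
  // Returns (sum collected by the accumulator, number of codes found in the broadcast map)
  def run(spark: SparkSession): (Long, Long) = {
    val sc = spark.sparkContext

    // Spark 2.0+ replacement for the deprecated sc.accumulator(0)
    val accum = sc.longAccumulator("sum")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    val total: Long = accum.value // read the result on the driver only

    // Read-only lookup table, shipped once per executor instead of once per task
    val countryNames = sc.broadcast(Map("SE" -> "Sweden", "US" -> "United States"))
    val known = sc.parallelize(Seq("SE", "US", "XX"))
      .filter(code => countryNames.value.contains(code))
      .count()

    (total, known)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SharedVariablesSketch").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

Spark only guarantees that accumulator updates made inside actions (like `foreach` here) are applied exactly once; updates made inside transformations may be re-applied if a task is re-executed.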
Read
val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")
val extractDF = dataLakeDF
.where(col("mood") === "happy")
.repartition(10000)
spark.read.format("csv")
.option("mode", "FAILFAST")   // or DROPMALFORMED | PERMISSIVE (the default)
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)           // strictly specify the schema; no inference happens when a schema is given
.load()
spark.read.format("parquet")
.load("/data/flight-data/parquet/2010-summary.parquet")
.show(5)
Write
DataFrameWriter
.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()   // or .saveAsTable(...), which bucketBy/sortBy require
dataframe.write.format("csv")
.mode("overwrite")   // save mode: append | overwrite | ignore | errorifexists (the default)
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
Query
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show(5)
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain
Read the plan from bottom to top; the bottom is the source of the data.
== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#8108], functions=[])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#8108, 200)
   +- *HashAggregate(keys=[DEST_COUNTRY_NAME#8108], functions=[])
      +- *Scan JDBCRelation(flight_info) [numPartitions=1] ...
Spark can actually do better than this on certain queries.
For example, if we specify a filter on our DataFrame, Spark will push that filter down into the database.
We can see this in the explain plan under PushedFilters.
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain
== Physical Plan ==
*Scan JDBCRel...
PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])],
To push an entire query down to the database, pass a subquery (with an alias) as the dbtable option:
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val dbDataFrame = spark.read.format("jdbc")
.option("url", url)
.option("dbtable", pushdownQuery)
.option("driver", driver)
.load()
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person
.join(graduateProgram, joinExpr)
.explain()
== Physical Plan ==
*BroadcastHashJoin [graduate_program#40], [id#5....
:- LocalTableScan [id#38, name#39, graduate_progr...
+- BroadcastExchange HashedRelationBroadcastMode(....
   +- LocalTableScan [id#56, degree#57, departmen....
With the DataFrame API, we can also explicitly give the optimizer a hint
that we would like to use a broadcast join by using the correct function around the small DataFrame in question.
In this example, these result in the same plan we just saw; however, this is not always the case:
import org.apache.spark.sql.functions.broadcast
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr).explain()
Spark SQL
With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL.
There is no performance difference between writing SQL queries or writing DataFrame code,
they both “compile” to the same underlying plan that we specify in DataFrame code.
Complex Spark SQL Data Types
Scala -> SQL
Array[T] -> ArrayType(elementType, containsNull)
Map[K,V] -> MapType(keyType, valueType, valueContainsNull)
case class -> StructType(Seq[StructField])
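To see the mapping in practice, a sketch that derives a schema from hypothetical case classes through `Encoders.product`, assuming spark-sql 3.x on the classpath (no SparkSession should be needed just to obtain the schema).

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical case classes exercising all three mappings: Seq -> ArrayType, Map -> MapType, case class -> StructType
case class Airport(code: String, city: String)
case class Flight(route: String, stops: Seq[String], fares: Map[String, Double], origin: Airport)

object ComplexTypesSketch {
  // Spark derives the SQL schema from the Scala types through an encoder
  val schema: StructType = Encoders.product[Flight].schema

  def main(args: Array[String]): Unit = schema.printTreeString()
}
```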
You can make any DataFrame into a table or view with one simple method call
flightData2015.createOrReplaceTempView("flight_data_2015")
Now we can query our data in SQL. To do so, we’ll use the spark.sql function (remember, spark is our SparkSession variable)
that conveniently returns a new DataFrame.
Although this might seem a bit circular in logic — that a SQL query against a DataFrame returns another DataFrame —
it’s actually quite powerful. This makes it possible for you to specify transformations in the manner
most convenient to you at any given point in time and not sacrifice any efficiency to do so!
To understand that this is happening, let’s take a look at two explain plans:
val sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME""")
val dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
sqlWay.explain
dataFrameWay.explain
The SQL interface also includes the ability to provide hints to perform joins.
These are not enforced, however, so the optimizer might choose to ignore them.
You can set one of these hints by using a special comment syntax.
MAPJOIN, BROADCAST, and BROADCASTJOIN all do the same thing and are all supported:
-- in SQL
SELECT /*+ MAPJOIN(graduateProgram) */ * FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id
This doesn’t come for free either: if you try to broadcast something too large, you can crash your driver node
(because that collect is expensive).
Partitions
http://www.bigsynapse.com/spark-input-output
http://www.bigsynapse.com/controlling-the-number-of-partitions-in-spark
https://acadgild.com/blog/partitioning-in-spark/
https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
Spark Partition Types
-------------------------
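Hash partitioning - partition = key.hashCode % numPartitions, shifted into [0, numPartitions) when the hash code is negative (Spark's HashPartitioner uses a non-negative modulo).
Range partitioning - some RDDs have keys that follow a particular ordering.
Range partitioning, which assigns sorted ranges of keys to partitions (as sortByKey does), is an efficient partitioning technique for such RDDs.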
Custom partitioner
https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
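A sketch of a custom partitioner, assuming spark-core on the classpath; `FirstLetterPartitioner` is a hypothetical example, not a Spark class. It implements the two abstract members of `org.apache.spark.Partitioner` and applies the same non-negative modulo that `HashPartitioner` uses.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

// Hypothetical custom partitioner: keys with the same first letter go to the same partition
class FirstLetterPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, "numPartitions must be positive")

  override def getPartition(key: Any): Int = {
    val letter = Option(key).flatMap(_.toString.headOption).getOrElse(' ').toLower
    val raw = letter.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // defensive: keep the result in [0, numPartitions)
  }

  // Spark compares partitioners to detect co-partitioned RDDs, so define equality
  override def equals(other: Any): Boolean = other match {
    case p: FirstLetterPartitioner => p.numPartitions == numPartitions
    case _                         => false
  }
  override def hashCode(): Int = numPartitions
}

object PartitionerSketch {
  def main(args: Array[String]): Unit = {
    // HashPartitioner maps a negative hash code into range: -1 % 4 == -1, shifted to 3
    println(new HashPartitioner(4).getPartition(-1))
    val byLetter = new FirstLetterPartitioner(4)
    println(byLetter.getPartition("apple") == byLetter.getPartition("avocado"))
  }
}
```

It would be applied to a pair RDD with `pairs.partitionBy(new FirstLetterPartitioner(8))`; overriding `equals` lets Spark recognize two RDDs partitioned this way as co-partitioned and avoid re-shuffling them when they are joined.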
spark.default.parallelism - default number of partitions in RDDs returned by transformations like join, reduceByKey and parallelize when not set by the user (used for the default HashPartitioner)
spark.sql.shuffle.partitions - controls the number of partitions for shuffles on DataFrames, e.g. joins and aggregations (default is 200)
in HDFS, one partition is created for each block of the file (64 MB blocks in Hadoop 1, 128 MB by default in Hadoop 2+)
Too few partitions - you will not utilize all of the cores available in the cluster.
Too many partitions - there will be excessive overhead in managing many small tasks.
A reasonable starting point is at least as many partitions as there are cores in the cluster, so that all partitions can be processed in parallel;
the Spark tuning guide recommends 2-3 tasks per CPU core.
Performance and Monitoring
http://spark.apache.org/docs/latest/tuning.html
Try the G1GC garbage collector with -XX:+UseG1GC.
It can improve performance in some situations where garbage collection is a bottleneck.
Note that with large executor heap sizes, it may be important to increase the G1 region size with -XX:G1HeapRegionSize.
Cluster config:
10 Nodes
16 cores per Node
64GB RAM per Node
Full memory requested from YARN per executor =
spark.executor.memory + spark.yarn.executor.memoryOverhead
spark.yarn.executor.memoryOverhead =
max(384MB, 7% of spark.executor.memory)
(7% is the factor used by older Spark releases; newer releases use 10% and name the setting spark.executor.memoryOverhead.)
Let’s assign 5 cores per executor => --executor-cores = 5 (for good HDFS throughput)
Leave 1 core per node for Hadoop/YARN daemons => num cores available per node = 16 - 1 = 15
So, total available cores in the cluster = 15 x 10 = 150
Number of available executors = (total cores / num-cores-per-executor) = 150 / 5 = 30
Leaving 1 executor for the YARN ApplicationMaster => --num-executors = 29
Number of executors per node = 30 / 10 = 3
Memory per executor = 64GB / 3 = 21GB
Counting off-heap overhead = 7% of 21GB ≈ 1.5GB, rounded up to 3GB for headroom. So, actual --executor-memory = 21 - 3 = 18GB
So, if we request 20GB per executor, YARN will actually allocate 20GB + memoryOverhead = 20 + 7% of 20GB = ~21.4GB for us.
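The sizing walk-through as a small Scala function, a sketch under the same assumptions (1 core per node for daemons, 5 cores per executor, one executor left for the ApplicationMaster). It uses the exact overhead instead of the rounded-up 3 GB, so for 21 GB per executor it lands on 19 GB of heap rather than 18 GB; the walk-through's extra rounding simply leaves more headroom.

```scala
object ExecutorSizing {
  final case class Plan(executorCores: Int, numExecutors: Int, executorsPerNode: Int, executorMemoryGb: Int)

  // overheadFraction = 0.07 matches the notes (newer Spark defaults to 0.10); 384 MB is the minimum overhead
  def plan(nodes: Int, coresPerNode: Int, memPerNodeGb: Int,
           executorCores: Int = 5, overheadFraction: Double = 0.07): Plan = {
    val usableCoresPerNode = coresPerNode - 1          // 1 core per node for OS/Hadoop/YARN daemons
    val totalExecutors = nodes * usableCoresPerNode / executorCores
    val executorsPerNode = totalExecutors / nodes
    val containerGb = memPerNodeGb / executorsPerNode  // memory budget per executor container
    // Largest heap h such that h + max(0.384, overheadFraction * h) <= containerGb
    val heapGb = math.floor(math.min(containerGb / (1 + overheadFraction), containerGb - 0.384)).toInt
    // One executor slot is left for the YARN ApplicationMaster
    Plan(executorCores, totalExecutors - 1, executorsPerNode, heapGb)
  }

  def main(args: Array[String]): Unit =
    println(plan(nodes = 10, coresPerNode = 16, memPerNodeGb = 64))
}
```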
To change Spark’s log level, simply run the following command:
spark.sparkContext.setLogLevel("INFO")
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
Avoid GroupByKey
Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey:
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect()
val wordCountsWithGroup = wordPairsRDD
.groupByKey()
.map(t => (t._1, t._2.sum))
.collect()
While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset.
That's because Spark knows it can combine output with a common key on each partition before shuffling the data.
With reduceByKey, pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey)
before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
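A plain-Scala simulation (not Spark's actual implementation) of why reduceByKey shuffles less, using the six words above split into two assumed partitions of three words each: groupByKey moves all 6 pairs, while reduceByKey moves only the 3 per-partition partial sums, and both end with the same counts.

```scala
object MapSideCombineSketch {
  type Partition = Seq[(String, Int)]

  // groupByKey: every (word, 1) pair is shuffled as-is
  def shuffledWithGroupByKey(parts: Seq[Partition]): Int = parts.map(_.size).sum

  // reduceByKey: each partition first sums its own values per key (map-side combine)
  def combineLocally(part: Partition): Partition =
    part.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }.toSeq

  def shuffledWithReduceByKey(parts: Seq[Partition]): Int = parts.map(p => combineLocally(p).size).sum

  // After the shuffle the partial sums are reduced again, giving the same counts as groupByKey
  def wordCounts(parts: Seq[Partition]): Map[String, Int] =
    parts.flatMap(combineLocally).groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val words = Seq("one", "two", "two", "three", "three", "three")
    // Assumed split into two partitions of three words each
    val parts: Seq[Partition] = words.map(w => (w, 1)).grouped(3).toSeq
    println(s"groupByKey shuffles ${shuffledWithGroupByKey(parts)} records")
    println(s"reduceByKey shuffles ${shuffledWithReduceByKey(parts)} records")
    println(wordCounts(parts))
  }
}
```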
With a broadcast join, one side of the join is materialized and sent to all mappers. It is therefore considered a map-side join,
which can bring a significant performance improvement by omitting the sort-and-shuffle phase otherwise required by a reduce step.
Because the dataset is materialized and sent over the network, this only brings a significant performance improvement if it is considerably small. Another constraint is that it also needs to fit completely into the memory of each executor.
Not to forget: it also needs to fit into the memory of the driver!
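A runnable version of the employees/departments broadcast join, as a sketch assuming Spark 3.x in local mode; the rows are invented stand-ins for `employeesRDD` and `departmentsRDD`. The physical plan should contain a BroadcastHashJoin node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  // Returns (joined row count, physical plan text)
  def run(spark: SparkSession): (Long, String) = {
    import spark.implicits._
    val employeesDF = Seq(("Ann", 1), ("Bob", 2), ("Cid", 1)).toDF("name", "depId")
    val departmentsDF = Seq((1, "Engineering"), (2, "Sales")).toDF("id", "department")

    // broadcast() marks the small side: it is collected and sent to every executor,
    // so the large side is joined in place without being shuffled
    val joined = employeesDF.join(broadcast(departmentsDF), $"depId" === $"id", "inner")
    (joined.count(), joined.queryExecution.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastJoinSketch").getOrCreate()
    val (rows, plan) = run(spark)
    println(s"$rows rows")
    println(plan) // expect a BroadcastHashJoin node
    spark.stop()
  }
}
```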
DataFrame API
---------------------------
val employeesDF = employeesRDD.toDF   // toDF and $"..." need import spark.implicits._
val departmentsDF = departmentsRDD.toDF
// materializing the department data (broadcasting once is enough)
val tmpDepartments = broadcast(departmentsDF.as("departments"))
employeesDF.join(tmpDepartments,
$"depId" === $"id", // join by employees.depId == departments.id
"inner").show()
Spark includes a cost-based query optimizer that plans queries based on the properties of the input data when using the structured APIs.
To allow the cost-based optimizer to make these sorts of decisions, you need to collect (and maintain) statistics about your tables that it can use.
There are two kinds of statistics: table-level and column-level statistics.
Statistics collection is available only on named tables, not on arbitrary DataFrames or RDDs.
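A sketch of collecting both kinds of statistics with `ANALYZE TABLE`, assuming Spark 3.x in local mode with the default in-memory catalog; `sales_stats_demo` is a hypothetical table, and the session is expected to point `spark.sql.warehouse.dir` at a scratch directory so the managed table does not collide with leftovers from earlier runs.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object TableStatsSketch {
  // Creates a small managed table, collects statistics, and returns the
  // "Statistics" entry from DESCRIBE EXTENDED (size in bytes and row count)
  def run(spark: SparkSession): Option[String] = {
    import spark.implicits._
    spark.conf.set("spark.sql.cbo.enabled", "true") // the cost-based optimizer is off by default

    Seq(("US", 10), ("SE", 7), ("US", 3)).toDF("country", "amount")
      .write.mode("overwrite").saveAsTable("sales_stats_demo") // hypothetical table name

    // Table-level statistics: size in bytes and row count
    spark.sql("ANALYZE TABLE sales_stats_demo COMPUTE STATISTICS")
    // Column-level statistics: min, max, distinct count, null count, ...
    spark.sql("ANALYZE TABLE sales_stats_demo COMPUTE STATISTICS FOR COLUMNS country, amount")

    val stats = spark.sql("DESCRIBE EXTENDED sales_stats_demo")
      .collect()
      .collectFirst { case row if row.getString(0) == "Statistics" => row.getString(1) }
    spark.sql("DROP TABLE IF EXISTS sales_stats_demo") // clean up the managed table
    stats
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("TableStatsSketch")
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```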
The first step in garbage collection tuning is to gather statistics on how frequently garbage collection occurs
and the amount of time it takes.
You can do this by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to Spark’s JVM options
using the spark.executor.extraJavaOptions configuration parameter (these are Java 8 flags; Java 9+ replaces them with unified logging, e.g. -Xlog:gc*).
The next time you run your Spark job, you will see messages printed in the worker’s logs each time a garbage collection occurs.
These logs will be on your cluster’s worker nodes (in the stdout files in their work directories), not in the driver program.
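The same JVM options set programmatically, as a sketch assuming spark-core on the classpath; the application name is hypothetical, and `-XX:+UseG1GC` from the tuning tip earlier is included as well. On Java 9+ these logging flags are deprecated or removed, so they would need to be swapped for `-Xlog:gc*` there.

```scala
import org.apache.spark.SparkConf

object GcLoggingConf {
  // Java 8 GC logging flags plus G1
  val executorJavaOptions: String =
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseG1GC"

  // Equivalent to: spark-submit --conf "spark.executor.extraJavaOptions=..."
  def conf(): SparkConf = new SparkConf()
    .setAppName("GcLoggingExample")
    .set("spark.executor.extraJavaOptions", executorJavaOptions)

  def main(args: Array[String]): Unit =
    println(conf().get("spark.executor.extraJavaOptions"))
}
```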
Links
https://medium.com/@mrpowers/how-to-write-spark-etl-processes-df01b0c1bec9
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c
http://spark.apache.org/examples.html
https://mindfulmachines.io/blog/2018/4/3/spark-rdds-and-dataframes
https://github.com/mahmoudparsian/pyspark-tutorial
https://medium.com/@schaun.wheeler/pyspark-udfs-and-star-expansion-b50f501dcb7b
http://www.janvsmachine.net/2017/09/sessionization-with-spark.html
https://spark.apache.org/docs/2.2.0/
https://spark.apache.org/docs/0.7.2/api/pyspark/pyspark.rdd.RDD-class.html
https://blog.knoldus.com/2017/04/16/the-dominant-apis-of-spark-datasets-dataframes-and-rdds/
http://parrotprediction.com/partitioning-in-apache-spark/
https://blog.deepsense.ai/optimize-spark-with-distribute-by-and-cluster-by/
https://opencredo.com/spark-testing/
https://www.youtube.com/watch?v=ISO8V1cYmfY
https://www.slideshare.net/hadooparchbook/top-5-mistakes-when-writing-spark-applications-
https://blog.deepsense.ai/improve-aggregate-performance-with-batching/66374492
http://technology.finra.org/code/using-spark-transformations-for-mpreduce-jobs.html
https://blog.deepsense.ai/fast-and-accurate-categorical-distribution-without-reshuffling-in-apache-spark/
RDD activities
In Spark all work is expressed as either
- creating new RDDs
- transforming existing RDDs
- calling operations on RDDs to compute a result.
Transformations
There are two types of transformations:
- those that specify narrow dependencies: each input partition contributes to at most one output partition (1:1)
- those that specify wide dependencies: an input partition contributes to many output partitions (1:N)
Transformations consisting of narrow dependencies are those where each input partition will contribute to only one output partition.
Example: a where clause specifies a narrow dependency, where only one partition contributes to at most one output partition.
A wide dependency style transformation will have input partitions contributing to many output partitions.
We call this a shuffle where Spark will exchange partitions across the cluster.
Spark will automatically perform an operation called pipelining on narrow dependencies,
this means that if we specify multiple filters on DataFrames they’ll all be performed in memory.
The same cannot be said for shuffles. When we perform a shuffle, Spark will write the results to disk.
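A sketch of pipelining versus shuffling as seen in physical plans, assuming Spark 3.x in local mode: the chained filters produce no Exchange node, while the aggregation introduces one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PipeliningSketch {
  // Returns (plan of two chained filters, plan of an aggregation)
  def plans(spark: SparkSession): (String, String) = {
    val df = spark.range(0, 1000).toDF("id")

    // Narrow: the optimizer merges the two filters and runs them in one stage, no Exchange (shuffle) node
    val narrow = df.where(col("id") > 10).where(col("id") % 2 === 0)
    // Wide: rows with the same key must meet in one partition, so Spark adds an Exchange
    val wide = df.groupBy(col("id") % 10).count()

    (narrow.queryExecution.executedPlan.toString, wide.queryExecution.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("PipeliningSketch").getOrCreate()
    val (narrowPlan, widePlan) = plans(spark)
    println(narrowPlan)
    println(widePlan)
    spark.stop()
  }
}
```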
Actions
There are three kinds of actions:
• actions to view data in the console;
• actions to collect data to native objects in the respective language;
• and actions to write to output data sources.
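One action of each kind, as a sketch assuming Spark 3.x in local mode; the output path is a temporary directory chosen for the example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object ActionKindsSketch {
  // Runs one action of each kind; returns (collected ids, output path)
  def run(spark: SparkSession): (Array[Long], String) = {
    val df = spark.range(0, 5).toDF("id")

    df.show()                                 // 1) view data in the console
    val ids = df.collect().map(_.getLong(0))  // 2) collect into native Scala values on the driver
    val out = Files.createTempDirectory("actions").resolve("ids").toString
    df.write.parquet(out)                     // 3) write to an output data source
    (ids, out)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ActionKindsSketch").getOrCreate()
    val (ids, out) = run(spark)
    println(s"collected ${ids.mkString(",")}, wrote $out")
    spark.stop()
  }
}
```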
Structured Streaming
Structured Streaming became Production Ready in Spark 2.2.
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/
https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html
Structured streaming allows you to take the same operations that you perform in batch mode and perform them in a streaming fashion.
This can reduce latency and allow for incremental processing.
The best thing about Structured Streaming is that it allows you to quickly get value out of streaming systems with simple switches.
It also makes streaming jobs easy to reason about, because you can write your job in batch mode as a prototype and then convert it to a streaming job.
The way all of this works is by incrementally processing that data.
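A sketch of the "prototype as batch, then switch to streaming" idea, assuming Spark 3.x in local mode; the JSON events and the `country_counts` query name are hypothetical. The same `countsByCountry` function is applied to `spark.read` and to `spark.readStream`, and the in-memory sink is used only so that the result can be inspected.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object BatchToStreamingSketch {
  // Streaming file sources need an explicit schema by default
  val schema: StructType = new StructType().add("country", StringType).add("n", IntegerType)

  // The business logic is written once and shared by the batch and the streaming job
  def countsByCountry(events: DataFrame): DataFrame = events.groupBy("country").count()

  private def toMap(df: DataFrame): Map[String, Long] =
    df.collect().map(r => r.getString(0) -> r.getLong(1)).toMap

  // Returns (batch result, streaming result) for the same input files
  def run(spark: SparkSession): (Map[String, Long], Map[String, Long]) = {
    import spark.implicits._
    val inputDir = Files.createTempDirectory("events").resolve("in").toString
    Seq(("US", 1), ("US", 2), ("SE", 3)).toDF("country", "n").write.json(inputDir)

    // Batch prototype
    val batch = toMap(countsByCountry(spark.read.schema(schema).json(inputDir)))

    // Streaming version: readStream/writeStream instead of read/write
    val query = countsByCountry(spark.readStream.schema(schema).json(inputDir))
      .writeStream
      .outputMode("complete")    // aggregations emit the whole updated result table
      .format("memory")          // in-memory sink, meant for tests and debugging
      .queryName("country_counts")
      .start()
    query.processAllAvailable()  // wait until all files currently in inputDir are processed
    query.stop()

    (batch, toMap(spark.table("country_counts")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("BatchToStreamingSketch").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```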
Spark Session
val spark = SparkSession
.builder()
.appName("My App")
.getOrCreate()
val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
val sc = new SparkContext(conf)
Broadcast variables allow the programmer to keep a read-only variable or lookup table cached on each machine
SparkContext.broadcast(v)
val broadcastVar = sc.broadcast(Array(1, 2, 3))
def processData(t: RDD[(Int, Int)], u: RDD[(Int, String)]): scala.collection.Map[Int, Long] = {
val jn = t.leftOuterJoin(u).values.distinct()
jn.countByKey()   // countByKey returns a scala.collection.Map, not an immutable Map
}
val scounter = myRDD.stats()   // returns a StatCounter; myRDD must be an RDD[Double]
scounter.count   // also mean, sum, max, min, variance, stdev
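A sketch of `stats()` on a small RDD, assuming Spark 3.x in local mode. Note that `variance` and `stdev` are population statistics; `sampleVariance` and `sampleStdev` give the sample versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.StatCounter

object StatsSketch {
  // stats() is available on RDD[Double] and computes all summary values in one pass
  def run(spark: SparkSession): StatCounter =
    spark.sparkContext.parallelize(Seq(1.0, 2.0, 3.0, 4.0)).stats()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("StatsSketch").getOrCreate()
    val s = run(spark)
    println(s"count=${s.count} mean=${s.mean} sum=${s.sum} min=${s.min} max=${s.max} variance=${s.variance} stdev=${s.stdev}")
    spark.stop()
  }
}
```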
RDD - distributed and read-only
partitions - atomic pieces of the dataset
dependencies - relations between an RDD and the parent RDDs it was derived from
function for computing the dataset based on its parent RDDs
metadata - about its partitioning scheme and data placement
you will likely only be creating two types of RDDs:
the “generic” RDD type
or
a key-value RDD that provides additional functions, such as aggregating by key.
Both just represent a collection of objects,
but key-value RDDs have special operations as well as a concept of custom partitioning by key.
Let’s formally define RDDs.
Internally, each RDD is characterized by five main properties:
1. A list of partitions
2. A function for computing each split
3. A list of dependencies on other RDDs
4. Optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash-partitioned)
5. Optionally, a list of preferred locations on which to compute each split
(e.g., block locations for a Hadoop Distributed File System [HDFS] file)
The Partitioner is probably one of the core reasons why you might want to use RDDs in your code.
Specifying your own custom Partitioner can give you significant performance and stability improvements if you use it correctly.
You manipulate RDDs in much the same way that you manipulate DataFrames,
the core difference being that you manipulate raw Java or Scala objects instead of Spark types.
Narrow dependencies: each partition of the parent RDD is used by at most one partition of the child RDD
fast, no shuffle, optimizations like pipelining possible
Parent partition(1) -> Child partition(<=1)
Examples: map, filter, flatMap, mapPartitions, mapValues, mapPartitionsWithIndex, union, join with co-partitioned inputs
Wide dependencies: slow... shuffle over the network
Parent partition(1) -> Child partition(many)
Examples:
(group/reduce/combine)ByKey, cogroup, groupWith, join (depends on the partitioning scheme), (left/right)OuterJoin,
distinct, intersection, repartition, coalesce (only with shuffle = true; plain coalesce is narrow)
The dependencies method on an RDD returns a sequence of Dependency objects.
Kinds of Dependency objects:
Narrow dependency objects: OneToOneDependency, PruneDependency, RangeDependency
Wide dependency objects: ShuffleDependency
val wordsRdd = sc.parallelize(largeList)   // largeList: some Seq[String]
val pairs = wordsRdd.map(c => (c, 1))
.groupByKey()
.dependencies   // a Seq containing one ShuffleDependency
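A sketch that checks the dependency types directly, assuming Spark 3.x in local mode with a tiny hypothetical word list: `map` yields a OneToOneDependency (narrow) and `groupByKey` a ShuffleDependency (wide).

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

object DependencySketch {
  // Returns (map has a narrow OneToOneDependency, groupByKey has a wide ShuffleDependency)
  def check(spark: SparkSession): (Boolean, Boolean) = {
    val sc = spark.sparkContext
    val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)

    val pairs = words.map(w => (w, 1)) // narrow: child partition i reads only parent partition i
    val grouped = pairs.groupByKey()   // wide: every child partition reads from all parent partitions

    val mapIsNarrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
    val groupIsWide = grouped.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
    println(grouped.toDebugString)     // lineage; the indentation change marks the shuffle boundary
    (mapIsNarrow, groupIsWide)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("DependencySketch").getOrCreate()
    println(check(spark))
    spark.stop()
  }
}
```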
SparkWordCount
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
    /* "local" = master URL; "Word Count" = application name */
    /* "/usr/local/spark" = Spark home; Nil = jars to ship to the cluster */
    /* first Map() = environment variables for worker nodes */
    /* second Map() = preferred node locations (Spark 1.x constructor) */
    /* create an input RDD by reading the text file in.txt through the SparkContext */
    val input = sc.textFile("in.txt")
    /* transform the input RDD into a (word, count) RDD */
    val count = input.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    /* saveAsTextFile is an action: it triggers the computation and writes the result */
    count.saveAsTextFile("outfile")
    println("OK")
    sc.stop()
  }
}
scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
jar -cvf wordcount.jar SparkWordCount*.class
spark-submit --class SparkWordCount --master local wordcount.jar
(spark-submit puts the Spark jars on the classpath, so only the compiled classes need to go into wordcount.jar)
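The word count pipeline has the shape flatMap, then map, then reduceByKey. The sketch below mirrors that shape on an ordinary Scala Seq of lines, so it can be tried without Spark. `WordCountModel` is a hypothetical name. Unlike the Spark job above, it drops the empty strings that `split(" ")` produces for repeated spaces.

```scala
// Plain-Scala model (hypothetical, no Spark) of the word count pipeline above.
object WordCountModel {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // like RDD.flatMap: one line -> many words
      .filter(_.nonEmpty)     // split(" ") yields "" for repeated spaces
      .map(word => (word, 1)) // like RDD.map: build (word, 1) pairs
      .groupBy(_._1)          // stand-in for the shuffle inside reduceByKey
      .map { case (word, ones) => (word, ones.map(_._2).sum) } // reduce per key
}
```

For example, `WordCountModel.wordCount(Seq("to be or", "not to be"))` gives to -> 2, be -> 2, or -> 1, not -> 1.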
Choosing the best execution plan
case class Demographic(
  id: Int,
  codingBootcamp: Boolean,
  country: String,
  gender: String,
  isMinority: Boolean,
  servedInMilitary: Boolean
)
// Pair RDD (id, Demographic); parsing of the lines is elided
val demographic = sc.textFile(..)
case class Finances(
  id: Int,
  hasDept: Boolean,
  hasDependents: Boolean,
  hasLoans: Boolean,
  income: Int
)
// Pair RDD (id, Finances); parsing of the lines is elided
val finances = sc.textFile(..)
Plan #1
The following join yields a pair RDD (id: Int, (Demographic, Finances)):
demographic.join(finances)
  .filter { p =>
    p._2._1.country == "Swiss" &&
    p._2._2.hasDependents &&
    p._2._2.hasDept
  }.count()
Steps for the join above:
1) inner join
2) filter to select Swiss people
3) filter to select people with debt and dependents
Plan #2: faster, filter before the join
val filteredFinances = finances.filter(p => p._2.hasDependents && p._2.hasDept)
demographic.filter(p => p._2.country == "Swiss")
  .join(filteredFinances)
  .count()
Steps for the join above:
1) filter to select people with debt and dependents
2) filter to select Swiss people
3) join
Plan #3: slowest, Cartesian product followed by filters
val cartesian = demographic.cartesian(finances)
cartesian.filter {
  case (p1, p2) => p1._1 == p2._1
}
.filter {
  case (p1, p2) => (p1._2.country == "Swiss") &&
    (p2._2.hasDependents) &&
    p2._2.hasDept
}.count()
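Plans #1 and #2 return the same answer but push different amounts of data through the join. The sketch below models both plans on in-memory Seqs, using trimmed-down copies of the case classes that keep only the queried fields. It reports the answer together with the number of rows the join produces. Everything here is a hypothetical plain-Scala model, not Spark code.

```scala
// Plain-Scala model (hypothetical, no Spark) of plan #1 vs plan #2.
object JoinPlans {
  // Trimmed-down case classes: only the fields the query uses.
  case class Demographic(id: Int, country: String)
  case class Finances(id: Int, hasDept: Boolean, hasDependents: Boolean)

  // Models RDD.join on in-memory pair sequences by indexing the right side by key.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val byKey = right.groupBy(_._1)
    for {
      (k, a) <- left
      (_, b) <- byKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  // Plan #1: join everything, then filter. Returns (answer, rows produced by the join).
  def plan1(d: Seq[(Int, Demographic)], f: Seq[(Int, Finances)]): (Int, Int) = {
    val joined = join(d, f)
    val n = joined.count { case (_, (dem, fin)) =>
      dem.country == "Swiss" && fin.hasDependents && fin.hasDept
    }
    (n, joined.size)
  }

  // Plan #2: filter each side first, then join the smaller inputs.
  def plan2(d: Seq[(Int, Demographic)], f: Seq[(Int, Finances)]): (Int, Int) = {
    val swiss = d.filter(_._2.country == "Swiss")
    val withDebt = f.filter { case (_, fin) => fin.hasDependents && fin.hasDept }
    val joined = join(swiss, withDebt)
    (joined.size, joined.size)
  }
}
```

On a real cluster the rows fed into the join are the rows that get shuffled, so the smaller join in plan #2 is where its speedup comes from.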
Data Frames
DataFrames are untyped: the Scala compiler does not check the types in their schema
RDD[T] has a type parameter, but DataFrame has none: it contains generic Row objects (since Spark 2.0, DataFrame = Dataset[Row])
Catalyst - query optimizer
Tungsten - off-heap serializer that avoids garbage-collector overhead; it stores rows in a compact binary format that can hold any schema
DataFrames conceptually are RDDs full of records with a known schema (like a table in a SQL RDBMS)
Transformations on DataFrames are known as untyped transformations
A DataFrame can be created from an existing RDD:
import spark.implicits._ // enables toDF on RDDs
val tupleRDD = .. // Assume RDD[(Int, String, String, String)]
val tupleDF = tupleRDD.toDF("id", "name", "city", "country")
If you have an RDD containing case class instances, Spark can infer the column names from the case class fields:
case class Person(id: Int, name: String, city: String)
val peopleRDD = .. // Assume RDD[Person]
val peopleDF = peopleRDD.toDF()
Reflection is used to generate the column names automatically.
How to create a DataFrame with an explicit schema:
1) Create an RDD of Rows from the original RDD
2) Create the schema, represented by a StructType
3) Apply the schema to the RDD of Rows via the createDataFrame method
Or by reading a data source (JSON/CSV/Parquet file or JDBC):
val df = spark.read.json("my.json")
peopleDF.createOrReplaceTempView("people")
val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")
df.show() displays the first 20 rows
df.printSchema() prints the schema as a tree
df.filter($"age" >18)
df.filter(df("age") >18)
df.filter("age >18")
val res=empDF.select("id","lname")
.where("city=='Sidney'")
.orderBy("id")
import org.apache.spark.sql.functions.count
myDF.groupBy($"authorId", $"subForum")
  .agg(count($"authorId"))
  .orderBy($"subForum", $"count(authorId)".desc)
Pair RDD
def groupByKey(): RDD[(K,Iterable[V])]
def reduceByKey(func: (V,V) => V): RDD[(K,V)]
def join[W](other: RDD[(K,W)]): RDD[(K,(V,W))]
val pairRdd = rdd.map(page => (page.title, page.text))
case class Event(organizer: String, name: String, budget: Int) //key:organizer
val eventsRDD = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))
def groupByKey(): RDD[(K, Iterable[V])]
val groupedRdd = eventsRDD.groupByKey()
groupedRdd.collect().foreach(println)
groupByKey collects all of the values associated with a given key into a single collection.
To do that, records with the same key must be moved across the network to the same machine; moving data across the network is called shuffling.
Can we do this more efficiently?
We can reduce before the shuffle: combining values locally on each partition first reduces the amount of data sent over the network.
That is what the reduceByKey operator does.
reduceByKey: conceptually it is a combination of groupByKey and reducing all values per key, but the reduction starts on each partition before the shuffle
def reduceByKey (func: (V,V) => V): RDD[(K,V)]
val budgetsRdd = eventsRDD.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
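The claim that reducing before the shuffle sends less data can be checked with a small model. This plain-Scala sketch (hypothetical names, no Spark) counts the records that would cross the network. groupByKey ships every (key, value) record. reduceByKey first combines per key inside each partition, so at most one record per key per partition is shuffled. Both end with the same per-key result.

```scala
// Plain-Scala model (hypothetical, no Spark) of shuffle volume: groupByKey vs reduceByKey.
object ShuffleCostModel {
  type Partitions[A] = Vector[Vector[A]]

  // groupByKey: every record crosses the network unchanged.
  def groupByKeyShuffled[K, V](parts: Partitions[(K, V)]): Int =
    parts.map(_.size).sum

  // reduceByKey: map-side combine leaves one record per distinct key per partition.
  def reduceByKeyShuffled[K, V](parts: Partitions[(K, V)]): Int =
    parts.map(_.map(_._1).distinct.size).sum

  // The result: combine locally per partition, then combine the partial results per key.
  def reduceByKey[K, V](parts: Partitions[(K, V)])(f: (V, V) => V): Map[K, V] = {
    val locallyCombined = parts.flatMap { p =>
      p.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
    }
    locallyCombined.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
  }
}
```

With the events example's (organizer, budget) pairs spread over two partitions, 6 records shuffle under groupByKey but only 4 under reduceByKey. The gap grows with the number of values per key.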
def mapValues[U](f: V => U): RDD[(K, U)]
is equivalent to rdd.map { case (x, y) => (x, func(y)) } // applies func only to the values of the pair RDD; unlike map, mapValues keeps the partitioner
def countByKey(): Map[K, Long] // counts the number of elements per key (an action)
def keys: RDD[K] // returns an RDD with the key of each tuple
Example: number of unique visitors
case class Visitor(ip: String, timestamp: String, duration: String)
val visitors: RDD[Visitor] = .. // assume lines from sc.textFile(..) parsed into Visitor
val visits: RDD[(String, String)] = visitors.map(v => (v.ip, v.duration))
val numUniqueVisits = visits.keys.distinct().count()
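The pair-RDD helpers above have direct analogues on local collections. The sketch below models mapValues, countByKey, and the unique-visitors query on a Seq so the semantics can be checked locally. The `PairOpsModel` name and its methods are hypothetical and are not Spark API.

```scala
// Plain-Scala model (hypothetical, no Spark) of mapValues, countByKey and keys.distinct.count.
object PairOpsModel {
  case class Visitor(ip: String, timestamp: String, duration: String)

  // mapValues: transform only the value; the key (and hence its partition) is untouched.
  def mapValues[K, V, U](pairs: Seq[(K, V)])(f: V => U): Seq[(K, U)] =
    pairs.map { case (k, v) => (k, f(v)) }

  // countByKey: number of elements per key (in Spark an action returning a local Map).
  def countByKey[K, V](pairs: Seq[(K, V)]): Map[K, Long] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.size.toLong) }

  // Unique visitors: build (ip, duration) pairs, take the keys, deduplicate, count.
  def numUniqueVisits(visitors: Seq[Visitor]): Long =
    visitors.map(v => (v.ip, v.duration)).map(_._1).distinct.size.toLong
}
```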
In regular Scala collections there is groupBy:
def groupBy[K](f: A=>K): Map[K, Traversable[A]]
Usage example:
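val ages = List(34, 12, 14, 78)
val grouped = ages.groupBy { age =>
  if (age >= 18 && age < 65) "adult"
  else if (age < 18) "child"
  else "senior"
}
// grouped: Map(adult -> List(34), child -> List(12, 14), senior -> List(78)) (key order may differ)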
Joins on pair RDDs
join (inner), leftOuterJoin, rightOuterJoin
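The three join kinds differ only in which keys survive and where an Option appears. Spark's result types are RDD[(K, (V, W))] for join, RDD[(K, (V, Option[W]))] for leftOuterJoin, and RDD[(K, (Option[V], W))] for rightOuterJoin. The sketch below models these semantics on local Seqs. The object name is hypothetical and this is not Spark code.

```scala
// Plain-Scala model (hypothetical, no Spark) of pair-RDD join semantics.
object JoinSemantics {
  // Inner join: only keys present on both sides.
  def innerJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (V, W))] =
    for { (k, v) <- l; (k2, w) <- r if k == k2 } yield (k, (v, w))

  // leftOuterJoin: every left key survives; a missing right value becomes None.
  def leftOuterJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    l.flatMap { case (k, v) =>
      val matches = r.collect { case (k2, w) if k2 == k => w }
      if (matches.isEmpty) Seq((k, (v, Option.empty[W])))
      else matches.map(w => (k, (v, Option(w))))
    }

  // rightOuterJoin: the mirror image, every right key survives.
  def rightOuterJoin[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
    leftOuterJoin(r, l).map { case (k, (w, ov)) => (k, (ov, w)) }
}
```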
How to configure RDD persistence
- in memory as Java objects
- on disk as Java objects
- in memory as serialized Java objects
- on disk as serialized Java objects
- both in memory and on disk
cache() - same as persist(StorageLevel.MEMORY_ONLY)
persist(storageLevel), with storage levels:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
Partitioning
https://www.edureka.co/blog/demystifying-partitioning-in-spark
partition = key.hashCode() % numPartitions
this can lead to a skewed distribution between nodes when many records share a few keys
Spark supports hash partitioning and range partitioning
All data in the same partition is guaranteed to be on the same node
Partitions are configurable provided the RDD is key-value based.
Properties of partitions
Tuples in the same partition are guaranteed to be on the same machine.
Each node in a cluster can contain more than one partition.
The total number of partitions is configurable; by default it is set to the total number of cores on all the executor nodes.
Types of Partitioning in Spark
Spark supports two types of partitioning,
- Hash Partitioning: uses Java's Object.hashCode method to determine the partition as partition = key.hashCode() % numPartitions (Spark's HashPartitioner shifts negative results so the partition index is non-negative).
- Range Partitioning: distributes keys to partitions by range, so that keys falling within the same range end up in the same partition.
This method is suitable when the keys have a natural ordering.
A pair RDD may contain keys with a defined ordering (e.g., Int or String);
for such RDDs, range partitioning can be more effective.
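The hash-partitioning formula can be sketched directly. This is a plain-Scala model (hypothetical names, no Spark) of the hashCode-modulo rule. It includes the non-negative adjustment, needed because hashCode can be negative, and a histogram that shows how keys sharing a residue pile into one partition (skew).

```scala
// Plain-Scala model (hypothetical, no Spark) of hash partitioning.
object HashPartitionModel {
  // hashCode modulo numPartitions, shifted to be non-negative.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of keys landing in each partition; uneven counts mean skew.
  def histogram(keys: Seq[Any], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toVector
  }
}
```

For Int keys hashCode is the value itself, so keys 0, 4, 8, 12 all land in partition 0 of 4, which is exactly the skew described above.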
Range partitioner:
- provide the number of partitions
- provide a pair RDD with ordered keys
- this RDD is sampled to create a suitable set of sorted ranges
import org.apache.spark.RangePartitioner
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs) // Spark samples pairs here to find good ranges
val partitioned = pairs.partitionBy(tunedPartitioner).persist() // persist so the partitioned data is not reshuffled on every use
val countAndPrice = partitioned.mapValues(price => (1, price)) // mapValues keeps the partitioner; map would lose it
val countAndTotalPerCustomer = countAndPrice.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // (number of purchases, total spent) per customer
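The sampling idea behind RangePartitioner can be illustrated with a simplified model. Pick numPartitions - 1 upper bounds from a sorted sample of the keys, then send each key to the first partition whose bound is at least the key. This is a hypothetical plain-Scala sketch. Spark's own RangePartitioner uses reservoir sampling and a binary search, so the bounds it picks will differ.

```scala
// Simplified plain-Scala model (hypothetical, no Spark) of range partitioning.
object RangePartitionModel {
  // Choose numPartitions - 1 upper bounds from a sorted sample of the keys.
  def bounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    (1 until numPartitions).map(i => sorted((i * sorted.size) / numPartitions)).toVector
  }

  // A key goes to the first partition whose upper bound is >= key; otherwise the last one.
  def partitionOf(key: Int, bounds: Vector[Int]): Int = {
    val idx = bounds.indexWhere(key <= _)
    if (idx == -1) bounds.size else idx
  }
}
```

With keys 1 to 100 and 4 partitions the bounds come out as 26, 51, 76. Every key in partition i is smaller than every key in partition i + 1, which is what makes range partitioning a good fit for ordered keys.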
Operations on pair RDDs that hold and propagate a partitioner:
cogroup
groupWith
join
leftOuterJoin
rightOuterJoin
groupByKey - uses a hash partitioner with default parameters
reduceByKey
foldByKey
combineByKey
partitionBy
sortByKey
mapValues (if the parent has a partitioner)
flatMapValues (as above)
filter (as above)
But the following transformations lose the partitioner, because map and flatMap can change the key:
map, e.g. rdd.map { case (k, v) => ("doh", v) }
flatMap
userData: (UserId, UserInfo) // big table
events: (UserId, LinkInfo)
val sc = new SparkContext()
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://..").persist()
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD[(UserId, (UserInfo, LinkInfo))]
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // expand the tuple
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
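The code above is slow because of shuffling: join does not know how the keys of either RDD are partitioned, so both RDDs are hashed and shuffled across the network on every call. The fix is to partition userData once:
import org.apache.spark.HashPartitioner
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://..")
  .partitionBy(new HashPartitioner(100))
  .persist()
Then Spark will shuffle only the events RDD (which is smaller than the userData RDD)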
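Why pre-partitioning helps can be shown by counting which records must move. In this plain-Scala model (hypothetical names, no Spark), each record is tagged with the partition it currently sits in. A record moves only if that differs from the partition the join's hash partitioner assigns to its key. A userData RDD that already went through partitionBy contributes nothing to the shuffle.

```scala
// Plain-Scala model (hypothetical, no Spark): records that must move for a hash-partitioned join.
object CoPartitionModel {
  def partitionOf(key: Int, n: Int): Int = Math.floorMod(key.hashCode, n)

  // Each record is (currentPartition, key); it moves if it is not already where its key belongs.
  def recordsMoved(records: Seq[(Int, Int)], n: Int): Int =
    records.count { case (current, key) => current != partitionOf(key, n) }
}
```

For example, 100 userData keys placed by partitionOf need 0 moves. Twenty event keys 0, 3, ..., 57 that all arrived in partition 0 need 15 moves, since only 5 of them already belong there.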
The following requires all key-value pairs with the same key to be on the same machine.
Grouping is done using a hash partitioner with default parameters:
val purchasesPerCustomer =
  purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
    .groupByKey()
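How do you know when a shuffle may happen?
Use toDebugString to see the RDD lineage (the execution plan); a ShuffledRDD in the output indicates a shuffle:
partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // assumes values are (count, price) pairs
  .toDebugString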
Operations that might cause a shuffle:
cogroup
groupWith
join
leftOuterJoin
rightOuterJoin
groupByKey - uses a hash partitioner with default parameters
reduceByKey
combineByKey
distinct
intersection
repartition
coalesce
But if the RDD is pre-partitioned, running reduceByKey on it does the reducing work locally;
only the final result is sent from the worker to the driver.
A join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine
also works locally, with no shuffling.
myRDD.toDebugString
repartition, join, cogroup, and any of the *By or *ByKey transformations can result in a shuffle
if you pass a numPartitions parameter, it will probably shuffle
combineByKey and groupByKey cause a shuffle
Skewed data solutions:
- Salting (modify the key Foo to Foo + random.nextInt(saltNumber))
- Isolated Salting
- Isolated Map Joins
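Salting splits one hot key into several keys that hash to different partitions. A second aggregation after removing the salt restores the per-key result. The plain-Scala sketch below (hypothetical names, no Spark) runs a two-stage salted sum. The "key#n" salt format is an assumption chosen for illustration and requires that original keys contain no '#'.

```scala
import scala.util.Random

// Plain-Scala model (hypothetical, no Spark) of salting a skewed key.
object SaltingModel {
  // Append a random suffix so one hot key becomes up to saltNumber distinct keys.
  def salt(key: String, saltNumber: Int, rnd: Random): String =
    s"$key#${rnd.nextInt(saltNumber)}"

  // Strip the salt (assumes original keys contain no '#').
  def unsalt(saltedKey: String): String = saltedKey.takeWhile(_ != '#')

  // Stage 1: partial sums per salted key (these spread across partitions).
  // Stage 2: final sums per original key over the much smaller partial results.
  def saltedSum(pairs: Seq[(String, Int)], saltNumber: Int, seed: Long): Map[String, Int] = {
    val rnd = new Random(seed)
    val partial = pairs
      .map { case (k, v) => (salt(k, saltNumber, rnd), v) }
      .groupBy(_._1)
      .map { case (sk, kvs) => (sk, kvs.map(_._2).sum) }
    partial.toSeq
      .groupBy { case (sk, _) => unsalt(sk) }
      .map { case (k, kvs) => (k, kvs.map(_._2).sum) }
  }
}
```

The two stages only give the right answer when the aggregation is associative and commutative, as a sum is.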
RDDs are lazy: Spark only builds the DAG until an action such as collect() is called
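Scala views make a convenient local analogy for this laziness. Transformations on a view only record the pipeline, and nothing runs until a strict operation forces it, much as an action forces an RDD's DAG. The sketch below is a hypothetical plain-Scala model that counts how many elements have been evaluated before and after the "action".

```scala
// Plain-Scala analogy (hypothetical, no Spark): view transformations are lazy like RDD transformations.
object LazinessModel {
  def run(): (Int, Int, List[Int]) = {
    var evaluated = 0
    val pipeline = (1 to 5).view.map { x => evaluated += 1; x * 2 } // only recorded, not run
    val before = evaluated          // nothing evaluated yet: only the "DAG" exists
    val result = pipeline.toList    // the "action" forces evaluation
    (before, evaluated, result)
  }
}
```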
.cache()
.coalesce() - reduce the number of partitions (e.g., to get rid of empty partitions) without a full shuffle
Prefer reduceByKey over groupByKey
Prefer treeReduce over reduce
Use complex/nested types
Off-heap memory
RDD types
HadoopRDD: 1 partition per HDFS block; dependencies: none; compute(partition) = read the corresponding block
converted into MappedRDD
FilteredRDD: partitions: same as the parent RDD; dependencies: one-to-one on the parent; compute(partition) = compute the parent partition and filter it
JoinedRDD: 1 partition per reduce task; dependencies: shuffle on each parent; compute(partition) = read and join the shuffled data;
partitioner = HashPartitioner(numTasks)
mappedRDD
PairRDD
ShuffleRDD
UnionRDD
PythonRDD
DoubleRDD
JdbcRDD
JsonRDD
SchemaRDD
VertexRDD
EdgeRDD
--------
CassandraRDD:
val cassandraRDD = sc.cassandraTable("mykeyspace", "mytable") // keyspace, table; supports predicate pushdown
GeoRDD
ExSpark
--------
RDD.cache() is the same as RDD.persist(MEMORY_ONLY)
RDD.persist(MEMORY_AND_DISK)
RDD.persist(DISK_ONLY)
RDD.persist(MEMORY_AND_DISK_SER)
Kryo serialization
spark.storage.memoryFraction: controls how executor memory is split between cached RDDs and the rest (shuffle memory, user programs)
Hadoop consists of 3 core Apache projects: HDFS, MapReduce, YARN
JobTracker, NameNode
Spark is a competitor to MapReduce
conf.registerKryoClasses(...) to use Kryo instead of Java serialization for speed
High churn/Low churn
Use an array of ints instead of a linked list (fewer objects for the garbage collector)
-XX:+UseConcMarkSweepGC
How to run Spark in standalone client mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10
How to run Spark in standalone cluster mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10
How to run Spark in YARN client mode
YARN = Yet Another Resource Negotiator
The ResourceManager manages many NodeManagers
The ResourceManager has a Scheduler
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10
How to run Spark in YARN cluster mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10
Spark + Oozie
https://habrahabr.ru/post/338952/