Spark

Books: https://www.amazon.com/Spark-Action-Covers-Apache-Examples-ebook/dp/B09781T1XK https://www.amazon.com/Learning-Spark-Lightning-Fast-Data-Analytics-ebook/dp/B08F9WVFCT https://www.amazon.com/Spark-Definitive-Guide-Processing-Simple-ebook/dp/B079P71JHY
https://medium.datadriveninvestor.com/introduction-to-spark-with-python-spark-architecture-and-components-explained-in-detail-54e2ba09d6fe
Accessing Hive metastore from Spark: https://www.youtube.com/watch?v=6-LtMmNAvIE
Reading HDFS file names: https://www.reddit.com/r/apachespark/comments/mcbq3c/reading_just_filenames_from_hdfs/
Code: https://runawayhorse001.github.io/LearningApacheSpark/
JSON in Spark: https://www.reddit.com/r/apachespark/comments/naqeeo/why_isnt_get_json_object_function_consistently/ https://www.reddit.com/r/apachespark/comments/nxxtkc/parsing_nested_json_arrays_in_sql_analytics/ https://habr.com/ru/company/leroy_merlin/blog/563066/ https://analyticshut.com/reading-json-data-in-spark/ https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes https://subscription.packtpub.com/book/data/9781785880070/4/ch04lvl1sec29/loading-json-into-spark https://habr.com/ru/company/otus/blog/557812/
Spark streaming: https://habr.com/ru/company/otus/blog/556734/
https://mungingdata.com/apache-spark/python-pyspark-scala-which-better/
Spark 3.0: https://habr.com/ru/company/newprolab/blog/530568/
Playground: https://community.cloud.databricks.com/login.html https://docs.databricks.com/data/databricks-datasets.html
Book: https://leanpub.com/beautiful-spark/
https://habr.com/ru/company/otus/blog/529684/
Fast Spark: https://habr.com/ru/company/sberbank/blog/496310/
https://mungingdata.com/apache-spark/using-the-console/ https://towardsdatascience.com/write-clean-and-solid-scala-spark-jobs-28ac4395424a https://mungingdata.com/category/apache-spark/ https://luminousmen.com/post/spark-tips-dont-collect-data-on-driver https://medium.com/@itzikjan/spark-join-optimization-on-skew-data-using-bin-packing-afae73f68662 https://habr.com/ru/company/alfastrah/blog/481924/ https://github.com/ericxiao251/spark-syntax https://www.reddit.com/r/dataengineering/comments/dixkr8/free_sandbox_for_playing_with_spark/ https://sparkbyexamples.com/ https://dzone.com/articles/pyspark-join-explained-with-examples https://sujithjay.com/spark/shuffle-hash-sort-merge-joins https://sujithjay.com/spark/or-within-joins https://medium.com/@F4Qtech/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44 https://towardsdatascience.com/my-10-recommendations-after-getting-the-databricks-certification-for-apache-spark-53cd3690073 https://medium.com/towards-artificial-intelligence/4-tips-to-write-scalable-apache-spark-code-1c736e4d698e https://www.waitingforcode.com/apache-spark/memory-apache-spark-classes/read https://www.qubole.com/blog/enhance-spark-performance-with-dynamic-filtering/ https://dzone.com/search?page=1 https://youtu.be/daXEp4HmS-E https://www.youtube.com/watch?v=CF5Ewk0GxiQ https://www.youtube.com/watch?v=5wn5QRGnBh4
Sqoop, Spark SQL, MySQL example: https://www.youtube.com/watch?v=0Xg9wsOCz74
Spark streaming: https://databricks.com/sparkaisummit/europe/spark-summit-2019-keynotes https://medium.com/thron-tech/optimising-spark-rdd-pipelines-679b41362a8a https://open.compscicenter.ru/archive/apache-spark/ https://legacy.gitbook.com/@jaceklaskowski https://jaceklaskowski.gitbooks.io/mastering-spark-sql https://blog.usejournal.com/spark-study-notes-core-concepts-visualized-5256c44e4090 https://www.enigma.com/blog/things-i-wish-id-known-about-spark https://habr.com/post/329838/ https://www.youtube.com/watch?v=4Lngj22dFiA
DataFrames: https://medium.com/@achilleus/a-practical-introduction-to-sparks-column-part-2-1e52f1d29eb1 https://www.mungingdata.com/apache-spark/aggregations
Aggregation Feature Engineering for Time Series Analysis in Spark: https://www.youtube.com/watch?v=JWjRB8b6rc0
https://github.com/P7h/docker-spark https://hub.docker.com/ https://habr.com/company/lanit/blog/413137/ https://towardsdatascience.com/sql-at-scale-with-apache-spark-sql-and-dataframes-concepts-architecture-and-examples-c567853a702f https://www.mungingdata.com/ https://www.mungingdata.com/aws/athena-spark-best-friends
From CSV to Parquet:
val df = spark.read.csv("/mnt/my-bucket/csv-lake/")
df.write.parquet("/mnt/my-bucket/parquet-lake/")

Links

https://www.mungingdata.com/episodes/4-building-spark-jar-files-with-sbt https://github.com/databricks/Spark-The-Definitive-Guide https://databricks.com/try-databricks https://spark.apache.org/docs/latest/tuning.html https://legacy.gitbook.com/book/umbertogriffo/apache-spark-best-practices-and-tuning/details https://legacy.gitbook.com/book/databricks/databricks-spark-knowledge-base/details https://spoddutur.github.io/spark-notes https://github.com/jaceklaskowski https://legacy.gitbook.com/@jaceklaskowski https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications http://c2fo.io/c2fo/spark/aws/emr/2016/09/01/apache-spark-config-cheatsheet-part2/ http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/

Join

https://medium.com/@akhilanand.bv/https-medium-com-joins-in-apache-spark-part-2-5b038bc7455b . https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-3-1d40c1e51e1c http://kirillpavlov.com/blog/2016/04/23/beyond-traditional-join-with-apache-spark/ https://dzone.com/articles/pyspark-join-explained-with-examples https://sujithjay.com/spark/shuffle-hash-sort-merge-joins https://sujithjay.com/spark/or-within-joins

Broadcast joins

https://www.mungingdata.com/apache-spark/broadcast-joins https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/ https://sujithjay.com/spark-sql/2018/02/17/Broadcast-Hash-Joins-in-Apache-Spark/ https://mungingdata.com/apache-spark/broadcasting-maps-in-spark/
Use Avro for everything that makes sense to store in a row-major manner and Parquet for everything that needs to be stored in a column-major manner. Parquet stores data in binary files with column-oriented storage and also tracks statistics about each file that make it possible to quickly skip data not needed for a query.
https://mungingdata.com/apache-spark/partitionby/ https://mungingdata.com/apache-spark/partition-filters-pushed-filters/
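A minimal sketch of the broadcast-join hint with the DataFrame API; ordersDF (large) and countriesDF (small lookup) are hypothetical DataFrames sharing a country_code column:

import org.apache.spark.sql.functions.broadcast

// hint Spark to ship the small side to every executor instead of shuffling both sides
val joined = ordersDF.join(broadcast(countriesDF), Seq("country_code"), "inner")
joined.explain()   // the physical plan should show BroadcastHashJoin instead of SortMergeJoin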

Stage Job Task

https://stackoverflow.com/questions/37528047/how-are-stages-split-into-tasks-in-spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-DAGScheduler-Stage.html
The DAGScheduler splits a job into a collection of stages. Each stage contains a sequence of narrow transformations that can be completed without shuffling the entire data set.
An executor JVM runs several tasks (one task per RDD partition). The number of concurrent task slots is numExecutors * executorCores.
The number of tasks is determined by the number of partitions. Every RDD has a defined number of partitions. For a source RDD read from HDFS (e.g. with sc.textFile(...)), the number of partitions is the number of splits generated by the input format. Some operations on RDDs result in an RDD with a different number of partitions.
Stage: a collection of tasks - the same processing running against different subsets of the data (partitions).
Task: a unit of work on one partition of a distributed dataset. So in each stage, number-of-tasks = number-of-partitions ("one task per partition per stage").
Each executor runs in one YARN container, and each container resides on one node. Each stage uses multiple executors, and each executor is allocated multiple vcores. Each vcore executes exactly one task at a time, so at any stage multiple tasks run in parallel: number-of-tasks running = number-of-vcores in use.
HDFS achieves full write throughput with ~5 tasks per executor, so it is good to keep the number of cores per executor at or below that number.
Related settings: spark.cores.max, spark.executor.memory
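A small sketch of how narrow transformations stay in one stage while a shuffle starts a new one (the HDFS path is hypothetical):

val counts = sc.textFile("hdfs:///data/input")   // #partitions = #input splits
  .flatMap(_.split(" "))                         // narrow: stays in the same stage
  .map(word => (word, 1))                        // narrow
  .reduceByKey(_ + _)                            // wide: shuffle boundary => a new stage
println(counts.getNumPartitions)                 // tasks per stage = number of partitions
println(counts.toDebugString)                    // the ShuffledRDD line marks the stage boundary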

Shell

./sbin/start-master.sh
./bin/pyspark
./bin/spark-sql
./bin/sparkR
./bin/spark-shell - Scala console (:help, :type spark, :type sc, spark.version, :imports)
./bin/spark-submit - run a precompiled application
spark - the variable that holds the SparkSession
https://mungingdata.com/apache-spark/sparksession/

Integration with Hive

hive> set hive.execution.engine=spark;
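The setting above runs Hive on the Spark engine. Going the other way (Spark reading Hive tables through the metastore, as in the video linked at the top of these notes), a minimal sketch looks like this; the database and table names are hypothetical and hive-site.xml must be visible to Spark:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive from Spark")
  .enableHiveSupport()   // requires hive-site.xml on the classpath or in $SPARK_HOME/conf
  .getOrCreate()

spark.sql("SELECT count(*) FROM mydb.some_hive_table").show()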

DataFrames, DataSets, RDD

DataFrames have types, but Spark maintains them completely and only checks whether those types line up with the schema at runtime. Datasets, on the other hand, check whether types conform to the specification at compile time. Datasets are only available to JVM-based languages (Scala and Java), and we specify their types with case classes or Java beans. DataFrames are simply Datasets of type Row.

df.printSchema()
df.col("somename")
df.first()
df.filter("somename < 2").where(col("anothername") =!= "Russia").show(2)

auctionDF.select("auctionid").distinct.count

auctionDF.groupBy("item", "auctionid").count
  .agg(min("count"), avg("count"), max("count"))
  .show

Show various implementations of the following query in Spark (the table name is employee, the DataFrame name is dF):
select name, AVG(salary) from employee where country = 'USA' group by name, salary;

Method 1: DataFrame API in Scala:
dF.filter("country = 'USA'")   // filter
  .groupBy("name", "salary")   // group
  .agg(avg("salary"))          // aggregate
  .show()                      // display the first 20 rows of the result

Method 2: via sqlContext (DataFrame/Dataset implementation):
dF.createOrReplaceTempView("employee")   // register the table
val empQuery = "select name, AVG(salary) from employee where country = 'USA' group by name, salary"
sqlContext.sql(empQuery).show()          // execute the query and display the first 20 rows

Method 3: via hiveContext (if the table exists in Hive):
dF.createOrReplaceTempView("employee")
hiveContext.sql(empQuery).show()

Create a DataFrame:
val myRange = spark.range(1000).toDF("number")

With DataFrames you do not manipulate partitions manually (with RDDs you do; the RDD is the low-level API).
Driver: executes main(), accepts user params, and schedules work across the executors.
A DataFrame in Spark is similar to a SQL table, an R dataframe, or a pandas dataframe. In Spark, a DataFrame is actually a wrapper around RDDs, the basic data structure in Spark.
Dataset: static typing -> compile-time error detection. DataFrames are untyped: the Scala compiler does not check the types in their schema. DataFrame = Dataset[Row].
DataFrames: unlike RDDs they are stored in a column-based fashion in memory, which allows for various optimizations (vectorization, columnar compression, off-heap storage, etc.). Their schema is fairly robust, allowing for arbitrarily nested data structures (i.e. a column can be a list of structs which are composed of other structs). Because Spark is aware of the structure of the underlying data, non-JVM languages (Python, R) can leverage functions written in the faster native JVM implementation. One of the issues with DataFrames, however, is that they are only runtime and not compile-time type safe, which for a language like Scala is a severe drawback.

Join DataFrames:
val dfA: DataFrame = ...
val dfB: DataFrame = ...
dfA.join(dfB, dfB("date") === dfA("date"))

DataSets

https://blog.codecentric.de/en/2016/07/spark-2-0-datasets-case-classes/ https://www.balabit.com/blog/spark-scala-dataset-tutorial
Since Spark as a compiler understands your Dataset type (a JVM object), it maps your type-specific JVM object to Tungsten's internal memory representation using Encoders. As a result, Tungsten Encoders can efficiently serialize/deserialize JVM objects as well as generate compact bytecode that executes at superior speed.

val lines = sqlContext.read.text("/wikipedia").as[String]
val words = lines
  .flatMap(_.split(" "))
  .filter(_ != "")

case class University(name: String, numStudents: Long, yearFounded: Long)
val schools = sqlContext.read.json("/schools.json").as[University]
schools.map(s => s"${s.name} is ${2015 - s.yearFounded} years old")

val dsAvgTmp = ds
  .filter(d => d.temp > 25)
  .map(d => (d.temp, d.humidity, d.cca3))
  .groupBy($"_3")
  .avg()

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, define a case class that matches your schema and call DataFrame.as[T] where T is your case class:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then, if you had another case class / DF, say:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Then, while the syntax of join and joinWith is similar, the results are different:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens the columns into a single namespace (which causes problems in the case above because the column name "key" is repeated). Curiously, you have to use df.col("key") and df2.col("key") to build the join condition for ds and ds2: a bare col("key") on either side does not work, and ds.col(...) doesn't exist; reusing the original df.col("key") does the trick.

Accumulators and broadcast variables (a-la distributed Cache)

Spark supports two types of shared variables:
1) broadcast variables - can be used to cache a value in memory on all nodes;
2) accumulators - variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value
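The snippet above uses the pre-2.0 sc.accumulator API. A minimal sketch with the Spark 2.x replacements (longAccumulator plus a broadcast lookup table); the variable names and data are made up:

val errorCount = sc.longAccumulator("errorCount")     // named accumulator, visible in the UI
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))    // read-only lookup cached on every executor

sc.parallelize(Seq("a", "x", "b")).foreach { key =>
  if (!lookup.value.contains(key)) errorCount.add(1L) // executors only add; the driver reads .value
}
println(errorCount.value)   // 1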

Read

val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")
val extractDF = dataLakeDF
  .where(col("mood") === "happy")
  .repartition(10000)

spark.read.format("csv")
  .option("mode", "FAILFAST")        // dropMalformed | permissive
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)                // to strictly specify a schema
  .load()

spark.read.format("parquet")
  .load("/data/flight-data/parquet/2010-summary.parquet")
  .show(5)

Write

DataFrameWriter
  .format(...)
  .option(...)
  .partitionBy(...)
  .bucketBy(...)
  .sortBy(...)
  .save()

dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()

Query

dbDataFrame.select(" DEST_COUNTRY_NAME"). distinct(). show(5) dbDataFrame.select(" DEST_COUNTRY_NAME"). distinct(). explain Read the plan from bottom to top; bottom is the source of data = = Physical Plan = = *HashAggregate( keys =[ DEST_COUNTRY_NAME# 8108], functions =[]) +- Exchange hashpartitioning( DEST_COUNTRY_NAME# 8108, 200) +- *HashAggregate( keys =[ DEST_COUNTRY_NAME# 8108], functions =[]) +- *Scan JDBCRelation( flight_info) [numPartitions = 1] ... Spark can actually do better than this on certain queries. For example, if we specify a filter on our DataFrame, Spark will push that filter down into the database. We can see this in the explain plan under PushedFilters. dbDataFrame.filter(" DEST_COUNTRY_NAME in (' Anguilla', 'Sweden')"). explain = = Physical Plan = = *Scan JDBCRel... PushedFilters: [* In( DEST_COUNTRY_NAME, [Anguilla, Sweden])], val pushdownQuery = """( SELECT DISTINCT( DEST_COUNTRY_NAME) FROM flight_info) AS flight_info""" val dbDataFrame = spark.read.format(" jdbc") .option(" url", url) .option(" dbtable", pushdownQuery) .option(" driver", driver) .load() val joinExpr = person.col(" graduate_program") = = = graduateProgram.col(" id") person .join( graduateProgram, joinExpr) . explain() = = Physical Plan = = *BroadcastHashJoin [graduate_program# 40], [id# 5.... :- LocalTableScan [id# 38, name# 39, graduate_progr... +- BroadcastExchange HashedRelationBroadcastMode(.... +- LocalTableScan [id# 56, degree# 57, departmen.... With the DataFrame API, we can also explicitly give the optimizer a hint that we would like to use a broadcast join by using the correct function around the small DataFrame in question. In this example, these result in the same plan we just saw; however, this is not always the case: import org.apache.spark.sql.functions.broadcast val joinExpr = person.col(" graduate_program") = = = graduateProgram.col(" id") person.join( broadcast( graduateProgram), joinExpr). explain()

Spark SQL

With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries and writing DataFrame code; they both "compile" to the same underlying plan.

Complex Spark SQL data types (Scala -> SQL):
Array[T]   -> ArrayType(elementType, containsNull)
Map[K,V]   -> MapType(keyType, valueType, valueContainsNull)
case class -> StructType(List[StructField])

You can make any DataFrame into a table or view with one simple method call:
flightData2015.createOrReplaceTempView("flight_data_2015")

Now we can query our data in SQL. To do so, we use the spark.sql function (remember, spark is our SparkSession variable), which conveniently returns a new DataFrame. Although this might seem a bit circular - a SQL query against a DataFrame returns another DataFrame - it's actually quite powerful. It makes it possible to specify transformations in whichever manner is most convenient at any given point without sacrificing efficiency. To see that this is happening, compare the two explain plans:

val sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
""")
val dataFrameWay = flightData2015
  .groupBy('DEST_COUNTRY_NAME)
  .count()
sqlWay.explain
dataFrameWay.explain

The SQL interface also includes the ability to provide join hints. These are not enforced, however, so the optimizer might choose to ignore them. You can set one of these hints by using a special comment syntax; MAPJOIN, BROADCAST, and BROADCASTJOIN all do the same thing and are all supported:

-- in SQL
SELECT /*+ MAPJOIN(graduateProgram) */ * FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id

This doesn't come for free: if you try to broadcast something too large, you can crash your driver node (because that collect is expensive).
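As a sketch of the Scala -> SQL type mapping above, here is an explicit schema built from ArrayType, MapType, and a nested StructType (the field names and file path are made up):

import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("name", StringType, nullable = false),
  StructField("tags", ArrayType(StringType, containsNull = true)),
  StructField("scores", MapType(StringType, IntegerType, valueContainsNull = true)),
  StructField("address", StructType(List(
    StructField("city", StringType),
    StructField("zip", StringType)
  )))
))

val df = spark.read.schema(schema).json("path/to/people.json")
df.printSchema()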

Partitions

http://www.bigsynapse.com/spark-input-output http://www.bigsynapse.com/controlling-the-number-of-partitions-in-spark https://acadgild.com/blog/partitioning-in-spark/ https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
Spark partition types:
- Hash partitioning: partition = key.hashCode() % numPartitions
- Range partitioning: some RDDs have keys that follow a particular ordering; for such RDDs, range partitioning is an efficient technique.
- Custom partitioner: https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
spark.default.parallelism - sets the number of partitions used by the HashPartitioner (can be overridden when creating the SparkContext object).
spark.sql.shuffle.partitions - controls the number of partitions for shuffle operations on DataFrames (default is 200).
In HDFS, one partition is created for each block of the file (64 MB blocks).
Too few partitions: you will not utilize all of the cores available in the cluster. Too many partitions: excessive overhead in managing many small tasks. A good rule of thumb is to make the number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel.
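A small sketch of the two settings above plus explicit repartition/coalesce (assuming a SparkSession named spark):

spark.conf.set("spark.sql.shuffle.partitions", "400")   // partitions produced by DataFrame shuffles
val df = spark.range(1000000L)
println(df.rdd.getNumPartitions)
val wide   = df.repartition(200)   // full shuffle into 200 partitions
val narrow = wide.coalesce(50)     // merges partitions without a full shuffle
println(narrow.rdd.getNumPartitions)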

Performance and Monitoring

http://spark.apache.org/docs/latest/tuning.html
Try the G1GC garbage collector with -XX:+UseG1GC. It can improve performance in situations where garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to increase the G1 region size with -XX:G1HeapRegionSize.

Cluster config: 10 nodes, 16 cores per node, 64 GB RAM per node.
Full memory requested from YARN per executor = spark.executor.memory + spark.yarn.executor.memoryOverhead, where spark.yarn.executor.memoryOverhead = max(384 MB, 7% of spark.executor.memory).
Assign 5 cores per executor => --executor-cores = 5 (for good HDFS throughput).
Leave 1 core per node for Hadoop/YARN daemons => cores available per node = 16 - 1 = 15.
Total available cores in the cluster = 15 x 10 = 150.
Number of available executors = total cores / cores per executor = 150 / 5 = 30.
Leaving 1 executor for the ApplicationMaster => --num-executors = 29.
Number of executors per node = 30 / 10 = 3.
Memory per executor = 64 GB / 3 = 21 GB.
Subtracting the off-heap overhead of 7% of 21 GB = ~3 GB gives an actual --executor-memory = 21 - 3 = 18 GB.
(If we requested 20 GB per executor instead, YARN would actually allocate 20 GB + memoryOverhead = 20 + 7% of 20 GB = ~23 GB for us.)

To change Spark's log level, simply run:
spark.sparkContext.setLogLevel("INFO")

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
Avoid groupByKey. Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey:

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

While both of these produce the correct answer, the reduceByKey version works much better on a large dataset because Spark knows it can combine output with a common key on each partition before shuffling the data. With reduceByKey, pairs on the same machine with the same key are combined (using the lambda function passed to reduceByKey) before the data is shuffled; the lambda function is then called again to reduce the values from each partition into one final result per key.

With a broadcast join, one side of the join is materialized and sent to all mappers. It is therefore a map-side join, which can bring a significant performance improvement by omitting the sort-and-shuffle phase of a reduce step. Since the data set is materialized and sent over the network, it only brings a significant improvement if it is considerably small; it also needs to fit completely into the memory of each executor, and into the memory of the driver.

DataFrame API:
val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF
// materialize the department data on every executor
val tmpDepartments = broadcast(departmentsDF.as("departments"))
employeesDF.join(tmpDepartments, $"depId" === $"id", "inner")   // join by employees.depId == departments.id
  .show()

Spark includes a cost-based query optimizer that plans queries based on the properties of the input data when using the structured APIs.
To allow the cost-based optimizer to make these sorts of decisions, you need to collect (and maintain) statistics about your tables that it can use. There are two kinds of statistics: table-level and column-level. Statistics collection is available only on named tables, not on arbitrary DataFrames or RDDs.
The first step in garbage collection tuning is to gather statistics on how frequently garbage collection occurs and the amount of time it takes. You can do this by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to Spark's JVM options via the spark.executor.extraJavaOptions configuration parameter. The next time you run your Spark job, you will see messages printed in the workers' logs each time a garbage collection occurs. These logs will be on your cluster's worker nodes (in the stdout files in their work directories), not on your driver program.
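Picking up the statistics point above: a minimal sketch of collecting table-level and column-level statistics with ANALYZE TABLE, assuming a catalog table named flight_info like the one queried earlier:

spark.sql("ANALYZE TABLE flight_info COMPUTE STATISTICS")                                // table-level stats (row count, size)
spark.sql("ANALYZE TABLE flight_info COMPUTE STATISTICS FOR COLUMNS DEST_COUNTRY_NAME")  // column-level stats (Spark 2.2+)
spark.sql("DESCRIBE EXTENDED flight_info").show(100, truncate = false)                   // the Statistics row shows what was gathered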

Links

https://medium.com/@mrpowers/how-to-write-spark-etl-processes-df01b0c1bec9 https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4 https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c http://spark.apache.org/examples.html https://mindfulmachines.io/blog/2018/4/3/spark-rdds-and-dataframes https://github.com/mahmoudparsian/pyspark-tutorial https://medium.com/@schaun.wheeler/pyspark-udfs-and-star-expansion-b50f501dcb7b http://www.janvsmachine.net/2017/09/sessionization-with-spark.html https://spark.apache.org/docs/2.2.0/ https://spark.apache.org/docs/0.7.2/api/pyspark/pyspark.rdd.RDD-class.html https://blog.knoldus.com/2017/04/16/the-dominant-apis-of-spark-datasets-dataframes-and-rdds/ http://parrotprediction.com/partitioning-in-apache-spark/ https://blog.deepsense.ai/optimize-spark-with-distribute-by-and-cluster-by/ https://opencredo.com/spark-testing/ https://www.youtube.com/watch?v=ISO8V1cYmfY https://www.slideshare.net/hadooparchbook/top-5-mistakes-when-writing-spark-applications- https://blog.deepsense.ai/optimize-spark-with-distribute-by-and-cluster-by/ https://blog.deepsense.ai/improve-aggregate-performance-with-batching/66374492 http://technology.finra.org/code/using-spark-transformations-for-mpreduce-jobs.html https://blog.deepsense.ai/fast-and-accurate-categorical-distribution-without-reshuffling-in-apache-spark/

RDD activities

In Spark all work is expressed as either:
- creating new RDDs
- transforming existing RDDs
- calling operations on RDDs to compute a result

Transformations

There are two types of transformations:
- those that specify narrow dependencies (input partition : output partition = 1:1)
- those that specify wide dependencies (input partition : output partition = 1:M)
Transformations with narrow dependencies are those where each input partition contributes to only one output partition. Example: a where clause specifies a narrow dependency, where one partition contributes to at most one output partition. A wide-dependency transformation has input partitions contributing to many output partitions. We call this a shuffle, where Spark exchanges partitions across the cluster. Spark automatically performs pipelining on narrow dependencies: if we specify multiple filters on a DataFrame, they are all performed in memory. The same cannot be said for shuffles; when we perform a shuffle, Spark writes the results to disk.
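A tiny sketch of the pipelining-vs-shuffle point above (assuming a SparkSession named spark):

val df = spark.range(1000)
val narrowOnly = df.where("id > 10").where("id < 500")   // two filters get pipelined into one stage
val shuffled = narrowOnly.groupBy("id").count()          // wide dependency: Spark must exchange data
shuffled.explain()                                       // look for the Exchange node in the physical plan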

Actions

There are three kinds of actions:
- actions to view data in the console
- actions to collect data to native objects in the respective language
- actions to write to output data sources
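One hedged example of each kind, assuming some DataFrame df and a made-up output path:

df.show(5)                                       // view data in the console
val localRows = df.limit(100).collect()          // collect (a bounded subset) to the driver
df.write.mode("overwrite").parquet("/tmp/out")   // write to an output data source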

Structured Streaming

Structured Streaming became production-ready in Spark 2.2. https://jaceklaskowski.gitbooks.io/spark-structured-streaming/ https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html
Structured Streaming lets you take the same operations you perform in batch mode and run them in a streaming fashion. This can reduce latency and allow for incremental processing. The best thing about Structured Streaming is that it lets you get value out of streaming systems rapidly with simple switches, and it is easy to reason about: you can write your batch job as a prototype and then convert it to a streaming job. The way all of this works is by incrementally processing the data.
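A minimal Structured Streaming sketch along the lines of the classic socket word count; the host/port are placeholders (feed it with nc -lk 9999 for a quick test):

import org.apache.spark.sql.functions._

val lines = spark.readStream
  .format("socket")               // toy source, only for experiments
  .option("host", "localhost")
  .option("port", "9999")
  .load()

val wordCounts = lines
  .select(explode(split(col("value"), " ")).as("word"))
  .groupBy("word")
  .count()

val query = wordCounts.writeStream
  .outputMode("complete")         // emit the full updated result table on every trigger
  .format("console")
  .start()

query.awaitTermination()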

Spark Session

val spark = SparkSession
  .builder()
  .appName("My App")
  .getOrCreate()

val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
val sc = new SparkContext(conf)

Broadcast variables allow the programmer to keep a read-only variable or lookup table cached on each machine: SparkContext.broadcast(v)
val broadcastVar = sc.broadcast(Array(1, 2, 3))

def processData(t: RDD[(Int, Int)], u: RDD[(Int, String)]): Map[Int, Long] = {
  val jn = t.leftOuterJoin(u).values.distinct
  jn.countByKey
}

val scounter = myDoubleRDD.stats()   // StatCounter: count, mean, sum, max, min, variance, stdev
scounter.count

RDD - distributed and read-only

An RDD consists of:
- partitions - atomic pieces of the data set
- dependencies - relations between the RDD and the parent RDDs it was derived from
- a function for computing the dataset based on the parent RDDs
- metadata

You will likely only be creating two types of RDDs: the "generic" RDD type or a key-value RDD that provides additional functions, such as aggregating by key. Both just represent a collection of objects, but key-value RDDs have special operations as well as a concept of custom partitioning by key.

Let's formally define RDDs. Internally, each RDD is characterized by five main properties:
1. A list of partitions
2. A function for computing each split
3. A list of dependencies on other RDDs
4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
5. Optionally, a list of preferred locations on which to compute each split (e.g. block locations for an HDFS file)

The Partitioner is probably one of the core reasons why you might want to use RDDs in your code. Specifying your own custom Partitioner can give you significant performance and stability improvements if you use it correctly. You manipulate RDDs in much the same way that you manipulate DataFrames, the core difference being that you manipulate raw Java or Scala objects instead of Spark types.

Narrow dependencies: each partition of the parent RDD is used by at most one partition of the child RDD. Fast, no shuffle, and optimizations like pipelining are possible. Parent partition (1) -> child partition (<=1). Examples: map, filter, flatMap, mapPartitions, mapValues, mapPartitionsWithIndex, union, join with co-partitioned inputs.

Wide dependencies: slow, shuffle over the network. Parent partition (1) -> child partitions (many). Examples: (group/reduce/combine)ByKey, join, cogroup, groupWith (join depends on the partitioning scheme), (left/right)OuterJoin, distinct, intersection, repartition, coalesce.

The dependencies method on an RDD returns the sequence of Dependency objects. Narrow dependency classes: OneToOneDependency, PruneDependency, RangeDependency. Wide dependency class: ShuffleDependency.

val wordsRdd = sc.parallelize(largeList)
val pairs = wordsRdd.map(c => (c, 1))
  .groupByKey()
  .dependencies
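Since the custom Partitioner is called out above as a core reason to drop down to RDDs, here is a toy sketch; the first-letter routing rule is made up purely for illustration:

import org.apache.spark.Partitioner

// property #4 above: a Partitioner that routes keys to partitions by their first character
class FirstLetterPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    math.abs(key.toString.headOption.getOrElse(' ').toInt) % numPartitions
}

val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("avocado", 3)))
val partitioned = pairs.partitionBy(new FirstLetterPartitioner(4)).persist()
partitioned.glom().map(_.length).collect()   // how many records landed in each partition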

SparkWordCount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
    /* local = master URL; Word Count = application name; /usr/local/spark = Spark home; */
    /* Nil = jars; Map = environment; Map = variables to pass to worker nodes */

    /* create an input RDD reading a text file (in.txt) through the Spark context */
    val input = sc.textFile("in.txt")

    /* transform the input RDD into the count RDD */
    val count = input.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    /* saveAsTextFile is an action that materializes the RDD */
    count.saveAsTextFile("outfile")
    System.out.println("OK");
  }
}

scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
jar -cvf wordcount.jar SparkWordCount*.class
spark-submit --class SparkWordCount --master local wordcount.jar

Choosing the best execution plan

case class Demographic(
  id: Int,
  codingBootcamp: Boolean,
  country: String,
  gender: String,
  isMinority: Boolean,
  servedInMilitary: Boolean)
// Pair RDD (id, demographic)
val demographic = sc.textFile(..)

case class Finances(
  id: Int,
  hasDebt: Boolean,
  hasDependents: Boolean,
  hasLoans: Boolean,
  income: Int)
// Pair RDD (id, finances)
val finances = sc.textFile(..)

Plan #1 - join first; the join below yields a pair RDD of (id: Int, (Demographic, Finances)):
demographic.join(finances)
  .filter { p =>
    p._2._1.country == "Swiss" &&
    p._2._2.hasDependents &&
    p._2._2.hasDebt
  }.count
Steps: 1) inner join; 2) filter to select Swiss people; 3) filter to select debt and dependents.

Plan #2 (faster) - filter before the join:
val filteredFinances = finances.filter(p => p._2.hasDependents && p._2.hasDebt)
demographic.filter(p => p._2.country == "Swiss")
  .join(filteredFinances)
  .count
Steps: 1) filter to select debt and dependents; 2) filter to select Swiss people; 3) join.

Plan #3 (slowest) - cartesian product:
val cartesian = demographic.cartesian(finances)
cartesian.filter { case (p1, p2) => p1._1 == p2._1 }
  .filter { case (p1, p2) =>
    p1._2.country == "Swiss" &&
    p2._2.hasDependents &&
    p2._2.hasDebt
  }.count

Data Frames

DataFrames are untyped: the Scala compiler does not check the types in their schema. An RDD[T] carries a type parameter, but a DataFrame does not; it contains Rows which can hold any schema.
Catalyst - the query optimizer. Tungsten - the off-heap serializer that avoids the garbage collector.
Conceptually a DataFrame is an RDD full of records with a known schema (like a table in a SQL RDBMS). Transformations on DataFrames are known as untyped transformations.

A DataFrame can be created from an existing RDD:
val tupleRDD = ... // assume RDD[(Int, String, String, String)]
val tupleDF = tupleRDD.toDF("id", "name", "city", "country")

If you have an RDD containing case class instances, Spark can infer the attributes from the case class fields (reflection is used to automatically generate the field names):
case class Person(id: Int, name: String, city: String)
val peopleRDD = ... // assume RDD[Person]

How to create a DataFrame:
1) Create an RDD of Rows from the original RDD
2) Create the schema, represented by a StructType
3) Apply the schema to the RDD of Rows via the createDataFrame method
or read a data source (JSON/CSV/Parquet/JDBC) directly:
val df = spark.read.json("my.json")

peopleDF.createOrReplaceTempView("people")
val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")

DataFrame basics: show() prints the first 20 rows; printSchema() prints the schema.
df.filter($"age" > 18)
df.filter(df("age") > 18)
df.filter("age > 18")
val res = empDF.select("id", "lname")
  .where("city == 'Sidney'")
  .orderBy("id")
myDF.groupBy($"authorId", $"subForum")
  .agg(count($"authorId"))
  .orderBy($"subForum", count($"authorId").desc)

Pair RDD

def groupByKey(): RDD[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

val pairRDD = rdd.map(page => (page.title, page.text))

case class Event(organizer: String, name: String, budget: Int)
// key: organizer
val eventsRDD = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))

val groupedRdd = eventsRDD.groupByKey()
groupedRdd.collect().foreach(println)

groupByKey collects all of the values associated with a given key and stores them in a single collection. That means data has to move around the network; moving data over the network like this is called shuffling. Can we do it more efficiently? We can reduce before the shuffle, which potentially cuts down the amount of data actually sent over the network. For that we use reduceByKey; think of it as a combination of first doing groupByKey and then reducing over all the values grouped per key.

reduceByKey: conceptually a combination of groupByKey and reducing all values per key
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val budgetsRDD = eventsRDD.reduceByKey(_ + _)
budgetsRDD.collect().foreach(println)

def mapValues[U](f: V => U): RDD[(K, U)] // same as rdd.map { case (k, v) => (k, f(v)) }; applies f only to the values of a pair RDD
def countByKey(): Map[K, Long]           // counts the number of elements per key
def keys: RDD[K]                         // returns an RDD with the key of each tuple

Example: number of unique visitors
case class Visitor(ip: String, timestamp: String, duration: String)
val visits = visitors.map(v => (v.ip, v.duration))   // visitors: RDD[Visitor]
val numUniqueVisits = visits.keys.distinct().count()

In regular Scala collections there is groupBy:
def groupBy[K](f: A => K): Map[K, Traversable[A]]
Usage example:
val ages = List(34, 12, 14, 78)
val grouped = ages.groupBy { age =>
  if (age >= 65) "senior"
  else if (age >= 18) "adult"
  else "child"
}

Joins on pair RDDs

Inner/leftOuterJoin/rightOuterJoin
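A small sketch of the three joins on two made-up pair RDDs:

val users  = sc.parallelize(Seq((1, "anna"), (2, "bob")))
val orders = sc.parallelize(Seq((1, 9.99), (1, 4.50), (3, 7.00)))

users.join(orders).collect()            // inner: only key 1 survives -> (1,(anna,9.99)), (1,(anna,4.5))
users.leftOuterJoin(orders).collect()   // keeps (2,(bob,None))
users.rightOuterJoin(orders).collect()  // keeps (3,(None,7.0))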

How to configure RDD persistence

Options:
- in memory as Java objects
- on disk as Java objects
- in memory as serialized Java objects
- on disk as serialized Java objects
- both in memory and on disk
cache() / persist() storage levels: MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY
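A minimal sketch of picking one of the storage levels above (the input path is hypothetical):

import org.apache.spark.storage.StorageLevel

val cleaned = sc.textFile("hdfs:///data/big.txt").map(_.toLowerCase)
cleaned.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk
cleaned.count()      // the first action materializes and caches it
cleaned.unpersist()  // release when no longer needed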

Partitioning

https://www.edureka.co/blog/demystifying-partitioning-in-spark
partition# = key.hashCode() % number_of_partitions - this can lead to a skewed distribution between nodes.
Spark supports hash partitioning and range partitioning. All data in the same partition is guaranteed to be on the same node. Partitions are configurable provided the RDD is key-value based.

Properties of partitions:
- Tuples in the same partition are guaranteed to be on the same machine.
- Each node in a cluster can contain more than one partition.
- The total number of partitions is configurable; by default it is the total number of cores on all the executor nodes.

Types of partitioning in Spark:
- Hash partitioning: uses Java's Object.hashCode method to determine the partition, as partition = key.hashCode() % numPartitions.
- Range partitioning: uses a range to distribute to the respective partitions the keys that fall within that range. This method is suitable where there is a natural ordering in the keys.

A pair RDD may contain keys that have an ordering defined (e.g. Int or String); for such RDDs range partitioning is more effective. To build a range partitioner you provide the number of partitions and a pair RDD with ordered keys; this RDD is sampled to create a suitable set of sorted ranges:

val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)          // Spark samples `pairs` to find the best ranges
val partitioned = pairs.partitionBy(tunedPartitioner).persist() // persist to avoid repeated shuffling
val purchasesPerCustomer = partitioned.map(p => (p._1, (1, p._2)))
val purchasesPerMonth = purchasesPerCustomer.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))

Operations on pair RDDs that hold and propagate a partitioner: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey (uses a hash partitioner with default parameters), reduceByKey, foldByKey, combineByKey, partitionBy, sort, mapValues (if the parent has a partitioner), flatMapValues (as above), filter (as above).
The following transformations lose the partitioner, because map/flatMap may change the key:
map, e.g. rdd.map((k: String, v: Int) => ("doh", v)), and flatMap.

userData (UserId, UserInfo) // big table
events (UserId, LinkInfo)

val sc = new SparkContext()
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD[(UserId, (UserInfo, LinkInfo))]
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // expand the tuple
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

The code above is slow because of shuffling on every call; the fix is to partition userData once:
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...")
  .partitionBy(new HashPartitioner(100))
  .persist()
Then Spark will shuffle only the events RDD (which is smaller than the userData RDD).

The following requires all key-value pairs with the same key to be on the same machine; grouping is done using the hash partitioner with default parameters:
val purchasesPerCustomer = purchasesRdd.map(p => (p.customerId, p.price)) // pair RDD
  .groupByKey()

How do you know when a shuffle may happen? Use toDebugString to see the execution plan:
partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)).toDebugString

Methods that might cause a shuffle: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey (uses a hash partitioner with default parameters), reduceByKey, combineByKey, distinct, intersection, repartition, coalesce.
But with pre-partitioning, running reduceByKey on a pre-partitioned RDD does the reducing work locally; only the final result is sent from the workers to the driver. A join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machines also runs locally, with no shuffling.
myRDD.toDebugString
repartition, join, cogroup and any of the *By or *ByKey transformations can result in a shuffle; if you pass a numPartitions parameter it will probably shuffle; combineByKey and groupByKey cause a shuffle.
Skewed data solutions (see the salting sketch below):
- Salting (modify key Foo to Foo + random.nextInt(saltCount))
- Isolated salting
- Isolated map joins
RDDs are lazy: only the DAG is built until an action such as collect() is called.
cache()
coalesce() - merge partitions (e.g. to get rid of empty ones)
reduceByKey vs groupByKey; treeReduce over reduce; use complex/nested types; off-heap memory.
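A rough sketch of the salting idea referenced above for a skewed join; bigRdd and smallRdd are hypothetical pair RDDs and saltCount is just a tuning knob:

import scala.util.Random

val saltCount = 10
// spread each (possibly hot) key of the big side across saltCount synthetic keys
val saltedBig = bigRdd.map { case (k, v) => ((k, Random.nextInt(saltCount)), v) }
// replicate the small side once per salt so every salted key finds its match
val saltedSmall = smallRdd.flatMap { case (k, v) =>
  (0 until saltCount).map(salt => ((k, salt), v))
}
val joined = saltedBig.join(saltedSmall)
  .map { case ((k, _), pair) => (k, pair) }   // strip the salt after the join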

RDD types

hadoopRDD: 1 partition per HDFS block; dependencies: none; compute(partition) = read the corresponding block; converted into a MappedRDD
filteredRDD: partitions: same as the parent RDD; dependencies: one-to-one on the parent; compute(partition) = compute the parent and filter it
joinedRDD: 1 partition per reduce task; dependencies: shuffle on each parent; compute(partition) = read and join the shuffled data; partitioner = HashPartitioner(numTasks)
Other RDD types: MappedRDD, PairRDD, ShuffledRDD, UnionRDD, PythonRDD, DoubleRDD, JdbcRDD, JsonRDD, SchemaRDD, VertexRDD, EdgeRDD
CassandraRDD: val cassandraRDD = sc.cassandraTable("mykeyspace", "columnA") - supports predicate pushdown
GeoRDD, EsSpark (Elasticsearch)
RDD.cache() is the same as RDD.persist(MEMORY_ONLY)
RDD.persist(MEMORY_AND_DISK)
RDD.persist(DISK_ONLY)
RDD.persist(MEMORY_AND_DISK_SER)
Kryo serialization; spark.storage.memoryFraction - splits heap between shuffle memory, cached RDDs and user programs.
Hadoop means three Apache projects: HDFS, MapReduce, YARN (JobTracker, NameNode). Spark is a contender to MapReduce.
Use conf.registerKryoClasses() instead of Java serialization for speed. For high-churn data, prefer an array of ints over a linked list to reduce GC pressure; consider -XX:+UseConcMarkSweepGC.

How to run Spark in Standalone client mode

spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

How to run Spark in Standalone cluster mode

spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

How to run Spark in YARN client mode

YARN - Yet Another Resource Negotiator. The ResourceManager manages many NodeManagers; the ResourceManager contains the Scheduler.
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode client \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

How to run Spark in YARN cluster mode

spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --master yarn \
  $SPARK_HOME/examples/lib/spark-examples_version.jar 10

Spark + Oozie

https://habrahabr.ru/post/338952/