Personal Page
https://spark.apache.org/docs/latest/submitting-applications.html
./bin/spark-submit \
--class com.example.MainClass \
--master spark://cluster-url:7077 \
--executor-memory 2g \
--total-executor-cores 4 \
/path/to/your/app.jar \
arg1 arg2
Get column descriptions (summary statistics) and the column names:
df_data.describe().show()
df_data.columns
Handling missing values
from pyspark.sql.functions import col, isnan, when, count
# Count the occurrences of 'None'/'NULL' strings, empty strings, nulls and NaNs in each column
df2 = df_data.select([
    count(
        when(
            col(c).contains('None') |
            col(c).contains('NULL') |
            (col(c) == '') |
            col(c).isNull() |
            isnan(c),
            c
        )
    ).alias(c)
    for c in df_data.columns
])
df2.show()
df_data.filter(df_data.column_name.isNull()).show()
df_data.filter(df_data.column_name1.isNull() & df_data.column_name2.isNull()).show()
The drop function has three parameters, namely how, thresh, and subset.
The how parameter accepts two values: i) 'all' drops a record only when all of its values are null; ii) 'any' drops a record if any one of its values is null.
The thresh parameter specifies the minimum number of non-null values a record must have to be kept; records with fewer non-null values are dropped (thresh overrides how).
The subset parameter specifies the list of columns that need to be checked (a sketch using thresh and subset follows the drop examples below).
Drop a record only when all of its values are null:
df_data = df_data.na.drop(how='all')
DROP the entire record if it contains a null value:
df_data = df_data.na.drop(how='any')
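A minimal sketch of thresh and subset (the column names are placeholders):
# keep only records that have at least 2 non-null values (thresh overrides how)
df_data = df_data.na.drop(thresh=2)
# drop a record if either of these two columns is null; other columns are ignored
df_data = df_data.na.drop(how='any', subset=['column_name1', 'column_name2'])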
The fill function has two parameters, namely value and subset: value is the replacement value and subset is an optional list of columns to fill.
df_data = df_data.na.fill('missing')
The above code replaces the null values in string columns with the value 'missing'.
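A sketch of the subset parameter (hypothetical column names); fill only touches columns whose type matches the value:
# fill nulls with 'missing' only in the listed string columns
df_data = df_data.na.fill('missing', subset=['column_name1', 'column_name2'])
# fill nulls in a numeric column with 0
df_data = df_data.na.fill(0, subset=['column_name3'])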
To use the Imputer we first need to import it:
from pyspark.ml.feature import Imputer
The Imputer parameter inputCols specifies a list of columns, while inputCol specifies a single column to be considered when checking for null values.
Similarly, outputCol and outputCols specify the single or multiple output columns, depending on the inputs.
The strategy parameter defines the technique applied to the target column to impute its null values. Acceptable values are 'mean', 'median', and 'mode'.
from pyspark.ml.feature import Imputer
inputCols = ['cname1', 'cname2', 'cname3']
imputer = Imputer(
    inputCols=inputCols,
    outputCols=["{}_imputed".format(c) for c in inputCols]
).setStrategy("mean")
imputer.fit(df_data).transform(df_data)
We can use the filter function to extract the required data from a dataframe. Queries 1 and 2 below return the same data: the top 5 rows where year is greater than 2010.
df_data.filter("year > 2010").show(5)
df_data.filter(df_data['year'] > 2010).show(5)
We can use the ~ symbol to get the inverse of a filter:
df_data.filter(~(df_data['year'] > 2010)).show(5)
The above code gives the top 5 rows where year is not greater than 2010.
To filter the dataframe on more than one condition we can use the & and | operators:
df_data.filter((df_data['selling_price'] > 20) & (df_data['selling_price'] < 40)).show(5)
The above code filters the dataframe to rows where the selling price is between 20 and 40.
df_data.filter((df_data['transmission'] == 'Automatic') | (df_data['transmission'] == 'Manual')).show(5)
The above code filters the dataframe to rows where the transmission is either Manual or Automatic.
df_data.select('column_name').distinct().collect()
df_data.groupBy('column_name').count().show()
df_data.groupBy('column_name').mean().show()
df_data.groupBy('column_name').sum().show()
val df = Seq((1, "A", Seq(1,2,3)), (2, "B", Seq(3,5))).toDF("col1", "col2", "col3")
df.show()
+----+----+---------+
|col1|col2| col3|
+----+----+---------+
| 1| A|[1, 2, 3]|
| 2| B| [3, 5]|
+----+----+---------+
val df2 = df.withColumn("col3", explode($"col3"))
df2.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| 1|
| 1| A| 2|
| 1| A| 3|
| 2| B| 3|
| 2| B| 5|
+----+----+----+
val df3 = df.withColumn("new_col4", explode($"col3"))
df3.show()
+----+----+---------+--------+
|col1|col2| col3|new_col4|
+----+----+---------+--------+
| 1| A|[1, 2, 3]| 1|
| 1| A|[1, 2, 3]| 2|
| 1| A|[1, 2, 3]| 3|
| 2| B| [3, 5]| 3|
| 2| B| [3, 5]| 5|
+----+----+---------+--------+
val df5 = df.withColumn("new_col4", explode($"col3")).select("col1","col2", "new_col4")
df5.show()
+----+----+--------+
|col1|col2|new_col4|
+----+----+--------+
| 1| A| 1|
| 1| A| 2|
| 1| A| 3|
| 2| B| 3|
| 2| B| 5|
+----+----+--------+
val df7 = df.select(col("col1"), col("col2"), explode(col("col3")))
df7.show()
+----+----+---+
|col1|col2|col|
+----+----+---+
| 1| A| 1|
| 1| A| 2|
| 1| A| 3|
| 2| B| 3|
| 2| B| 5|
+----+----+---+
Creating an alias for the exploded column:
-------------------------------------
val df8 = df.select(col("col1"), col("col2"), explode(col("col3")).alias("my_alias"))
df8.show()
+----+----+--------+
|col1|col2|my_alias|
+----+----+--------+
| 1| A| 1|
| 1| A| 2|
| 1| A| 3|
| 2| B| 3|
| 2| B| 5|
+----+----+--------+
Filter example
------------------
val df = Seq(
("thor", "new york"),
("aquaman", "atlantis"),
("wolverine", "new york")
).toDF("superhero", "city")
df.show()
+---------+--------+
|superhero| city|
+---------+--------+
| thor|new york|
| aquaman|atlantis|
|wolverine|new york|
+---------+--------+
df.printSchema()
// creating new column:
df.withColumn("city_starts_with_new", $"city".startsWith("new")).show()
+---------+--------+--------------------+
|superhero| city|city_starts_with_new|
+---------+--------+--------------------+
| thor|new york| true|
| aquaman|atlantis| false|
|wolverine|new york| true|
+---------+--------+--------------------+
println(df.schema.fieldNames.contains("city"))
println(df.schema.contains(StructField("city",StringType,true)))
https://stackoverflow.com/questions/47657072/importing-schema-from-json-with-optional-value
import org.apache.spark.sql.expressions.scalalang._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
val schema = StructType(Seq(
StructField("k1", StringType, false),
StructField("optK", StructType(Seq(StructField("nestedK", StringType, false))), false)
))
val df = spark.read.option("allowUnquotedFieldNames",true).schema(schema).json("s3 location of data.json")
df: org.apache.spark.sql.DataFrame = [k1: string, optK: struct<nestedK: string>]
scala> df.show
+--------------+------+
| k1| optK|
+--------------+------+
| someValue|[optV]|
|someOtherValue| null|
+--------------+------+
A Column object corresponding to the city column can be created using any of the following three syntaxes:
$"city"
df("city")
col("city") (must run import org.apache.spark.sql.functions.col first)
These two are the same:
val s1 = df.select("city")
val s2 = df.select($"city")
https://habr.com/ru/company/otus/blog/653033/
df.show(n=2, truncate=False, vertical=True)
df.display()
// read csv
val raw = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv(s"$basePath/data/BankChurners.csv")
val columns: Array[String] = raw.columns
val columnsLen: Int = columns.length
// The colsToDrop variable is an array of column names that should be excluded from the loaded dataset.
val colsToDrop: Array[String] = columns.slice(columnsLen - 2, columnsLen) :+ columns.head
// Columns are removed from a DataFrame with the drop method, which takes one or more column names as variable-length arguments. In Scala, the array: _* syntax expands an array into varargs.
val df = raw.drop(colsToDrop: _*)
df.show(5, truncate = false)
df.printSchema
// Print the column names and their types in a readable form:
df.dtypes.foreach { dt => println(f"${dt._1}%25s\t${dt._2}") }
// Count how many columns there are of each type:
df.dtypes.groupBy(_._2).mapValues(_.length).foreach(println)
// Select the numeric columns and apply the summary method. It computes statistics such as:
// count mean stddev min max
// arbitrary approximate percentiles specified as a percentage (e.g. 75%)
val numericColumns: Array[String] = df.dtypes.filter(!_._2.equals("StringType")).map(_._1)
df.select(numericColumns.map(col): _*).summary().show
// Look at the values of the Customer_Age column
df.groupBy($"Customer_Age").count().show(100)
// Introduce a new column target, equal to 0 when Attrition_Flag is "Existing Customer" and 1 otherwise.
val dft = df.withColumn("target", when($"Attrition_Flag" === "Existing Customer", 0).otherwise(1))
dft.groupBy("target").count.show
Split the records of the two classes into separate variables and store the number of records in each class.
val df1 = dft.filter($"target" === 1)
val df0 = dft.filter($"target" === 0)
val df1count = df1.count
val df0count = df0.count
We need to multiply the number of records in df1 by df0count / df1count:
val df1Over = df1
.withColumn("dummy", explode(lit((1 to (df0count / df1count).toInt).toArray)))
.drop("dummy")
Let's look at this in more detail.
The expression (1 to (df0count / df1count).toInt).toArray creates an array with values from 1 to (df0count / df1count):
(1 to (df0count / df1count).toInt).toArray
res77: Array[Int] = Array(1, 2, 3, 4, 5)
The lit function creates a column with a fixed value. Here we add a column named dummy whose value is this array:
df1
.withColumn("dummy", lit((1 to (df0count / df1count).toInt).toArray))
.select("Attrition_Flag", "Customer_Age", "dummy")
.show(10)
The explode function creates a new row for each element of the array:
df1
.withColumn("dummy", explode(lit((1 to (df0count / df1count).toInt).toArray)))
.select("Attrition_Flag", "Customer_Age", "dummy")
.show(10)
So df1Over is a dataset containing the target = 1 records, enlarged df0count / df1count times.
Let's union this new dataset with the records of the other class and check that the resulting dataset is balanced:
val data = df0.unionAll(df1Over)
data.groupBy("target").count.show
https://habr.com/ru/company/alfastrah/blog/481924/
https://spark.apache.org/docs/latest/sql-programming-guide.html
select B.state, count(A.people) from A join B on A.state_id = B.state_id group by B.state
Since there are only 50 states, adding more than 50 cores cannot improve parallelism, and because California is the biggest state the data is skewed. Use a broadcast join: broadcast the small states table B instead of shuffling the large table A by state_id.
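A minimal PySpark sketch of the broadcast join, assuming A and B are DataFrames with the columns above:
from pyspark.sql.functions import broadcast
# B (the states table, only ~50 rows) is small enough to broadcast to every executor,
# so the large table A is not shuffled by the skewed state_id key
A.join(broadcast(B), "state_id").groupBy("state").count().show()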
https://habr.com/ru/company/vk/blog/442688/
val train = sqlContext.read.parquet("/events/hackatons/SNAHackathon/2019/collabTrain")
z.show(train.groupBy($"date").agg(
functions.count($"instanceId_userId").as("count"),
functions.countDistinct($"instanceId_userId").as("users"),
functions.countDistinct($"instanceId_objectId").as("objects"),
functions.countDistinct($"metadata_ownerId").as("owners"))
.orderBy("date"))
or like this:
val train = sqlContext.read.parquet("/events/hackatons/SNAHackathon/2019/collabTrain")
z.show(
train groupBy $"date" agg(
count($"instanceId_userId") as "count",
countDistinct($"instanceId_userId") as "users",
countDistinct($"instanceId_objectId") as "objects",
countDistinct($"metadata_ownerId") as "owners")
orderBy "date"
)
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-hint-framework.html
COALESCE and REPARTITION Hints
Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL comments):
SELECT /*+ COALESCE(5) */
SELECT /*+ REPARTITION(3) */
Broadcast Hints
Spark SQL 2.2 added support for BROADCAST hints (using the broadcast standard function or SQL comments):
SELECT /*+ MAPJOIN(b) */
SELECT /*+ BROADCASTJOIN(b) */
SELECT /*+ BROADCAST(b) */
val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")
val transactions_with_counts = transactions
.groupBy($"user_id", $"category_id")
.count
There are a few ways to access Row values and keep expected types:
a)
transactions_with_counts.map(
r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
)
b)
transactions_with_counts.map(r => Rating(
r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
))
c)
import org.apache.spark.sql.Row
transactions_with_counts.map{
case Row(user_id: Int, category_id: Int, rating: Long) =>
Rating(user_id, category_id, rating)
}
d) Converting to statically typed Dataset (Spark 1.6+ / 2.0+):
transactions_with_counts.as[(Int, Int, Long)]
e)
case class Rating(user_id: Int, category_id: Int, count: Long)
val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count
val rating = transactions_with_counts.as[Rating]
This way you will not run into run-time errors in Spark, because the Rating case class field name count matches the 'count' column name that Spark generates at run time.