spark part-II
spark core works on rdds (spark 1 style), but we also have high level constructs to query/process data easily: dataframes/datasets
a dataframe is a distributed collection of data organized into named columns. It was available in spark 1 as well, but spark 2 onwards has better support for dataframes/datasets, and both are merged into a single api (dataset api)
dataframe/dataset code is converted to low level rdd code by the driver. plain rdd code is sent to the executors as-is, whereas dataframe/dataset code is first optimized by the catalyst optimizer and then converted to low level code before being sent to the executors
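to see what the catalyst optimizer actually produces, we can print the plans for a dataframe query; a minimal sketch, assuming a dataframe df with a column col1 already exists:
//extended output shows the parsed/analyzed/optimized logical plans and the final physical plan
df.where("col1 > 10").groupBy("col1").count().explain(true)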
spark session
earlier we used to create a spark context, and if hive was needed then a hive context as well, etc. spark session is unified and holds all the contexts, a kind of unified entry point for a spark application. It is a singleton object, as we either get or create it.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("app1").master("local[2]").getOrCreate()
//do processing
spark.stop()
//another way to set config
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name", "app1")
sparkConf.set("spark.master", "local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
//do processing
spark.stop()
basic dataframe commands
import org.apache.log4j.Level
import org.apache.log4j.Logger
//to get only ERROR and above
Logger.getLogger("org").setLevel(Level.ERROR)
//we should not use inferSchema in production as it infers data types from only a few initial rows
//in spark this will create 3 jobs: 1st for the read, 2nd for inferSchema (some data is read), 3rd for the show
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path")
df.show()
df.printSchema()
//this will create 3 stages: 1st the initial read, 2nd after the repartition exchange, 3rd after the groupBy exchange
df.repartition(4).where("col1 > 10").select("col2", "col3").groupBy("col2").count().show()
Note: whenever shuffling is done, the stage writes its output to an Exchange (buffer) and the next stage reads from that Exchange. So in the spark ui we see some (compressed) data written out by one stage and read in by the next.
rdd/dataframe/dataset
rdd
- example map, filter, reduceByKey, etc.
- low level, not developer friendly
- lacks basic optimizations
dataframe
- spark 1.3 and onwards
- high level construct, developer friendly
challenges with dataframe:
- not strongly typed, errors at runtime
- less flexibility
dataframes can be converted to rdds to get more flexibility, but the conversion has some cost involved, and rdds don't go through the catalyst optimizer, so major optimizations are skipped if we work with rdds.
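a minimal sketch of dropping from a dataframe down to an rdd; the result is an RDD[Row], so fields are accessed by position:
val rowRdd = df.rdd            //rows come back as generic Row objects, outside tungsten's binary format
rowRdd.map(row => row.get(0)).take(5)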
dataset
- spark 1.6 and onwards
- compile time safety
- more flexibility to write low level code like anonymous/lambda functions
- conversion from dataframe to dataset is seamless
Note: before spark 2, dataframe and dataset had different apis. In spark 2, both are merged into a single unified structured api
So a dataframe is a dataset of Row type (Dataset[Row]); Row is a generic type that is bound at runtime. In a typed dataset (Dataset[Order]) the type is bound at compile time.
convert dataframe to dataset
case class Abc(id: Int, col2: String)
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path")
//this import is required for conversion from dataframe to dataset and vice versa
//it can only be imported after the spark session has been created
import spark.implicits._
val ds = df.as[Abc]
//this will give an error at compile time as col3 does not exist
ds.filter(x => x.col3 > 3)
//we could filter the other way as below, but that will not give an error at compile time
ds.filter("col3 > 3")
Note: dataframes are preferred, as datasets have overhead from casting to a particular type. with dataframes serialization is managed by the tungsten binary format; with datasets serialization is managed by java serialization, which is slow
read modes
PERMISSIVE: sets fields to NULL when a malformed record is encountered; this is the default mode. a new column _corrupt_record will appear and hold the malformed record (sketched below)
DROPMALFORMED: will ignore malformed record
FAILFAST: exception raised when malformed record found
spark.read.format("csv").option("header", true).option("path", "/path").option("mode", "FAILFAST").load()
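a sketch of capturing bad rows in PERMISSIVE mode; the schema (including the _corrupt_record column) and the columnNameOfCorruptRecord option are shown for illustration only:
val schemaWithCorrupt = "col1 Int, col2 String, _corrupt_record String"
val badDf = spark.read.format("csv").schema(schemaWithCorrupt).option("header", true).option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "_corrupt_record").option("path", "/path").load()
//rows that failed to parse land in _corrupt_record, the other columns are NULL
badDf.where("_corrupt_record is not null").show(false)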
schema types
INFER: as we do for csv infer schema
IMPLICIT: formats like orc/parquet that come with their own schema (see the sketch below)
EXPLICIT: manually defining the schema, e.g. with StructType, a ddl string, or a case class (for datasets)
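for the implicit case, formats like parquet/orc carry the schema with the data, so nothing needs to be inferred or defined; a small sketch:
val parquetDf = spark.read.format("parquet").option("path", "/path").load()
parquetDf.printSchema()   //schema comes from the file metadata, not from inference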
explicit schema approaches
programmatic approach:
//in this approach we give spark datatypes
import org.apache.spark.sql.types._
val dfSchema = StructType(List(StructField("col1", IntegerType), StructField("col2", TimestampType), StructField("col3", IntegerType)))
val df = spark.read.format("csv").schema(dfSchema).option("path", "/path").load()
df.show()
df.printSchema()
ddl string:
//in this approach we give the schema as a sql ddl string
val dfSchema = "col1 Int, col2 Timestamp, col3 Int"
val df = spark.read.format("csv").schema(dfSchema).option("path", "/path").load()
df.show()
df.printSchema()
save modes
append: adds files to the existing folder
overwrite: overwrites the data in the folder
errorIfExists: raises an error if the folder exists
ignore: skips the write if the folder exists
df.write.mode(SaveMode.Overwrite).option("path", "/path").save()
spark file layout
ways to control the number of files generated when saving data; by default it equals the number of partitions in the dataframe
repartition: does a full shuffle, not preferred (see the sketch after this list)
partitioning/bucketing: partitioning creates a folder per column value, bucketing controls the number of files
df.write.partitionBy("col1").mode(SaveMode.Overwrite).option("path", "/path").save()
max records: if this option is set then each file will have at most that many records
df.write.option("maxRecordsPerFile", 1000).mode(SaveMode.Overwrite).option("path", "/path").save()
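for completeness, the repartition way mentioned above looks like this; it forces a full shuffle just to change the file count, which is why it is not preferred:
import org.apache.spark.sql.SaveMode
//writes roughly 4 output files, at the cost of a shuffle
df.repartition(4).write.format("csv").mode(SaveMode.Overwrite).option("path", "/path").save()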
sparksql
to work with sql we create a view and query it. example:
//lets say we have dataframe already
df.createOrReplaceTempView("dfView")
val output = spark.sql("select col1, max(col2) from dfView group by col1 order by col1")
output.show()
Note: performance-wise sparksql and the dataframe api are similar, as both use the catalyst optimizer
save as table
when we use save, it writes files directly into some folder. But sometimes we want to store the data as a table (data + metadata)
data: stored under the path set in spark.sql.warehouse.dir
metadata: stored in memory by default; a hive metastore can be used for permanent storage
//by default data will be stored in default location & metadata in memory
df.write.format("csv").mode(SaveMode.Overwrite).saveAsTable("table1")
Note:
- we can enable hive support in the config with enableHiveSupport() to get a permanent metadata store. Then we can create a database and save tables in that database (see the sketch below)
- saving as a table is beneficial if the data needs to be queried later, for example by reporting tools
- bucketing works on tables, so here we can do bucketBy(2, "col1"); this creates 2 files/buckets. it uses a hash function, so the same key always ends up in the same bucket. it is widely used with sortBy for performance
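a sketch putting these pieces together (the database/table names are placeholders, and hive support must be available on the classpath):
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().appName("app1").master("local[2]").enableHiveSupport().getOrCreate()
spark.sql("create database if not exists mydb")
//2 buckets hashed on col1, rows sorted within each bucket
df.write.format("csv").mode(SaveMode.Overwrite).bucketBy(2, "col1").sortBy("col1").saveAsTable("mydb.table1")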
transformations
low level: mostly used with rdds, some of these also work with dataframes/datasets
- map
- filter
- groupByKey
high level: used with dataframes/datasets
- select
- where
- groupBy
Note: although we can do the work with dataframes/datasets, sometimes we need rdds, for example when we have an unstructured file with multiple delimiters like 1:data1,data2. In this case we can read the data into an rdd, process it (split/regex) to extract the fields, and then convert it to a dataset, as sketched below.
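a sketch of that flow for the 1:data1,data2 style of line (the case class and split logic are assumptions about the layout):
case class Rec(id: Int, f1: String, f2: String)
import spark.implicits._
val raw = spark.sparkContext.textFile("/path")
//split on ':' first, then on ',' and map into the case class so it can become a typed dataset
val ds = raw.map { line =>
  val Array(id, rest) = line.split(":", 2)
  val Array(f1, f2) = rest.split(",", 2)
  Rec(id.toInt, f1, f2)
}.toDS()
ds.show()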
column read
column string: df.select("col1", "col2").show()
column object: df.select(column("col1"), col("col2"), $"col3", 'col4).show()
the $"col3" and 'col4 forms are supported only in scala; the other forms work in both pyspark & scala (a self-contained version with the required imports is below)
Note: both column string & object can not be used in same statement
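the column object style needs a couple of imports to work; a self-contained version, assuming df has columns col1..col4:
import org.apache.spark.sql.functions.{col, column}
import spark.implicits._   //enables the $"..." and 'symbol column syntax
df.select(column("col1"), col("col2"), $"col3", 'col4).show()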
column expressions
df.select(col("col1"), expr("concat(col2, '_suffix')")).show()
df.selectExpr("col1", "concat(col2, '_suffix')").show()
Note: columns strings/object can not be used together with column expression
attach column names to dataframe
//lets say we have only data in the path without column names
val df = spark.read.format("csv").option("path", "/path").load()
//here column names (_c0, _c1, ...) are assigned by spark, but we can give our own names as below
val df1 = df.toDF("col1", "col2", "col3")
df1.show()
udf
column object expression udf
//step1: define function
def func(i:Int):String = {
if (i>90) "Y" else "N"
}
//step2: wrap the function as a udf (this does not register it in the catalog)
import org.apache.spark.sql.functions.{udf, col}
val parseFunc = udf(func(_: Int): String)
//read data
val df = spark.read.format("csv").option("path", "/path").load()
//step3: call function, driver will serialize function and send to all executors
val df1= df.withColumn("col4", parseFunc(col("col1")))
df1.show()
column string/sql udf
//step1: define function
def func(i:Int):String = {
if (i>90) "Y" else "N"
}
//step2: register function, this will register in catalog and normal spark sql can also be done on it
spark.udf.register("parseFunc",func(_:Int):String)
//read data
val df = spark.read.format("csv").option("path", "/path").load()
//step3: call function, driver will serialize function and send to all executors
val df1= df.withColumn("col4", expr("parseFunc(col1)"))
df1.show()
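since the registered udf lands in the catalog, we can confirm it is there and call it from plain sql as well; a small sketch, assuming df has a column col1 as above:
//the registered function shows up in the catalog listing
spark.catalog.listFunctions().filter(f => f.name == "parseFunc").show()
df.createOrReplaceTempView("dfView")
spark.sql("select col1, parseFunc(col1) as col4 from dfView").show()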
example with test data in code
//test data in list
val l1 = List(
(1,"2001-01-15",98,"PASS"),
(2,"2001-01-15",9,"FAIL"),
(3,"2001-01-15",60,"PASS")
)
//one way: get the data into an rdd, then convert to a df
import spark.implicits._
val rdd = spark.sparkContext.parallelize(l1)
val df = rdd.toDF()
//another way: create the df directly
val df1 = spark.createDataFrame(l1).toDF("col1","col2","col3","col4")
//convert date column col2 (string) to epoch time
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType
val df2 = df1.withColumn("col2", unix_timestamp(col("col2").cast(DateType)))
//add new column with unique data
val df3 = df2.withColumn("new", monotonically_increasing_id())
//drop duplicates based on col1 and col2
val df4 = df3.dropDuplicates("col1","col2")
//drop col3
val df5 = df4.drop("col3")
aggregations
simple
- output is single row
- example sum/max/min of all data
val df = spark.read.format("csv").option("path", "/path").load()
//using column object expressions
import org.apache.spark.sql.functions._
df.select(count("*").as("rows"), sum("col3").as("total"), avg("col3").as("avg"), countDistinct("col4").as("distinct_col4")).show()
//using string expression
df.selectExpr("count(*) as rows", "sum(col3) as total", "avg(col3) as avg", "count(distinct col4) as distinct_col4").show()
//using spark sql
df.createOrReplaceTempView("dfView")
spark.sql("select count(*) as rows, sum(col3) as total, avg(col3) as avg, count(distinct col4) as distinct_col4 from dfView").show()
Note: when count is done on a particular column, only non-null values are counted (see below)
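a quick illustration of that note, assuming col4 contains some nulls:
import org.apache.spark.sql.functions.count
//count("*") counts every row, count("col4") skips rows where col4 is null
df.select(count("*").as("all_rows"), count("col4").as("non_null_col4")).show()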
grouping
- output is more than one record
- groupBy is used
val df = spark.read.format("csv").option("path", "/path").load()
df.groupBy("col1", "col2").agg(sum("col3").as("sum"), sum(expr("col3*col3")).as("square")).show()
window
- some fixed size window is used
- example: sales over the past 7 days
val df = spark.read.format("csv").option("path", "/path").load()
//a window needs a partition, an ordering, and a window size (frame)
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("col1").orderBy("col2").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("runningSum", sum("col3").over(window)).show()
join
simple
these are the ones where a shuffle is involved, also called a shuffle sort merge join
val df1 = spark.read.format("csv").option("path", "/path1").load()
val df2 = spark.read.format("csv").option("path", "/path2").load()
val joinCondition = df1.col("col1") === df2.col("col1")
df1.join(df2, joinCondition, "inner").show()
type of joins
- inner: matching records
- left: matching + non matching from left
- right: matching + non matching from right
- outer: matching + non matching from left + non matching from right
internals of join
- for the join to happen, matching keys need to be on the same executor (reducer)
- each executor (mapper) writes its output to an exchange, which is a buffer on the executor
- the spark framework reads from the exchanges and shuffles the data so that the same keys end up on the same executor
Note:
- in a join, if both dataframes have the same column name, then selecting that column after the join is ambiguous. either rename the column in one of the dataframes with withColumnRenamed before the join, or drop the duplicate column after the join (see the sketch below)
- in case of nulls (for example non matching rows), use coalesce to substitute some default value
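a sketch of both points, assuming df1 has columns col1, col2 and df2 has columns col1, amount:
import org.apache.spark.sql.functions.{coalesce, col, lit}
val df2Renamed = df2.withColumnRenamed("col1", "col1_right")
val joined = df1.join(df2Renamed, df1.col("col1") === df2Renamed.col("col1_right"), "left")
//non matching rows from the right side come back with a null amount, replace it with a default of 0
joined.select(col("col1"), col("col2"), coalesce(col("amount"), lit(0)).as("amount")).show()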
broadcast
- the small table is broadcast to all the executors, so no shuffle is needed
- one table should be small enough that every executor can hold it in memory
- by default spark broadcasts small tables automatically (below spark.sql.autoBroadcastJoinThreshold)
- it can be disabled with spark.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
- it can also be forced explicitly with df1.join(broadcast(df2), joinCondition, "inner")
val rdd1 = sc.textFile("/path/to/big_file/")
val rdd2 = rdd1.map(x => (x.split(":")(0), x.split(":")(1)))
val a = Array(("key1",0),("key2",1))
//partitions will be based on default parallelism
val rdd3 = sc.parallelize(a)
//shuffle needs to be done, another stage will be created
val rdd4 = rdd2.join(rdd3)
rdd4.saveAsTextFile("result1")
//here we can use broadcast for small table to avoid shuffling and save time
val toBroadcast = a.toMap
val dataBroadcast = sc.broadcast(toBroadcast)
val rdd5 = rdd2.map(x => (x._1, x._2, dataBroadcast.value(x._1)))
rdd5.saveAsTextFile("result2")
pivot
//lets say the temp view dfView already exists
//here, pivoting on col2 forces spark to run an extra query to find the distinct values
spark.sql("select col1, col2 from dfView").groupBy("col1").pivot("col2").count().show()
//we can supply the pivot values up front so spark does not have to run the distinct query
val columns = List("Yes", "No")
spark.sql("select col1, col2 from dfView").groupBy("col1").pivot("col2", columns).count().show()