spark part-I

11 minute read

hadoop offers:

  • hdfs: for storage
  • mapreduce: for computation
  • yarn: for resource management

spark is general purpose in memory compute engine, so its an alternative to mapreduce in hadoop, but would require storage (local/cloud) and resource manager(yarn/mesos/k8s).

spark can run on top of hadoop by replacing mapreduce with spark

why spark

problem with mapreduce, it has to read and write from storage every time for each mapreduce job. And spark does all the processing in memory, so ideally just initial one read and final write to disk would be required. it is approximately 10-100% faster than hadoop.

what do we mean by spark is general purpose

in hadoop we use:

  • sqoop: for import
  • pig: for cleaning
  • hive: for querying
  • mahout: for ML
  • storm: for streaming

whereas in spark we can do all like cleaning, querying, ML, ingestion, streaming.

storage unit in spark (rdd)

basic unit which holds data in spark is rdd (resilient distributed dataset). It is in memory distributed collection

//transfornations (lazy), nothing is executed only dag is being formed
rdd1 = load file1 from hdfs
rdd2 = rdd1.map

//action, it will be executed along with required transormations
rdd2.collect

lets say we have 500MB file in hdfs, then default we have 4 blocks data, based on 128MB block size. So, these 4 blocks will be loaded in memory as rdd in 4 partitions.

resilient: can recover from failure, if we loose rdd2 then through lineage graph its known how the rdd was created from its parent rdd. So, rdd provide fault tolerance through lineage graph. whereas in haddop resiliency is maintened by replication in hdfs.

lineage graph: keeps track of transormations to be executed after action has been called.

immutable: after rdd load data cannot be changed

why immutable

because if we keep only one rdd and modify the same again and again, then in case of rdd failure at some step then we wont have parent rdd to regenrate then would require to do all the transformation from start.

why lazy transformations

so as optimized plan can be made.

example: in below case we have to load all data then show one line only, it is better to optimize the plan knowing what all transormations are needed.

rdd1 = load data
rdd1 take 1 record to print

frequency of word in a file

assume file is already present in some hdfs location /path/to/file/file1

running code interactively:

  • scala: spark-shell
  • python: pyspark

spark context: entrypoint to spark cluster, when we run interactively we can do spark-shellthen it will return sc

//in scala
val rdd1 = sc.textFile("/path/to/file/file1")
val rdd2 = rdd1.flatMap(x => x.split(" ")) 
//flatMap takes each line as input 
//and will split based on " " in Array
//finally it will return only one Array with all the words
val rdd3 = rdd2.map(x => (x,1)) //each word will be key and 1 as value
val rdd4 = rdd3.reduceByKey((x,y) = x+y) //this is also transformation
//reduceByKey will have all the same keys together and takes 2 rows at a time 
//and operation we do is add so it will add values of the two rows
rdd4.collect()
#in pyspark
sc.setLogLevel("ERROR") #default log is WARN
rdd1 = sc.textFile("/path/to/file/file1")
rdd2 = rdd1.flatMap(lambda x : x.split(" ")) 
rdd3 = rdd2.map(lambda x : (x,1))
rdd4 = rdd3.reduceByKey(lambda x,y : x+y)
rdd4.collect()
rdd4.saveAsTextFile("/path/to/file/output1")

Note

  • at any point we can use rdd.collect() to see output of rdd
  • in scala if there are no input parametrs to function then we can call without parentheses
  • anonymous functions are called lambda in python
  • to access local file we can use file:// before the file path. like sc.textFile("file:///path/to/file/file1")
  • for doing lowercase we can use toLowerCase()
  • for sorting we can use sortby(x => x._2) to sort on value in key value input
  • if we need to just count number of times key is repeating then can use countByValue but its an action not transformation
  • dag in pyspark is different than of scala because pyspark uses api library, scala dag matches to the code but not pyspark code
  • at raw rdd level code pyspark is not that optimized compared to scala
  • dag is created while transformations are being called and submitted when action is called, it shows jobs, stages, tasks
  • lineage keeps track of rdds and how they are connected so as in case of failure, recovery can be done using parent rdd or if base rdd then getting data from specefic partition in disk, lineage is part of dag
  • we can check lineage on any rdd with function toDebugString like rdd1.toDebugString

shared variables

broadcast

broadcast join in spark is similar to map side join in hive. It is aceived by using broadcast variable. Small table can be broadcasted to all nodes.

Example: If we need to filter out some data based on data in other small table then we could broadcast small table to all nodes. Lets say we have only one column in small table we could use set to have distinct values in a variable.

import scala.io.Source

var v:Set[String] = Set()
/*
//data to be broadcasted rows like: 
are
is
am
*/
val lines = Source.fromFile("/path/to/broadcast/data").getLines()
for(line <- lines) {v += line}

var b = sc.broadcast(v)

/*
//some data which has rows like:
hello how are you, 20
am good, 30
*/
val rdd1 = sc.textFile("/path/to/data")
val mappedInput = rdd1.map(x => x.split(",")(1).toFloat,x.split(",")(0))

//it will flat the structure based on values and produce more rows
val words = mappedInput.flatMapValues(x => x.split(" "))

//this will be map side work only to filter out data
val filterData = words.filer(x => !b.value(x._1))

//this will add values for same words
val total = filterData.reduceByKey((x,y) => x+y)

//this will give data in sorted desc
val sorted = total.sortBy(x => x._2,false)

accumlator

similar to counters in mapreduce, if we need to count then we can use accumlator.

  • shared variable on driver machine
  • each executor can only update it but cannot read value
val v = sc.longAccumlator("shown name in UI")
val rdd1 = sc.textFile("/path/to/data")

//it will count the number of lines in file
rdd1.foreach(x => v.add(1))

list to rdd

//lets say we want col1 and count how many times it repeat
val l = List("A: P1", "B: P2", "A: P3")
val rdd1 = sc.parallelize(l)
val pair = rdd1.map(x => {
  val cols = x.split(":")
  (cols(0),1)
})
val result = pair.reduceByKey((x,y) => x+y)
result.collect().foreach(println)

narrow & wide transformation

narrow: ideally where shuffling is not involved, when rdd partition is dependent on at most one partition of parent rdd

example: map, filter, filtermap, etc

wide: where shuffling is involved, when rdd partition is dependent on mutiple partitions of parent rdd

example: reduceByKey, groupByKey, etc

job, stages & tasks

job:

  • every action is a job
  • sortByKey is exception as its transformation but still shows up as job because some part of it is eager and some part lazy
  • when action is called all the transformations in the stage from beginning will be executed. spark optimizes to skip previous stages

stage:

  • whenever shuffle is required new stage is created, so wide transformation creates new stage
  • by default it will be one stage
  • when shuffle happens it uses disk so as next stage can pick data
  • wide transformations should be used later in the app so as it works on less data

example: if 2 wide transormations then total 3 stages in the spark application run

task:

  • will be equal to number of partitions read

reduceByKey vs reduce

reduceByKey

  • is transormation(wide)
  • works on pair rdd (tuple of two: key, value) only
//lets say we want col1 and count how many times it repeat
val l = List("A: P1", "B: P2", "A: P3")
val rdd1 = sc.parallelize(l)
val pair = rdd1.map(x => {
  val cols = x.split(":")
  (cols(0),1)
})

//transformation: here we get only rdd
val result = pair.reduceByKey((x,y) => x+y)

//action: here we get result on local not rdd
result.collect().foreach(println)

reduce:

  • is an action
  • gives single output
val a = 1 to 100
val rdd1 = sc.parallelize(a)

//action: it will return result
rdd1.reduce((x,y) => x+y)

reduceByKey vs groupByKey

both are transformation(wide)

reduceByKey:

  • works on mapper end to do local aggregations
  • similar to combiner in hadoop
  • less shuffling as local aggregations done

groupByKey:

  • all the data is sent to reducer to do aggregations
  • more shuffling
  • should be avoided as no local aggregations
  • resources will be wasted as most of mappers will be ideal in case of reducer work

Note: by default, hdfs blocks size is 128MB and local block size is 32MB

pair rdd vs map datatype in scala

pait rdd holds tuple of two elements(key, value).

map datatype in scala also hold key value pair but cannot have duplicates while pair rdd can have duplicates

save to file

//its an action
rdd.saveAsTextFile("/path/to/file")

parallelism and partitions

default parallism is equal to number of cores and default partitions is equal to number of parallelism

//to check parallelism
sc.defaultParallelism

val l = List("A: P1", "B: P2", "A: P3")
val rdd1 = sc.parallelize(l)

//to check number of partitions
rdd1.getNumPartitions

//if we load from local file in rdd than it will use 32MB block to partition
//if hdfs than 128MB
//but if file is small like 2MB than default min partition is used which is 2

sc.defaultMinPartitions

repartition vs coalesce

repartition:

  • to increase parallelism we may need to repartition to more partitions
  • if we have less data in each partition after a certain point then we may need to consider repartition to use less partitions with good data
  • it is wide transformation
  • does full shuffle of data to get exact equal size
//lets say rdd1 exist with default 2 partitions
rdd1.getNumPartitions

//this will repartition in 10 partitions
val rdd2 = rdd1.repartition(10)

rdd2.getNumPartitions


//decrease number of partitions to 1
val rdd3 = rdd1.repartition(1)

coalesce:

  • same as repartition but can only decrease number of partitions
  • increasing does not give error but also does not change the number of partitions
  • it is transformation
  • minimize shuffling by combining local partitions to avoid full shuffle
//lets say rdd1 exist with default 2 partitions
rdd1.getNumPartitions

//decrease number of partitions to 1
val rdd2 = rdd1.coalesce(1)

rdd2.getNumPartitions

Note

  • to decrease partitions, repartition and coalesce both can be used but coalesce is preferred as it minimizes shuffling by trying to combine partitions within the node
  • when using coalesce, we can get unequal partitions
  • when using repartition, full shuffling will be done but we get equal size partitions

cache & persist

if we have a lot of transormations and we call actions mutiple times then all the transformations will run again or atleast the last stage will be executed in full. If we use cache then transormations (if any) or action after cache will only be executed.

cache and persist, both have same purpose to not do all the transformations again.

cache will always be in memory, persist have option of various storage levels. persist in memory persist() is same as cache.

persist storage levels

MEMORY_ONLY: cached in memory non serialized format

DISK_ONLY: in disk serialized format

MEMORY_AND_DISK: in memory, if not enough memory then evicts blocks and are stored in disk. recommemded to avoid expensive recalculation and memory is limited

OFF_HEAP: blocks cached off heap (outside jvm), in jvm it uses garbage collection to free space, its time taking process. So, we could uses memory outside executor. These are called unsafe operations as it will use raw memory outside jvm. It stores in serialized format

MEMORY_ONLY_SER: same as memory only but serialized

MEMORY_AND_DISK_SER: same as memory and disk but serialized

MEMORY_ONLY_2: same as memory only but with 2 replicas on different worker nodes, t speed up recovery

Note:

  • in memory only if we dont have enough memory then it will not fail rather will skip caching it, so it cache partitions as much as it can until full
  • serialized saves space as it stores in binary but will require more processing
  • in memory it will be memory deserialized
  • can cache in memory & remaining on disk
  • can cache in memory serialized it will take more in processing
  • can unpersist
  • prefer kryo serializer over java serializer
  • whenever the data is stored on disk or has to be transferred over the disk it has to be in serialized form, if we use kryo serializer then size will be much lesser than in the case of java serializer
  • kryo is significantly faster and more compact than java serialization (often as much as 10times

run jar

jar is Java ARchive, a package file format typically used to aggregate many java class files and associated metadata and resources (text, images, etc.) into one file for distribution. jar-wiki

to run from terminal we can submit like below

#step1: prepare jar from preffered IDE (path to files should be in args, spark context should not have local)
#step2: move jar to the edge/gateway machine using scp
spark-submit \
--class ClassName \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--num-executors 2 \
path/to/jar path/to/file

deploy-mode

  • when cluster then driver will be on one of the worker node, logs can be viewed from the worker node where its executing
  • when client then driver will be on the edge/gateway machine where we are running the command, logs will be printed directly on shell

map vs map partition

map is a transformation which works on each row. for example if in rdd there are 4 partitins with 1000 rows in each partition then map will be called 4000 times

map partition works on each partition so in example above it will be run 4 times. This can help if we need to make connection to database

Comments