Spark Notebook

Here are some notes I’ve taken on Spark. Not intended to be organized material for others, just my personal notes. You’ve been warned!

In Spark an operation must be associative (and commutative, in the case of reduce) to provide reliable, deterministic results! That’s because of the shared-nothing, multi-node paradigm Spark is built on.
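
A minimal sketch (assuming a SparkContext named sc) of how a non-associative, non-commutative function breaks determinism:

val xs = sc.parallelize(1 to 4)
val ok  = xs.reduce(_ + _) // always 10: addition is associative and commutative
val bad = xs.reduce(_ - _) // result depends on how partitions are combined: avoid this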

Spark is based on a ubiquitous abstraction called Resilient Distributed Dataset (RDD for short). Using a distributed approach introduces new concerns:

  • Partial failure: some of the nodes could fail, something we didn’t even have to care about in the shared-memory scenario. Spark does a pretty good job of masking partial failures!
  • Latency: data is spread across several nodes communicating over a network, and networks have latency. Some operations show a higher latency than others, something we must carefully consider when profiling a Spark job, because latency can’t be masked by Spark.

There is quite a symmetry between RDDs and typical Scala collections:

abstract class RDD[T] {
  def map[U](f: T => U): RDD[U] = ...
  def flatMap[U](f: T => TraversableOnce[U]): RDD[U] = ...
  def filter(f: T => Boolean): RDD[T] = ...
  def reduce(f: (T, T) => T): T = ...
}

To create an RDD you can transform previous RDDs, or you can use the SparkContext (or SparkSession) to create one. Transforming an RDD is like calling a higher-order function such as map in Scala. SparkContext and SparkSession are the handles to the Spark cluster. A small sketch of both helpers follows the list below.

  • SparkContext.parallelize(s: Seq) converts a Scala collection into an RDD.
  • SparkContext.textFile(s: String) converts the contents of a file into an RDD. The file can be read from local disks, HDFS, Amazon S3 and so on.
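
A minimal sketch of both (assuming a SparkContext named sc; the input path is hypothetical):

val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))  // RDD[Int] built from a local collection
val fromFile = sc.textFile("hdfs:///data/events.log") // RDD[String], one element per line of the file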

RDDs provide two kinds of operations: Transformations and Actions (or accessors). Transformers are ops that return a new collection from an input one. Think of map(). Accessors return single values as results, not collections. Think of reduce(). Since Spark’s managed collections are RDDs, Spark Transformers take RDDs and return RDDs, and Spark Accessors coherently accept RDDs and return values.

The main difference between Transformations and Actions is that Transformations are lazy while Actions are eager. Transformations do not process a single bit unless explicitly requested to do so. That’s why declaring a new RDD happens instantly. On the other hand, Actions must return single values out of the Spark cluster back to the driver program, so they are forced to start the computation. The whole pipeline that created the RDD the Action received as input gets evaluated. If you count the lines inside a file like this:

val lines = sparkContext.textFile("...")     // computation does not happen here...
val rowCount = lines.count()                 // but here!

the source file is not read when the lines RDD is declared, but when the rowCount value is evaluated. The same applies for the word count example below:

val lines = sparkContext.textFile("...")     // nothing happens here...
val words = lines.flatMap(l => l.split(" ")) // nor here...
val wordCount = words.count()                // but here computation fires up

Laziness allows Spark to push transformations down to the very nodes that hold the data, providing computation-to-data locality.

Once again, to tell whether an operation is a Transformation (lazy) or an Action (eager), just look at the result type. An RDD result indicates a Transformation, while a non-RDD type indicates an Action. All the following return an RDD: map, flatMap, distinct, filter. All the following do not return an RDD and are Actions: collect: Array[T], count: Long, take: Array[T], reduce: T, foreach: Unit.

One more important aspect of this kind of operation can be highlighted by this example. Let’s suppose that we are looking for the first 10 records in a terabyte log file that match a criterion:

val lastYearLogs: RDD[String] = ...
val firstLogsWithErrors = lastYearLogs.filter(_.contains("ERROR")).take(10)

In a Hadoop scenario, the whole file could be parsed and filtered to select ALL the records featuring the “ERROR” string, only to later take the first 10 and discard the others. But Spark does better, because the filter Transformation is not executed until the take() Action is called. The transformation is pushed inside the action and is performed only until 10 matching records have been collected by take(). No filtering happens on the other billions of records at all!

This is possible because Spark analyzes and optimizes the pipeline before executing it.

There are special types of Transformations that operate on two RDDs, like:

union(other: RDD[T]): RDD[T]
intersection(other: RDD[T]): RDD[T]
subtract(other: RDD[T]): RDD[T]
cartesian(other: RDD[T]): RDD[T]

Each of the previous combines an existing RDD[T] with another RDD[T].
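
A minimal sketch (assuming a SparkContext named sc):

val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5, 6))

val u = a.union(b)         // 1, 2, 3, 4, 3, 4, 5, 6 -- duplicates are kept
val i = a.intersection(b)  // 3, 4
val s = a.subtract(b)      // 1, 2
val c = a.cartesian(b)     // all 16 (x, y) pairs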

Other Actions have no counterpart in Scala because they are specific to the scenarios Spark was designed for:

takeSample(withRepl: Boolean, num: Int): Array[T]
takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
saveAsTextFile(path: String): Unit
saveAsSequenceFile(path:String): Unit

All the previous are Actions and act eagerly!
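
A small sketch of a few of them (a SparkContext named sc is assumed; the output path is hypothetical):

val xs = sc.parallelize(Seq(5, 1, 4, 2, 3))
val sample   = xs.takeSample(withReplacement = false, num = 2) // 2 random elements
val smallest = xs.takeOrdered(3)                               // Array(1, 2, 3)
xs.saveAsTextFile("hdfs:///tmp/xs-out")                        // writes one part file per partition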

Logistic regression is an algorithm to separate two classes of inputs. Think of a cloud of points, half blue and half red. A line could separate them exactly, but at first it is drawn with the wrong inclination. Logistic regression rotates the line until it separates the blue dots from the red ones.

w <- w - alpha * sum(i <- 1 to n) g(w; x_i, y_i)

This is a possible implementation with Spark:

val points = sparkContext.textFile("...").map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
  val gradient = points.map { p =>
    (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
  }.reduce(_ + _)
  w -= alpha * gradient
}

points is being re-evaluated at every iteration! This is unnecessary. How could it be optimized? Well, Spark gives a simple way to avoid recomputing an RDD: just call persist() or cache() on it! This tells Spark that the RDD is going to be used several times and there’s no need to recompute it every time. If possible, Spark keeps its data in memory, which is going to be faster! Let’s see an example.

val lastYearsLogs: RDD[String] = ...
val logsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).persist()
val numErrors = logsWithErrors.count()

Here calling persist() does not hurt, but it does not give any performance boost either, because logsWithErrors is then used only once. But what happens if we call another action on it?

val lastYearsLogs: RDD[String] = ...
val logsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).persist()
val numErrors = logsWithErrors.count()
val firstLogsWithErrors = logsWithErrors.take(10)

Now logsWithErrors is referenced twice. Remember that Spark triggers computation only when an action is called. Here we have two distinct pipelines:

  1. count the record <–from– records with errors <–from– lines in last years logs
  2. take first 10 records <–from– records with errors <–from– lines in last years logs

When pipeline 1 executes, Spark keeps the intermediate RDD logsWithErrors in cluster RAM. This effectively transforms the pipelines like this:

  1. count the record <–from– records with errors <–from– lines in last years logs
  2. take first 10 records <–from– persisted RDD from pipeline 1.

The second action is going to return in a much shorter time than before. So let’s optimize the logistic regression example. All we have to do is call persist() on the points RDD:

val points = sparkContext.textFile("...").map(parsePoint).persist()
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
  val gradient = points.map { p =>
    (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
  }.reduce(_ + _)
  w -= alpha * gradient
}

An RDD can be persisted:

  • in memory as regular Java objects
  • on disk as regular Java objects
  • in memory as serialized Java objects (more compact)
  • on disk as serialized Java objects (more compact)
  • both in memory and on disk (spill over to disk to avoid re-computation)

cache() uses the first method, period. persist() can be customized with one of the previous profiles, using the storage levels MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY. The MEMORY_AND_DISK variants spill to disk only if there’s not enough memory.
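
A minimal sketch of picking a profile explicitly (SparkContext named sc; the path is hypothetical; note that a storage level can be assigned to an RDD only once):

import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("hdfs:///var/logs/app.log").filter(_.contains("ERROR"))
// errors.cache()                                // would be the same as persist(StorageLevel.MEMORY_ONLY)
errors.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized, spills to disk when memory is short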

Takeaway: the deferred semantics of Spark RDDs are very unlike those of Scala collections, due to:

  1. the lazy semantics of RDD transformations
  2. users’ implicit reflex to assume collections are eagerly evaluated

==> Always double check if your code is re-evaluating twice or more the same RDD! <==

Spark clusters are organized in a Master-Workers topology. The driver program is where the SparkContext lives. The driver talks to the cluster manager (YARN, Mesos) to allocate resources. Executors are the processes that actually execute computations on the worker nodes. So, the usual steps in a Spark application life cycle are:

  1. the driver program runs the Spark application, which creates the SparkContext
  2. the SparkContext connects to a cluster manager (YARN, Mesos, …) which allocates resources
  3. Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application
  4. Next, the driver program sends your application code to the executors (yes, we are sending fragments of code across the cluster to be executed)
  5. Finally, SparkContext sends tasks for the executors to run

Reduce operations are the most difficult (if not impossible) to parallelize. For example, while fold() can be parallelized, foldLeft can’t, because foldLeft’s function can’t be applied in parallel to separate chunks and then applied again to combine the partial results obtained from the parallel executions. Let’s first recall foldLeft’s signature:

def foldLeft[A,B](zero: A)(f: (A, B) => A): A

So we define a function named folder which is the f in the signature and try this:

def folder(str: String, i: Int) = str + i
val xs = List(1,2,3,4)
val res = xs.foldLeft("")(folder)

Now suppose that Spark splits xs in two inside an RDD:

On worker1: List(1,2)              On worker2: List(3,4)

Here are the substitution steps:

folder("", 1) -> "1"               folder("", 3) -> "3"
folder("1", 2) -> "12"             folder("3", 4) -> "34"

Now Spark should ideally shuffle both results (“12” and “34”) onto one single node and then apply the folder() function again. But it can’t, because both values are Strings while folder takes a String and an Int! A type mismatch happens here!

On the other hand, fold has an important restriction: there’s no B! Both arguments of the f() function must have the very same type:

def fold[A](zero: A)(f: (A, A) => A): A

Here we see clearly that we can indeed parallelize fold, using for instance folder(a: Int, b: Int) = a + b and zero = 0:

folder(0, 1) -> 1                   folder(0, 3) -> 3
folder(1, 2) -> 3                   folder(3, 4) -> 7

Then Spark can apply f again to both outputs and obtain 10. If we instead want a String concatenation we must first transform each Int to a String:

val xs = sc.parallelize(List(1,2,3,4)).map(_.toString)

and then reduce it with fold:

val res = xs.fold("")((a, b) => a + b) // or shortly xs.fold("")(_+_)

Then let’s consider aggregate as it’s declared in Scala:

def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B

aggregate() takes the best of both worlds because it is parallelizable and makes it possible to change the return type! In fact the parallelizable code in seqop() can change the type to be finally combined from [A] to [B], and the combinator combop() takes that type [B]. Takeaway: foldLeft and foldRight are not defined on RDD, while the other combinators (fold, reduce, aggregate) are all available.
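
A minimal sketch (assuming a SparkContext named sc): aggregate turns an RDD[Int] into a String, something fold alone cannot do because it cannot change the element type.

val nums = sc.parallelize(List(1, 2, 3, 4))
val concatenated: String = nums.aggregate("")(
  (acc, i) => acc + i, // seqop: runs in parallel inside each partition
  (a, b)   => a + b    // combop: merges the per-partition partial results
)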

Pair RDDs are treated as a special case in Spark. Their type is RDD[(K, V)]. This kind of RDD has special operations on keys, like reduceByKey(), groupByKey() and join():

def groupByKey(): RDD[(K, Iterable[V])]
def reduceByKey(f: (V, V) => V): RDD[(K, V)]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

How to create a Pair RDD?

case class WikipediaPage(
    title: String,
    redirectTitle: String,
    timestamp: String,
    lastContributorUsername: String,
    text: String
)

val rdd: RDD[WikipediaPage] = ...
val pairRdd: RDD[(String, String)] = rdd.map(page => (page.title, page.text)) 

Most common transformations on Pair RDD are:

  • groupByKey
  • reduceByKey
  • mapValues
  • keys
  • join
  • leftOuterJoin/rightOuterJoin

While a typical Action is:

  • countByKey

groupByKey produces a new Pair RDD that has the same key type but has an Iterable[V] as the value type. All the values in the original RDD with a common key are gathered in that Iterable.
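
A tiny hypothetical sketch, with organizers as keys and budgets as values:

val budgets = sc.parallelize(List(("Alice", 100), ("Bob", 50), ("Alice", 20)))
val grouped = budgets.groupByKey() // RDD[(String, Iterable[Int])]
// e.g. ("Alice", Iterable(100, 20)) and ("Bob", Iterable(50))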

reduceByKey goes a step further: conceptually it groups the values by key like groupByKey does and then applies a function f() to each group, reducing its values to a single one (in practice it is more efficient, since it reduces locally on each partition before shuffling):

def reduceByKey(f: (V, V) => V): RDD[(K, V)]

An example:

case class Event(organizer: String, name: String, budget: Int)
    
val events = sc.parallelize(...).map(event => (event.organizer, event.budget))
val budgetPerOrganizer = events.reduceByKey(_+_)

By contrast, mapValues[U](f: V => U): RDD[(K, U)] applies the function f() to the values only. It is a shortcut for:

rdd.map { case (k, v) => (k, f(v)) }

Another popular action is countByKey:

def countByKey(): Map[K, Long]

The return type is not an RDD, which complies with the rule: a non-RDD result means it is an Action.
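
A tiny hypothetical sketch, counting how many events each organizer has:

val organizerBudgets = sc.parallelize(List(("Alice", 100), ("Bob", 50), ("Alice", 20)))
val counts = organizerBudgets.countByKey() // Map("Alice" -> 2, "Bob" -> 1), a local Map on the driver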

As an example, we’ll compute the average budget per organizer.

case class Event(organizer: String, name: String, budget: Int)
    
val events = sc.parallelize(...).map(event => (event.organizer, (event.budget, 1)))
val intermediate = events.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2))
val averageBudgets =
    intermediate.mapValues { case (budget, count) => budget / count }.collect()

Here is an alternative:

case class Event(organizer: String, name: String, budget: Int)
    
val events = sc.parallelize(...).map(evt => (evt.organizer, evt.budget)).persist()
val budgetPerOrganizer = events.reduceByKey(_ + _)
// countByKey is an Action and returns a local Map[K, Long],
// so its result is turned back into an RDD before joining
val eventsPerOrganizer = sc.parallelize(events.countByKey().toSeq)
val intermediate = budgetPerOrganizer.join(eventsPerOrganizer)
val averageBudgets = intermediate.mapValues { case (budget, count) => budget / count }.collect()

Another frequently used transformation is:

def keys: RDD[K]

It returns a new RDD (lazy transformation) with the keys of the input RDD.

case class Visitor(ip: String, timestamp: String, duration: String)
val visitors: RDD[Visitor] = ...
val visits = visitors.map(v => (v.ip, v.duration))
val numUniqueVisits = visits.keys.distinct().count()

Partitions

Datasets are divided into partitions. By default Spark creates a number of partitions equal to the number of cores available in the whole cluster. Each node is guaranteed to have at least one partition on it. A partition NEVER spans more than one machine, so the data belonging to a partition is guaranteed to be close together, in one machine’s memory.
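
A quick way to check (assuming a SparkContext named sc):

val xs = sc.parallelize(1 to 1000)
println(xs.getNumPartitions) // by default, the total number of cores in the cluster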

Spark provides two kinds of partitioning:

  • Hash partitioning
  • Range partitioning

Partitioning customization is only possible on Pair RDDs! Hash partitioning attempts to send records with the same key to the same partition (and therefore the same host), based on the keys’ hash codes.

val purchasesPerCust =
    purchasesRdd.map(p => (p.customerId, p.price)).groupByKey()

//
// here .groupByKey() first computes the partition of each record (k, v) as:
//
//   p = k.hashCode() % numberOfPartitions

Range partitioning is important when the data is ordered (by integer keys, lexicographically, or something similar) and we think that this order matters.

As an example, let’s suppose that an RDD has the following keys: [8, 96, 240, 400, 401, 800]

Applying hash partitioning, supposing for simplicity that the hashCode() method just returns the key itself and setting the number of partitions to 4, results in the following partitioning:

P0: 8, 96, 240, 400, 800
P1: 401
P2: -empty-
P3: -empty-

A very unbalanced result! Let’s see what happens using range partitioning. Using this set of ranges: (1, 200), (201, 400), (401, 600), (601, 800), the resulting partitioning is:

P0: 8, 96
P1: 240, 400
P2: 401
P3: 800

A more balanced partitioning! How do we manage partitioning?

  1. By calling partitionBy on an RDD, providing an explicit Partitioner
  2. Using transformations that return RDDs with specific partitioners

Let’s see an example of the first method.

val pairs = purchasesRdd.map(p => (p.customerId, p.price))
    
/*
 * we pass the pairs RDD to the RangePartitioner constructor
 * because it will sample it to guess the best ranges
 */
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

Calling .persist() after invoking a partitioner is REALLY important to prevent further, useless shuffling of data across the network, in case Spark needs to re-evaluate the already partitioned RDD.

.sortByKey() by default applies a RANGE partitioner, while .groupByKey() sets a HASH partitioner.
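
A small sketch of the second method, reusing the pairs RDD defined above (customerId is assumed to have an Ordering):

val byRange = pairs.sortByKey()  // the result carries a RangePartitioner
val byHash  = pairs.groupByKey() // the result carries a HashPartitioner

println(byRange.partitioner)     // e.g. Some(org.apache.spark.RangePartitioner@...)
println(byHash.partitioner)      // e.g. Some(org.apache.spark.HashPartitioner@...)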

As a general rule, remember that transformations preserve the RDD’s partitioner as long as possible. There is one really important exception: .map() and .flatMap() drop it. The reason is that both can alter the keys of a Pair RDD, and since partitioning is done on keys, any previous partitioning could be invalidated. That’s one reason to prefer .mapValues() when only the values change: it keeps the partitioner.
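
A sketch of the consequence, reusing the partitioned RDD from the example above (prices are assumed to be numeric):

val discounted = partitioned.mapValues(price => price * 0.9)     // keys untouched: partitioner preserved
val rekeyed    = partitioned.map { case (k, p) => (k, p * 0.9) } // keys could change: partitioner dropped

println(discounted.partitioner) // still Some(...): the RangePartitioner survives
println(rekeyed.partitioner)    // None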

Operations that might cause a shuffle are:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

A way to avoid a shuffle (or to reduce it to its unavoidable minimum) is to spread computation over the nodes, making it happen locally. For example (see the sketch after this list):

  1. using reduceByKey() on a pre-partitioned RDD causes the values to be computed locally, requiring only the final reduced values to be sent from the workers to the driver
  2. join() called on two RDDs pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally, with no shuffling across the network
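
A minimal sketch of point 1 (reusing purchasesRdd from above; prices assumed numeric):

import org.apache.spark.HashPartitioner

val byCustomer = purchasesRdd
  .map(p => (p.customerId, p.price))
  .partitionBy(new HashPartitioner(8))
  .persist()

// reduceByKey reuses the existing partitioner, so the per-key sums
// are computed locally and no further shuffle is needed
val totalPerCustomer = byCustomer.reduceByKey(_ + _)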

Transformations with narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Transformations with wide dependencies:

  • cogroup
  • groupWith
  • JOIN family: join, leftOuterJoin, rightOuterJoin
  • ByKEY family: groupByKey, reduceByKey, combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

Spark provides the .dependencies method on RDDs to inspect dependency types (see the sketch after this list). Those can be:

  • Narrow dependencies:

    • OneToOneDependency
    • PruneDependency
    • RangeDependency
  • Wide dependencies:

    • ShuffleDependency
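
A small sketch (assuming a SparkContext named sc):

val pairs = sc.parallelize(List((1, "a"), (2, "b"), (1, "c")))
val grouped = pairs.groupByKey()

println(pairs.dependencies)   // List() -- no parent RDD
println(grouped.dependencies) // List(org.apache.spark.ShuffleDependency@...)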

Calling .toDebugString on an RDD returns a description of its lineage (the DAG).

val lines = sc.textFile("src/main/resources/stackoverflow/stackoverflow.csv")
println(lines.toDebugString)
17/03/19 12:51:06 INFO FileInputFormat: Total input paths to process : 1
(6) src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at <console>:19 []
 |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at <console>:19 []
    
val javaLines = lines.filter(_.contains("Java"))
println(javaLines.toDebugString)
(6) MapPartitionsRDD[2] at filter at <console>:20 []
 |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at <console>:19 []
 |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at <console>:19 []