SparkSQL Notebook

This is another notebook, this time about SparkSQL. Again, it is not intended for public use; it is only my personal notebook. Period.

Spark's abstraction for structured data is called a DataFrame. Think of it as a distributed SQL table. Like tables, DataFrames have a schema, which is essential for allowing Spark to perform aggressive optimizations. DataFrames are also untyped, so the Scala compiler cannot statically type-check operations on them at compile time. Consequently, DataFrame transformations are untyped transformations.

To use SparkSQL, everything starts with the SparkSession object (this example configures Spark to run in local mode):

import org.apache.spark.sql.SparkSession
     
val spark = SparkSession
     .builder()
     .appName("your App name here!")
     .config("spark.master", "local[*]")
     .getOrCreate()

import spark.implicits._

SparkSession is also meant to replace SparkContext as the main entry point over time, so keep an eye on it.
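The lower-level SparkContext is still reachable from the session when the RDD API is needed; a minimal sketch:

val sc = spark.sparkContext               // the underlying SparkContext
val numbersRDD = sc.parallelize(1 to 100) // plain RDD created through it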

DataFrames use the usual fundamental data types, such as:

  • Byte
  • Short
  • Int (IntegerType)
  • Long
  • java.lang.BigDecimal (DecimalType)
  • Float
  • Double
  • Boolean
  • java.sql.Timestamp
  • java.sql.Date
  • String

But DataFrames can also use more complex, structured data types:

  • Array[T] -> ArrayType(elementType, containsNull)
  • Map[K, V] -> MapType(keyType, valueType, valueContainsNull)
  • case class -> StructType(List[StructField])

To make data types available, import them:

import org.apache.spark.sql.types._
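A quick way to see these mappings in practice is to derive the schema of a case class from its Encoder and print it; Contact and its fields here are made up for the example:

import org.apache.spark.sql.Encoders

// hypothetical case class mixing fundamental and structured types
case class Contact(id: Int, name: String, tags: Array[String], scores: Map[String, Double])

// the derived schema is a StructType of StructFields, with ArrayType and MapType inside
Encoders.product[Contact].schema.printTreeString()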

DataFrames can be created:

  • from an existing RDD, either with schema inference or by providing a schema
  • by reading a specific data source from a file (CSV, JSON, …)

Case 1 with inferred schema:

val tupleRDD: RDD[(Int, String)] = ...
val tupleDF = tupleRDD.toDF("id", "name")

Calling .toDF() without arguments assigns progressive numeric names to the columns: _1, _2, _3, …

scala> val rdd = sc.parallelize(Seq(1 -> "Andrea", 2 -> "Elisa", 3 -> "Spino"))

scala> rdd.toDF
res1: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> rdd.toDF("Id", "name")
res2: org.apache.spark.sql.DataFrame = [Id: int, name: string]

If you already have an RDD containing instances of a case class, the column names are inferred from the class fields:

case class Person(id: Int, name: String, city: String)
val peopleRDD: RDD[Person] = ...
val peopleDF = peopleRDD.toDF

Case 1 with provided schema:

case class Person(name: String, age: Int)
val peopleRDD: RDD[Person] = sc.textFile(...).map(personParseFunction)

/*
 * this is the encoding of the schema as a String
 */
val schemaString = "name age"
 
/*
 * generate the schema from the schema string, building a StructField
 * for each encoded field and giving it the StringType type
 */
val fields = schemaString.split(" ")
    .map(field => StructField(field, StringType, nullable = true))

/*
 * then build the whole schema
 */
val schema = StructType(fields)
 
/*
 * convert records to rows; since the schema declares every field as
 * StringType, the age field is converted to a String too
 */
import org.apache.spark.sql.Row
val rowRDD = peopleRDD.map(p => Row(p.name, p.age.toString))
   
/*
 * apply the schema
 */
val peopleDF = spark.createDataFrame(rowRDD, schema)

Case 2:

/*
 * using the read method of the SparkSession object we can directly
 * ingest data
 */
val df = spark.read.json("my_data.json")   

Case 2 applies to several data formats; the most common are of course JSON, CSV, and Parquet. JDBC is also an option, by providing a table name or a query.
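A sketch of the other common readers (paths, table names, and connection settings are made up; the options are the standard DataFrameReader ones):

val csvDF = spark.read
    .option("header", "true")        // first line holds the column names
    .option("inferSchema", "true")   // let Spark guess the column types
    .csv("people.csv")

val parquetDF = spark.read.parquet("people.parquet")

val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost/mydb")
    .option("dbtable", "people")     // a table name or a "(SELECT ...) AS t" subquery
    .option("user", "me")
    .option("password", "secret")
    .load()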

To make a DataFrame available in SQL literals, you have to register it:

/*
* how to register a DataFrame so it can be referenced later in SQL
* literals, as in a "FROM <view_name>" fragment
*/
peopleDF.createOrReplaceTempView("people")

/*
* this is how you do a literal query on it (note the "FROM people"
* which refers to the previously registered DataFrame)
*/
val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")

To inspect DataFrame data, use the .show() method, which pretty-prints the first N records (20 by default). The .printSchema() method prints the schema in a tree format:

// root
//  |-- id: integer (nullable = true)
//  |-- fname: string (nullable = true)
//  |-- lname: string (nullable = true)
//  |-- age: integer (nullable = true)
//  |-- city: string (nullable = true)
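A minimal usage sketch (df stands for any DataFrame, such as the peopleDF registered above):

df.show(5)        // pretty-prints the first 5 rows as a table
df.printSchema()  // prints the schema as a tree, like the output above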

Some of the most common transformations include:

def select(col: String, cols: String*): DataFrame
def agg(expr: Column, exprs: Column*): DataFrame
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
def join(right: DataFrame): DataFrame

Columns can be specified using three syntaxes:

  • Using $-notation (requires spark.implicits._):
df.filter($"age" > 18)
  • Referring to the DataFrame
df.filter(df("age") > 18)
  • Using SQL query string
df.filter("age > 18")

The third is the most convenient, but it requires Spark to parse the string. The second avoids that parsing, but repeating the DataFrame name for every column is impractical, so the $-notation is usually the best compromise.

This is a filter example:

case class Person(id: Int, fname: String, lname: String, age: Int, wage: Double)
val personDF = sc.parallelize(...).toDF

val olderThan10 = personDF.select("lname", "fname")
                          .where("age > 10")
                          .orderBy("lname")

.filter() and .where() are aliases. Complex expressions are allowed:

personDF.where(($"age" > 10) && ($"lname" != "Smith")).show()

Spark provides several aggregation functions like count, sum, max, min, avg.

Aggregations in Spark are meant to happen after a groupBy, which in fact returns a RelationalGroupedDataset (not a plain Dataset) whose purpose is to support aggregations.

df.groupBy($"attr1").agg(sum($"attr2"))
df.groupBy($"attr1").count($"attr2")

/*
 * find the wealthiest persons by age
 */   
personDF.groupBy($"age").max($"wage")

val rankedDF = postsDF.groupBy($"authorID", $"subforum")
                      .agg(count($"authorID"))
                      .orderBy($"subforum", $"count(authorID)".desc)

To drop records with null values we can use the .na.drop() method (the null-handling methods live on the DataFrame's .na accessor):

  • drop(): drops rows that contain at least one null or NaN value
  • drop("all"): drops rows whose values are all null or NaN (the whole record is null)
  • drop(Array("fname", "lname")): drops rows that have null values in the specified columns

The .na.fill(x) method replaces all occurrences of null with the value x. Called with a Map, it replaces nulls only in the specified columns, using the specified values:

fill(Map("age" -> 0, "wage" -> 1000))

The .na.replace() method, called with an Array of columns and a Map, replaces occurrences of the Map's keys with their paired values, but only in the columns included in the Array:

replace(Array("age"), Map(9->10,8->10))

DataFrames have actions, just like RDDs:

  • collect(): Array[Row]
  • count(): Long
  • first(): Row
  • head(): Row
  • take(n: Int): Array[Row]
  • show(): Unit

The show() action displays the first 20 rows of a DataFrame.

Joining two DataFrames is possible in the usual way:

df1.join(df2, df1("id") === df2("id"), "join_type")

where join_type can be:

  • inner (default)
  • outer
  • left_outer
  • right_outer
  • leftsemi
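A minimal sketch, assuming df1 and df2 both have an id column:

df1.join(df2, df1("id") === df2("id"))                // inner join (default)
df1.join(df2, df1("id") === df2("id"), "left_outer")  // also keep unmatched rows of df1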

As already seen with RDDs, if we have to both join and filter, filtering before joining is more efficient than the other way around, because filtering reduces the amount of data the join later has to work on. SparkSQL, however, has more opportunities to optimize the job, thanks to the stronger structure of the data and to two components: Catalyst, a query optimizer, and Tungsten, an off-heap memory and serialization optimizer. Catalyst is able to:

  • reorder operations
  • reduce the amount of data read
  • prune unneeded partitions

Tungsten on the other hand can optimize the way data is encoded in memory, providing highly optimized encoders.
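To peek at what Catalyst actually produced for a query, every DataFrame can print its plans with .explain(); here, for instance, on the adultsDF defined earlier:

adultsDF.explain()      // physical plan only
adultsDF.explain(true)  // parsed, analyzed, optimized and physical plans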

DataFrame limitations!

DataFrames are untyped! This implies that the Scala compiler can neither type-check nor schema-check any statement. This brings back runtime errors, something Scala had made us forget about.

Limited data types! If your data can't be expressed as case classes or the standard data types above, Tungsten is unable to do its magic.

They require semi-structured or structured data! Unstructured data can be processed only with RDDs.

DataFrames also return Rows, not regular Scala types. Here is an example:

case class Person(id: Int, fname: String, lname: String, age: Int, wage: Double)
val personDF = sc.parallelize(...).toDF
    
val olderThan10 = personDF.select("fname", "lname")
                          .where("age > 10")
                          .orderBy("lname")
                          .collect()

The collect() action does not return an Array of Scala strings or tuples but an Array[org.apache.spark.sql.Row]; Row is the untyped interface DataFrames use to return data. A Row carries no compile-time schema, so the programmer has to remember the schema and cast each field to the proper type:

olderThan10.map { row => (
    row(0).asInstanceOf[String], 
    row(1).asInstanceOf[String]
)}

which is ugly, error-prone, and tedious. When doing interactive exploration, we can rely on the schema's .printTreeString() method to get some clue about a Row's layout. But this can't eliminate the risk of a miscast that throws a java.lang.ClassCastException! This is why another interface was born:
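A sketch of that interactive inspection, on the olderThan10 array collected above (Rows returned by collect() normally carry their schema at runtime):

olderThan10.head.schema.printTreeString() // prints the Row schema as a tree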

DataSets!

Actually DataFrames are Datasets:

type DataFrame = Dataset[Row]

Datasets are typed, distributed collections of data. Datasets require structured or semi-structured data, but they let you mix relational operations with RDD-like functional ones. This is a short example of the Dataset API:

exampleDS.groupByKey(l => l.key_column).agg(avg($"price").as[Double])

The .as[T] method performs the cast required to create a TypedColumn, which is the typed evolution of the DataFrame's Column.

To create a Dataset you can call .as[T] on a DataFrame, or .toDS on an RDD or a local collection (both need spark.implicits._ in scope):

val dataset1 = dataframe.as[Person]
val dataset2 = rdd.toDS

The other way is to read data from a file:

val dataset3 = spark.read.json("people.json").as[Person]

Datasets have the usual transformations:

map[U](f: T => U): Dataset[U]
flatMap[U](f: T => TraversableOnce[U]): Dataset[U]
filter(pred: T => Boolean): Dataset[T]
distinct(): Dataset[T]
coalesce(numPartitions: Int): Dataset[T]
repartition(numPartitions: Int): Dataset[T]
groupByKey[K](f: T => K): KeyValueGroupedDataset[K, T]

The return type of groupByKey() is KeyValueGroupedDataset, a special grouped Dataset which provides a lot of aggregation functions that in turn return a Dataset.

reduceGroups(f: (V, V) => V): Dataset[(K, V)]
agg[U](col: TypedColumn[V, U]): Dataset[(K, U)]

The last is used like this:

someDS.agg(sum($"field").as[Double])

It is important to call .as[T] to cast the column to a TypedColumn.

mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U]
flatMapGroups[U](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U]

The previous methods map a function on each group of data. Together with groupByKey(), these calls can emulate a reduceByKey() call:

keyValuesDS.groupByKey(p => p._1)
           .mapGroups((k, vs) => (k, vs.foldLeft("")((acc, p) => acc + p._2)))

However, the documentation states that mapGroups and flatMapGroups force a shuffle, which involves high latency, so they should not be used unless there is no other choice. A better approach is to use reduceGroups():

keyValuesDS.groupByKey(p => p._1)                   // 1. group by key
           .mapValues(p => p._2)                    // 2. extract the value only
           .reduceGroups((acc, str) => acc + str)   // 3. reduce it

An Aggregator is a class that generically aggregates data.

class Aggregator[-IN, BUF, OUT]

where IN is the input type of the aggregator, i.e. the (key, value) pairs produced by the groupByKey call; BUF is the type of the intermediate buffer and OUT is the output type of the aggregation. This is the Aggregator interface:

val myAgg = new Aggregator[IN, BUF, OUT] {
    def zero: BUF = ...
    def reduce(b: BUF, a: IN): BUF = ...
    def merge(b1: BUF, b2: BUF): BUF = ...
    def finish(b: BUF): OUT = ...
}.toColumn // this converts the class to a TypedColumn

Here is an example implementation of an Aggregator which concatenates strings:

import spark.implicits._ // imports the Encoders too, needed by .toDS

// international phone prefixes paired with city names
val keyValues = List(
    39 -> "Milano", 39 -> "Roma",
    33 -> "Paris", 33 -> "Nice",
    49 -> "Berlin")

val ds = keyValues.toDS

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

/*
 * this is the Aggregator implementation
 */
val strConcat = new Aggregator[(Int, String), String, String] {
    def zero = ""
    def reduce(b: String, a: (Int, String)) = b + ", " + a._2
    def merge(b1: String, b2: String) = b1 + ", " + b2
    def finish(r: String): String = r
    override def bufferEncoder: Encoder[String] = Encoders.STRING
    override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn

/*
 * this is how it's used by .agg()
 */
ds.groupByKey(pair => pair._1).agg(strConcat.as[String])

Now something about actions:

collect(): Array[T]
count(): Long
first(): T
head(): T
foreach(f: T => Unit): Unit
reduce(f: (T, T) => T): T
show(): Unit
take(n: Int): Array[T]

Dataset actions are much closer to RDD actions and return familiar Scala types. There's not much more to add.
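A short sketch of what "familiar Scala types" means in practice, reusing the Person case class and personDF from the DataFrame section (spark.implicits._ provides the needed encoders):

val personDS = personDF.as[Person]

// no Row, no casting: collect() returns plain Scala tuples
val names: Array[(String, String)] =
  personDS.filter(p => p.age > 10)
          .map(p => (p.fname, p.lname))
          .collect()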

Takeaways about Spark possible APIs:

Use Datasets when you…

  • have (semi)structured data
  • want type safety
  • need to work with a functional API
  • need good performance, even if not optimal

Use DataFrames when you…

  • have (semi)structured data
  • want the best possible performance (Tungsten, Catalyst)

Use RDDs when you…

  • have unstructured data
  • need to do some low level computations
  • have complex data types that cannot be serialized with Encoders