
Understanding Catalyst transformations
Conceptually, the Catalyst optimizer executes two types of transformations. The first type transforms an input tree into another tree of the same type, that is, without changing the tree type. This includes transforming one expression into another expression, one Logical Plan into another Logical Plan, and one Physical Plan into another Physical Plan. The second type transforms a tree of one type into a tree of another type, for example, from a Logical Plan to a Physical Plan. A Logical Plan is converted into a Physical Plan by applying a set of strategies. These strategies use pattern matching to convert a tree of one type into a tree of the other. For example, there are specific patterns that match logical project and filter operators to their physical counterparts.
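To make the distinction concrete, the following minimal sketch (assuming a Spark 2.x spark-shell session; the name q is only illustrative) uses a DataFrame's queryExecution field to print the trees produced at each stage. The analyzed and optimized Logical Plans are the result of same-tree-type rule applications, while the Physical Plan is produced by applying strategies:
scala> // A tiny illustrative query; the name q is hypothetical
scala> val q = spark.range(10).filter("id > 5")
scala> // Logical-to-logical transformations (same tree type): analysis and optimization rules
scala> println(q.queryExecution.analyzed)      // analyzed Logical Plan
scala> println(q.queryExecution.optimizedPlan) // optimized Logical Plan
scala> // Logical-to-physical transformation (different tree type): strategies produce the Physical Plan
scala> println(q.queryExecution.sparkPlan)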
A set of rules can also be combined into a single rule to accomplish a specific transformation. For example, depending on your query, predicates such as filter conditions can be pushed down to reduce the overall number of rows before a join operation is executed. In addition, if your query contains an expression with constants, then the constant folding optimization computes the expression once at compile time instead of re-evaluating it for every row at runtime. Furthermore, if your query requires only a subset of the columns, then column pruning helps reduce the columns read to the essential ones. All of these rules can be combined into a single rule to achieve all three transformations.
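The following sketch (the DataFrame names are only illustrative) shows all three optimizations surfacing in the optimized Logical Plan: the constant expression 10 + 20 is folded to 30, the filter is pushed down past the projection, and the unused derived columns are pruned away:
scala> // Hypothetical example: a wide DataFrame with derived columns
scala> val wideDF = spark.range(1000).selectExpr("id", "id * 2 AS doubled", "id * 3 AS tripled")
scala> // Filter with a constant expression, then keep a single column
scala> val prunedDF = wideDF.filter("id > 10 + 20").select("id")
scala> // The optimized plan shows the folded constant (30), the pushed-down filter, and only the id column
scala> println(prunedDF.queryExecution.optimizedPlan)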
In the following example, we measure the difference in execution times between Spark 1.6 and Spark 2.2. We use the iPinYou Real-Time Bidding Dataset for Computational Advertising Research. This dataset contains the data from three seasons of the iPinYou global RTB bidding algorithm competition. You can download it from the data server at University College London at http://data.computational-advertising.org/.
First, we define the case classes for the records in the bid transactions and the region files:
scala> case class PinTrans(bidid: String, timestamp: String, ipinyouid: String, useragent: String, IP: String, region: String, city: String, adexchange: String, domain: String, url: String, urlid: String, slotid: String, slotwidth: String, slotheight: String, slotvisibility: String, slotformat: String, slotprice: String, creative: String, bidprice: String)
scala> case class PinRegion(region: String, regionName: String)
Next, we create the DataFrames from one of the bids files and the region file:
scala> val pintransDF = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/training1st/bid.20130314.txt").map(_.split("\t")).map(attributes => PinTrans(attributes(0).trim, attributes(1).trim, attributes(2).trim, attributes(3).trim, attributes(4).trim, attributes(5).trim, attributes(6).trim, attributes(7).trim, attributes(8).trim, attributes(9).trim, attributes(10).trim, attributes(11).trim, attributes(12).trim, attributes(13).trim, attributes(14).trim, attributes(15).trim, attributes(16).trim, attributes(17).trim, attributes(18).trim)).toDF()
scala> val pinregionDF = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/region.en.txt").map(_.split("\t")).map(attributes => PinRegion(attributes(0).trim, attributes(1).trim)).toDF()
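As an aside, here is a sketch of an equivalent load for the region file using the built-in CSV reader with a tab separator (the name pinregionAltDF is hypothetical); the default _c0/_c1 column names are renamed with toDF:
scala> // Alternative load of the region file; all columns are read as strings by default
scala> val pinregionAltDF = spark.read.option("sep", "\t").csv("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/region.en.txt").toDF("region", "regionName")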
Next, we borrow a simple benchmark function (available in several Databricks sample notebooks) to measure the execution time. It takes the code to be timed as a by-name parameter, so the block is evaluated only inside the timing calls:
scala> def benchmark(name: String)(f: => Unit): Unit = {
  val startTime = System.nanoTime
  f
  val endTime = System.nanoTime
  println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
}
We use the SparkSession object to turn the whole-stage code generation parameter off (this roughly translates to the Spark 1.6 execution environment). We then measure the execution time of a join operation between the two DataFrames:
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)
scala> benchmark("Spark 1.6") {
| pintransDF.join(pinregionDF, "region").count()
| }
Time taken in Spark 1.6: 3.742190552 seconds
Next, we set the whole-stage code generation parameter to true and measure the execution time. We note that the execution time is much lower for the same code in Spark 2.2:
scala> spark.conf.set("spark.sql.codegen.wholeStage", true)
scala> benchmark("Spark 2.2") {
| pintransDF.join(pinregionDF, "region").count()
| }
Time taken in Spark 2.2: 1.881881579 seconds
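With whole-stage code generation enabled, a quick sanity check (a sketch) is to call explain() on the query: operators that are compiled together are prefixed with an asterisk in the printed physical plan:
scala> // Operators covered by whole-stage code generation appear with a '*' prefix in the plan
scala> pintransDF.join(pinregionDF, "region").explain()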
We use the explain() function to print out the various stages in the Catalyst transformations pipeline. We will explain the following output in more detail in Chapter 11, Tuning Spark SQL Components for Performance:
scala> pintransDF.join(pinregionDF, "region").selectExpr("count(*)").explain(true)
(The extended output shows the parsed Logical Plan, the analyzed Logical Plan, the optimized Logical Plan, and the final Physical Plan.)
In the next section, we present developer-relevant details of Project Tungsten.