On 1/11/15 1:40 PM, Nathan McCarthy wrote:
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while loops or tail recursive methods. I have noticed this in the spark code base.

I might try turning off columnar compression (via /spark.sql.inMemoryColumnarStorage.compressed=false /correct?) and see how performance compares to the primitive objects. Would you expect to see similar runtimes vs the primitive objects? We do have the luxury of lots of memory at the moment so this might give us an additional performance boost.
Turning off compression should be faster, but still slower than directly using primitive objects. Because Spark SQL also serializes all objects within a column into byte buffers in a compact format. However, this radically reduces number of Java objects in the heap and is more GC friendly. When running large queries, cost introduced by GC can be significant.

Regarding the defensive copying of row objects. Can we switch this off and just be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row object the most performant way to be flipping between SQL & Scala user code? Is there anything else I could be doing?
This can be very dangerous and error prone. Whenever an operator tries to cache row objects, turning off defensive copying can introduce wrong query result. For example, sort-based shuffle caches rows to do sorting. In some cases, sample operator may also cache row objects. This is very implementation specific and may change between versions.


The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 & 2.

The difference between 1 & 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian

    Hey Nathan,

    Thanks for sharing, this is a very interesting post :) My comments
    are inlined below.


    On 1/7/15 11:53 AM, Nathan McCarthy wrote:

    I’m trying to use a combination of SparkSQL and ‘normal'
    Spark/Scala via rdd.mapPartitions(…). Using the latest release

    Simple example; load up some sample data from parquet on HDFS
    (about 380m rows, 10 columns) on a 7 node cluster.

      val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet”)

    Now lets do some operations on it; I want the total sales &
    quantities sold for each hour in the day so I choose 3 out of the
    10 possible columns...

      sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1
    group by Hour").collect().foreach(println)

    After the table has been 100% cached in memory, this takes around
    11 seconds.

    Lets do the same thing but via a MapPartitions call (this isn’t
    production ready code but gets the job done).

    val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1”)
    rddPC.mapPartitions { case hrs =>
      val qtySum = new Array[Double](24)
      val salesSum = new Array[Double](24)

      for(r <- hrs) {
        val hr = r.getInt(0)
        qtySum(hr) += r.getDouble(1)
        salesSum(hr) += r.getDouble(2)
      (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
    }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
    I believe the evil thing that makes this snippet much slower is
    the for-loop. According to my early benchmark done with Scala 2.9,
    for-loop can be orders of magnitude slower than a simple
    while-loop, especially when the body of the loop only does
    something as trivial as this case. The reason is that Scala
    for-loop is translated into corresponding
    foreach/map/flatMap/withFilter function calls. And that's exactly
    why Spark SQL tries to avoid for-loop or any other functional
    style code in critical paths (where every row is touched), we also
    uses reusable mutable row objects instead of the immutable version
    to improve performance. You may check HiveTableScan,
    ParquetTableScan, InMemoryColumnarTableScan etc. for reference.
    Also, the `sum` function calls in your SQL code are translated
    into `o.a.s.s.execution.Aggregate` operators, which also use
    imperative while-loop and reusable mutable rows.

    Another thing to notice is that the `hrs` iterator physically
    points to underlying in-memory columnar byte buffers, and the `for
    (r <- hrs) { ... }` loop actually decompresses and extracts values
    from required byte buffers (this is the "unwrapping" processes you
    mentioned below).

    Now this takes around ~49 seconds… Even though test1 table is
    100% cached. The number of partitions remains the same…

    Now if I create a simple RDD of a case class HourSum(hour: Int,
    qty: Double, sales: Double)

    Convert the SchemaRDD;
    val rdd = sqlC.sql("select * from test1").map{ r =>
    HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()
    //cache all the data

    Then run basically the same MapPartitions query;

    rdd.mapPartitions { case hrs =>
      val qtySum = new Array[Double](24)
      val salesSum = new Array[Double](24)

      for(r <- hrs) {
        val hr = r.hour
        qtySum(hr) += r.qty
        salesSum(hr) += r.sales
      (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
    }.reduceByKey((a,b) => (a._1 + b._1, a._2 +

    This takes around 1.5 seconds! Albeit the memory footprint is
    much larger.
    I guess this 1.5 seconds doesn't include the time spent on caching
    the simple RDD? As I've explained above, in the first
    `mapPartitions` style snippet, columnar byte buffer unwrapping
    happens within the `mapPartitions` call. However, in this version,
    the unwrapping process happens when the `rdd.count()` action is
    performed. At that point, all values of all columns are extracted
    from underlying byte buffers, and the portion of data you need are
    then manually selected and transformed into the simple case class
    RDD via the `map` call.

    If you include time spent on caching the simple case class RDD, it
    should be even slower than the first `mapPartitions` version.

    My thinking is that because SparkSQL does store things in a
    columnar format, there is some unwrapping to be done out of the
    column array buffers which takes time and for some reason this
    just takes longer when I switch out to map partitions (maybe its
    unwrapping the entire row, even though I’m using just a subset of
    columns, or maybe there is some object creation/autoboxing going
    on when calling getInt or getDouble)…

    I’ve tried simpler cases too, like just summing sales. Running
    sum via SQL is fast (4.7 seconds), running a mapPartition sum on
    a double RDD is even faster (2.6 seconds). But MapPartitions on
    the SchemaRDD;

    /sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
    Iterator(iter.foldLeft(0.0)((t,r) => t+r.getDouble(0)))).sum/

     takes a long time (33 seconds). In all these examples everything
    is fully cached in memory. And yes for these kinds of operations
    I can use SQL, but for more complex queries I’d much rather be
    using a combo of SparkSQL to select the data (so I get nice
    things like Parquet pushdowns etc.) & functional Scala!
    Again, unfortunately, functional style code like `Iterator.sum`
    and `Iterator.foldLeft` can be really slow on critical paths.

    I think I’m doing something dumb… Is there something I should be
    doing to get faster performance on MapPartitions on SchemaRDDs?
    Is there some unwrapping going on in the background that catalyst
    does in a smart way that I’m missing?
    It makes sense that people use both Spark SQL and Spark core,
    especially when Spark SQL lacks features users need (like window
    function, for now). The suggestion here is, if you really care
    about performance (more than code readability and maintenance
    cost), then avoid immutable, functional code whenever possible on
    any critical paths...


