The other thing to note here is that Spark SQL defensively copies rows when
we switch into user code.  This probably explains the difference between 1
& 2.

The difference between 1 & 3 is likely the cost of decompressing the column
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

>  Hey Nathan,
>
> Thanks for sharing, this is a very interesting post :) My comments are
> inlined below.
>
> Cheng
>
> On 1/7/15 11:53 AM, Nathan McCarthy wrote:
>
> Hi,
>
>  I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via
> rdd.mapPartitions(…). Using the latest release 1.2.0.
>
>  Simple example; load up some sample data from parquet on HDFS (about
> 380m rows, 10 columns) on a 7 node cluster.
>
>    val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet”)
>    t.registerTempTable("test1”)
>    sqlC.cacheTable("test1”)
>
>  Now lets do some operations on it; I want the total sales & quantities
> sold for each hour in the day so I choose 3 out of the 10 possible
> columns...
>
>    sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
> Hour").collect().foreach(println)
>
>  After the table has been 100% cached in memory, this takes around 11
> seconds.
>
>  Lets do the same thing but via a MapPartitions call (this isn’t
> production ready code but gets the job done).
>
>    val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1”)
>   rddPC.mapPartitions { case hrs =>
>     val qtySum = new Array[Double](24)
>     val salesSum = new Array[Double](24)
>
>      for(r <- hrs) {
>       val hr = r.getInt(0)
>       qtySum(hr) += r.getDouble(1)
>       salesSum(hr) += r.getDouble(2)
>     }
>     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>   }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
> b._2)).collect().foreach(println)
>
> I believe the evil thing that makes this snippet much slower is the
> for-loop. According to my early benchmark done with Scala 2.9, for-loop can
> be orders of magnitude slower than a simple while-loop, especially when the
> body of the loop only does something as trivial as this case. The reason is
> that Scala for-loop is translated into corresponding
> foreach/map/flatMap/withFilter function calls. And that's exactly why Spark
> SQL tries to avoid for-loop or any other functional style code in critical
> paths (where every row is touched), we also uses reusable mutable row
> objects instead of the immutable version to improve performance. You may
> check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
> reference. Also, the `sum` function calls in your SQL code are translated
> into `o.a.s.s.execution.Aggregate` operators, which also use imperative
> while-loop and reusable mutable rows.
>
> Another thing to notice is that the `hrs` iterator physically points to
> underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ...
> }` loop actually decompresses and extracts values from required byte
> buffers (this is the "unwrapping" processes you mentioned below).
>
>
>  Now this takes around ~49 seconds… Even though test1 table is 100%
> cached. The number of partitions remains the same…
>
>  Now if I create a simple RDD of a case class HourSum(hour: Int, qty:
> Double, sales: Double)
>
>  Convert the SchemaRDD;
>  val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1),
> r.getDouble(7), r.getDouble(8)) }.cache()
>  //cache all the data
> rdd.count()
>
>  Then run basically the same MapPartitions query;
>
>  rdd.mapPartitions { case hrs =>
>   val qtySum = new Array[Double](24)
>   val salesSum = new Array[Double](24)
>
>    for(r <- hrs) {
>     val hr = r.hour
>     qtySum(hr) += r.qty
>     salesSum(hr) += r.sales
>   }
>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
> }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
> b._2)).collect().foreach(println)
>
>  This takes around 1.5 seconds! Albeit the memory footprint is much
> larger.
>
> I guess this 1.5 seconds doesn't include the time spent on caching the
> simple RDD? As I've explained above, in the first `mapPartitions` style
> snippet, columnar byte buffer unwrapping happens within the `mapPartitions`
> call. However, in this version, the unwrapping process happens when the
> `rdd.count()` action is performed. At that point, all values of all columns
> are extracted from underlying byte buffers, and the portion of data you
> need are then manually selected and transformed into the simple case class
> RDD via the `map` call.
>
> If you include time spent on caching the simple case class RDD, it should
> be even slower than the first `mapPartitions` version.
>
>
>  My thinking is that because SparkSQL does store things in a columnar
> format, there is some unwrapping to be done out of the column array buffers
> which takes time and for some reason this just takes longer when I switch
> out to map partitions (maybe its unwrapping the entire row, even though I’m
> using just a subset of columns, or maybe there is some object
> creation/autoboxing going on when calling getInt or getDouble)…
>
>  I’ve tried simpler cases too, like just summing sales. Running sum via
> SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD is
> even faster (2.6 seconds). But MapPartitions on the SchemaRDD;
>
>  *sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
> Iterator(iter.foldLeft(0.0)((t,r) => t+r.getDouble(0)))).sum*
>
>   takes a long time (33 seconds). In all these examples everything is
> fully cached in memory. And yes for these kinds of operations I can use
> SQL, but for more complex queries I’d much rather be using a combo of
> SparkSQL to select the data (so I get nice things like Parquet pushdowns
> etc.) & functional Scala!
>
> Again, unfortunately, functional style code like `Iterator.sum` and
> `Iterator.foldLeft` can be really slow on critical paths.
>
>
>  I think I’m doing something dumb… Is there something I should be doing
> to get faster performance on MapPartitions on SchemaRDDs? Is there some
> unwrapping going on in the background that catalyst does in a smart way
> that I’m missing?
>
> It makes sense that people use both Spark SQL and Spark core, especially
> when Spark SQL lacks features users need (like window function, for now).
> The suggestion here is, if you really care about performance (more than
> code readability and maintenance cost), then avoid immutable, functional
> code whenever possible on any critical paths...
>
>
>  Cheers,
> ~N
>
>  Nathan McCarthy
> QUANTIUM
> Level 25, 8 Chifley, 8-12 Chifley Square
> Sydney NSW 2000
>
> T: +61 2 8224 8922
> F: +61 2 9292 6444
>
> W: quantium.com.au <http://www.quantium.com.au>
>
> ------------------------------
>
> linkedin.com/company/quantium <http://www.linkedin.com/company/quantium>
>
> facebook.com/QuantiumAustralia <http://www.facebook.com/QuantiumAustralia>
>
> twitter.com/QuantiumAU <http://www.twitter.com/QuantiumAU>
>
>  The contents of this email, including attachments, may be confidential
> information. If you are not the intended recipient, any use, disclosure or
> copying of the information is unauthorised. If you have
> received this email in error, we would be grateful if you would notify us
> immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292
> 6444) and delete the message from your system.
>
>
>

Reply via email to