You can get the internal RDD using: schemaRDD.queryExecution.toRDD.  This
is used internally and does not copy.  This is an unstable developer API.

On Thu, Jan 15, 2015 at 11:26 PM, Nathan McCarthy <
nathan.mccar...@quantium.com.au> wrote:

>  Thanks Cheng!
>
>  Is there any API I can get access too (e.g. ParquetTableScan) which
> would allow me to load up the low level/baseRDD of just RDD[Row] so I could
> avoid the defensive copy (maybe lose our on columnar storage etc.).
>
>  We have parts of our pipeline using SparkSQL/SchemaRDDs and others using
> the core RDD api (mapPartitions etc.). Any tips?
>
>  Out of curiosity, a lot of SparkSQL functions seem to run in a
> mapPartiton (e.g. Distinct). Does a defensive copy happen there too?
>
>  Keen to get the best performance and the best blend of SparkSQL and
> functional Spark.
>
>  Cheers,
> Nathan
>
>   From: Cheng Lian <lian.cs....@gmail.com>
> Date: Monday, 12 January 2015 1:21 am
> To: Nathan <nathan.mccar...@quantium.com.au>, Michael Armbrust <
> mich...@databricks.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
>
> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
> issues - columnar formats?
>
>
> On 1/11/15 1:40 PM, Nathan McCarthy wrote:
>
> Thanks Cheng & Michael! Makes sense. Appreciate the tips!
>
>  Idiomatic scala isn't performant. I’ll definitely start using while
> loops or tail recursive methods. I have noticed this in the spark code
> base.
>
>  I might try turning off columnar compression (via 
> *spark.sql.inMemoryColumnarStorage.compressed=false
> *correct?) and see how performance compares to the primitive objects.
> Would you expect to see similar runtimes vs the primitive objects? We do
> have the luxury of lots of memory at the moment so this might give us an
> additional performance boost.
>
> Turning off compression should be faster, but still slower than directly
> using primitive objects. Because Spark SQL also serializes all objects
> within a column into byte buffers in a compact format. However, this
> radically reduces number of Java objects in the heap and is more GC
> friendly. When running large queries, cost introduced by GC can be
> significant.
>
>
>  Regarding the defensive copying of row objects. Can we switch this off
> and just be aware of the risks? Is MapPartitions on SchemaRDDs and
> operating on the Row object the most performant way to be flipping between
> SQL & Scala user code? Is there anything else I could be doing?
>
> This can be very dangerous and error prone. Whenever an operator tries to
> cache row objects, turning off defensive copying can introduce wrong query
> result. For example, sort-based shuffle caches rows to do sorting. In some
> cases, sample operator may also cache row objects. This is very
> implementation specific and may change between versions.
>
>
>  Cheers,
> ~N
>
>   From: Michael Armbrust <mich...@databricks.com>
> Date: Saturday, 10 January 2015 3:41 am
> To: Cheng Lian <lian.cs....@gmail.com>
> Cc: Nathan <nathan.mccar...@quantium.com.au>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
> issues - columnar formats?
>
>   The other thing to note here is that Spark SQL defensively copies rows
> when we switch into user code.  This probably explains the difference
> between 1 & 2.
>
>  The difference between 1 & 3 is likely the cost of decompressing the
> column buffers vs. accessing a bunch of uncompressed primitive objects.
>
> On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> Hey Nathan,
>>
>> Thanks for sharing, this is a very interesting post :) My comments are
>> inlined below.
>>
>> Cheng
>>
>> On 1/7/15 11:53 AM, Nathan McCarthy wrote:
>>
>> Hi,
>>
>>  I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala
>> via rdd.mapPartitions(…). Using the latest release 1.2.0.
>>
>>  Simple example; load up some sample data from parquet on HDFS (about
>> 380m rows, 10 columns) on a 7 node cluster.
>>
>>    val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet”)
>>    t.registerTempTable("test1”)
>>    sqlC.cacheTable("test1”)
>>
>>  Now lets do some operations on it; I want the total sales & quantities
>> sold for each hour in the day so I choose 3 out of the 10 possible
>> columns...
>>
>>    sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
>> Hour").collect().foreach(println)
>>
>>  After the table has been 100% cached in memory, this takes around 11
>> seconds.
>>
>>  Lets do the same thing but via a MapPartitions call (this isn’t
>> production ready code but gets the job done).
>>
>>    val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1”)
>>   rddPC.mapPartitions { case hrs =>
>>     val qtySum = new Array[Double](24)
>>     val salesSum = new Array[Double](24)
>>
>>      for(r <- hrs) {
>>       val hr = r.getInt(0)
>>       qtySum(hr) += r.getDouble(1)
>>       salesSum(hr) += r.getDouble(2)
>>     }
>>     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>>   }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
>> b._2)).collect().foreach(println)
>>
>> I believe the evil thing that makes this snippet much slower is the
>> for-loop. According to my early benchmark done with Scala 2.9, for-loop can
>> be orders of magnitude slower than a simple while-loop, especially when the
>> body of the loop only does something as trivial as this case. The reason is
>> that Scala for-loop is translated into corresponding
>> foreach/map/flatMap/withFilter function calls. And that's exactly why Spark
>> SQL tries to avoid for-loop or any other functional style code in critical
>> paths (where every row is touched), we also uses reusable mutable row
>> objects instead of the immutable version to improve performance. You may
>> check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
>> reference. Also, the `sum` function calls in your SQL code are translated
>> into `o.a.s.s.execution.Aggregate` operators, which also use imperative
>> while-loop and reusable mutable rows.
>>
>> Another thing to notice is that the `hrs` iterator physically points to
>> underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ...
>> }` loop actually decompresses and extracts values from required byte
>> buffers (this is the "unwrapping" processes you mentioned below).
>>
>>
>>  Now this takes around ~49 seconds… Even though test1 table is 100%
>> cached. The number of partitions remains the same…
>>
>>  Now if I create a simple RDD of a case class HourSum(hour: Int, qty:
>> Double, sales: Double)
>>
>>  Convert the SchemaRDD;
>>  val rdd = sqlC.sql("select * from test1").map{ r =>
>> HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()
>>  //cache all the data
>> rdd.count()
>>
>>  Then run basically the same MapPartitions query;
>>
>>  rdd.mapPartitions { case hrs =>
>>   val qtySum = new Array[Double](24)
>>   val salesSum = new Array[Double](24)
>>
>>    for(r <- hrs) {
>>     val hr = r.hour
>>     qtySum(hr) += r.qty
>>     salesSum(hr) += r.sales
>>   }
>>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>> }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
>> b._2)).collect().foreach(println)
>>
>>  This takes around 1.5 seconds! Albeit the memory footprint is much
>> larger.
>>
>> I guess this 1.5 seconds doesn't include the time spent on caching the
>> simple RDD? As I've explained above, in the first `mapPartitions` style
>> snippet, columnar byte buffer unwrapping happens within the `mapPartitions`
>> call. However, in this version, the unwrapping process happens when the
>> `rdd.count()` action is performed. At that point, all values of all columns
>> are extracted from underlying byte buffers, and the portion of data you
>> need are then manually selected and transformed into the simple case class
>> RDD via the `map` call.
>>
>> If you include time spent on caching the simple case class RDD, it should
>> be even slower than the first `mapPartitions` version.
>>
>>
>>  My thinking is that because SparkSQL does store things in a columnar
>> format, there is some unwrapping to be done out of the column array buffers
>> which takes time and for some reason this just takes longer when I switch
>> out to map partitions (maybe its unwrapping the entire row, even though I’m
>> using just a subset of columns, or maybe there is some object
>> creation/autoboxing going on when calling getInt or getDouble)…
>>
>>  I’ve tried simpler cases too, like just summing sales. Running sum via
>> SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD is
>> even faster (2.6 seconds). But MapPartitions on the SchemaRDD;
>>
>>  *sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
>> Iterator(iter.foldLeft(0.0)((t,r) => t+r.getDouble(0)))).sum*
>>
>>   takes a long time (33 seconds). In all these examples everything is
>> fully cached in memory. And yes for these kinds of operations I can use
>> SQL, but for more complex queries I’d much rather be using a combo of
>> SparkSQL to select the data (so I get nice things like Parquet pushdowns
>> etc.) & functional Scala!
>>
>> Again, unfortunately, functional style code like `Iterator.sum` and
>> `Iterator.foldLeft` can be really slow on critical paths.
>>
>>
>>  I think I’m doing something dumb… Is there something I should be doing
>> to get faster performance on MapPartitions on SchemaRDDs? Is there some
>> unwrapping going on in the background that catalyst does in a smart way
>> that I’m missing?
>>
>> It makes sense that people use both Spark SQL and Spark core, especially
>> when Spark SQL lacks features users need (like window function, for now).
>> The suggestion here is, if you really care about performance (more than
>> code readability and maintenance cost), then avoid immutable, functional
>> code whenever possible on any critical paths...
>>
>>
>>  Cheers,
>> ~N
>>
>>  Nathan McCarthy
>> QUANTIUM
>> Level 25, 8 Chifley, 8-12 Chifley Square
>> Sydney NSW 2000
>>
>> T: +61 2 8224 8922
>> F: +61 2 9292 6444
>>
>> W: quantium.com.au <http://www.quantium.com.au>
>>
>> ------------------------------
>>
>> linkedin.com/company/quantium <http://www.linkedin.com/company/quantium>
>>
>> facebook.com/QuantiumAustralia
>> <http://www.facebook.com/QuantiumAustralia>
>>
>> twitter.com/QuantiumAU <http://www.twitter.com/QuantiumAU>
>>
>>  The contents of this email, including attachments, may be confidential
>> information. If you are not the intended recipient, any use, disclosure or
>> copying of the information is unauthorised. If you have
>> received this email in error, we would be grateful if you would notify us
>> immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292
>> 6444) and delete the message from your system.
>>
>>
>>
>   Nathan McCarthy
> QUANTIUM
> Level 25, 8 Chifley, 8-12 Chifley Square
> Sydney NSW 2000
>
> T: +61 2 8224 8922
> F: +61 2 9292 6444
>
> W: quantium.com.au <http://www.quantium.com.au>
>
> ------------------------------
>
> linkedin.com/company/quantium <http://www.linkedin.com/company/quantium>
>
> facebook.com/QuantiumAustralia <http://www.facebook.com/QuantiumAustralia>
>
> twitter.com/QuantiumAU <http://www.twitter.com/QuantiumAU>
>
>  The contents of this email, including attachments, may be confidential
> information. If you are not the intended recipient, any use, disclosure or
> copying of the information is unauthorised. If you have
> received this email in error, we would be grateful if you would notify us
> immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292
> 6444) and delete the message from your system.
>
>
>   Nathan McCarthy
>  QUANTIUM
> Level 25, 8 Chifley, 8-12 Chifley Square
> Sydney NSW 2000
>
> T: +61 2 8224 8922
> F: +61 2 9292 6444
>
> W: quantium.com.au <http://www.quantium.com.au>
>
> ------------------------------
>
> linkedin.com/company/quantium <http://www.linkedin.com/company/quantium>
>
> facebook.com/QuantiumAustralia <http://www.facebook.com/QuantiumAustralia>
>
> twitter.com/QuantiumAU <http://www.twitter.com/QuantiumAU>
>
>  The contents of this email, including attachments, may be confidential
> information. If you are not the intended recipient, any use, disclosure or
> copying of the information is unauthorised. If you have
> received this email in error, we would be grateful if you would notify us
> immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292
> 6444) and delete the message from your system.
>

Reply via email to