You can get the internal RDD using schemaRDD.queryExecution.toRdd. This is what Spark SQL uses internally, and it does not perform the defensive copy. Note that this is an unstable developer API.
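For example, something along these lines (a rough, untested sketch against the Spark 1.2 developer API; sqlC is assumed to be your SQLContext):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val schemaRDD = sqlC.sql("select Hour, ItemQty, Sales from test1")
// Internal row RDD; no defensive copies are made.
val internal: RDD[Row] = schemaRDD.queryExecution.toRdd
// The engine may reuse mutable row objects, so copy explicitly before
// caching or collecting rows.
val safe = internal.map(_.copy())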
On Thu, Jan 15, 2015 at 11:26 PM, Nathan McCarthy <nathan.mccar...@quantium.com.au> wrote:

> Thanks Cheng!
>
> Is there any API I can get access to (e.g. ParquetTableScan) which would
> allow me to load up the low-level/base RDD of just RDD[Row], so I could
> avoid the defensive copy (maybe losing out on columnar storage etc.)?
>
> We have parts of our pipeline using SparkSQL/SchemaRDDs and others using
> the core RDD API (mapPartitions etc.). Any tips?
>
> Out of curiosity, a lot of SparkSQL functions seem to run in a
> mapPartitions (e.g. Distinct). Does a defensive copy happen there too?
>
> Keen to get the best performance and the best blend of SparkSQL and
> functional Spark.
>
> Cheers,
> Nathan
>
> From: Cheng Lian <lian.cs....@gmail.com>
> Date: Monday, 12 January 2015 1:21 am
> To: Nathan <nathan.mccar...@quantium.com.au>, Michael Armbrust
> <mich...@databricks.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
> issues - columnar formats?
>
> On 1/11/15 1:40 PM, Nathan McCarthy wrote:
>
> Thanks Cheng & Michael! Makes sense. Appreciate the tips!
>
> Idiomatic Scala isn't performant. I'll definitely start using while
> loops or tail-recursive methods. I have noticed this in the Spark code
> base.
>
> I might try turning off columnar compression (via
> spark.sql.inMemoryColumnarStorage.compressed=false, correct?) and see
> how performance compares to the primitive objects. Would you expect to
> see similar runtimes vs the primitive objects? We do have the luxury of
> lots of memory at the moment, so this might give us an additional
> performance boost.
>
> Turning off compression should be faster, but still slower than directly
> using primitive objects, because Spark SQL also serializes all objects
> within a column into byte buffers in a compact format. However, this
> radically reduces the number of Java objects in the heap and is more GC
> friendly. When running large queries, the cost introduced by GC can be
> significant.
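> For instance (a minimal, untested sketch; sqlC is assumed to be your
> SQLContext):
>
> sqlC.uncacheTable("test1")
> sqlC.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
> // The cache must be rebuilt for the new setting to take effect.
> sqlC.cacheTable("test1")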
> Regarding the defensive copying of row objects: can we switch this off
> and just be aware of the risks? Is mapPartitions on SchemaRDDs, operating
> on the Row objects, the most performant way to flip between SQL and Scala
> user code? Is there anything else I could be doing?
>
> This can be very dangerous and error prone. Whenever an operator caches
> row objects, turning off defensive copying can produce wrong query
> results. For example, sort-based shuffle caches rows to do sorting, and
> in some cases the sample operator may also cache row objects. This is
> very implementation specific and may change between versions.
>
> Cheers,
> ~N
>
> From: Michael Armbrust <mich...@databricks.com>
> Date: Saturday, 10 January 2015 3:41 am
> To: Cheng Lian <lian.cs....@gmail.com>
> Cc: Nathan <nathan.mccar...@quantium.com.au>, "user@spark.apache.org"
> <user@spark.apache.org>
> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
> issues - columnar formats?
>
> The other thing to note here is that Spark SQL defensively copies rows
> when we switch into user code. This probably explains the difference
> between 1 & 2.
>
> The difference between 1 & 3 is likely the cost of decompressing the
> column buffers vs. accessing a bunch of uncompressed primitive objects.
>
> On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> Hey Nathan,
>>
>> Thanks for sharing, this is a very interesting post :) My comments are
>> inlined below.
>>
>> Cheng
>>
>> On 1/7/15 11:53 AM, Nathan McCarthy wrote:
>>
>> Hi,
>>
>> I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala
>> via rdd.mapPartitions(...). Using the latest release, 1.2.0.
>>
>> Simple example: load up some sample data from Parquet on HDFS (about
>> 380m rows, 10 columns) on a 7-node cluster.
>>
>> val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
>> t.registerTempTable("test1")
>> sqlC.cacheTable("test1")
>>
>> Now let's do some operations on it; I want the total sales & quantities
>> sold for each hour in the day, so I choose 3 out of the 10 possible
>> columns...
>>
>> sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
>>   Hour").collect().foreach(println)
>>
>> After the table has been 100% cached in memory, this takes around 11
>> seconds.
>>
>> Let's do the same thing but via a mapPartitions call (this isn't
>> production-ready code, but it gets the job done).
>>
>> val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
>> try2.mapPartitions { hrs =>
>>   val qtySum = new Array[Double](24)
>>   val salesSum = new Array[Double](24)
>>
>>   for (r <- hrs) {
>>     val hr = r.getInt(0)
>>     qtySum(hr) += r.getDouble(1)
>>     salesSum(hr) += r.getDouble(2)
>>   }
>>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>> }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
>>   .collect().foreach(println)
>>
>> I believe the evil thing that makes this snippet much slower is the
>> for-loop. According to an early benchmark I did with Scala 2.9, a
>> for-loop can be orders of magnitude slower than a simple while-loop,
>> especially when the loop body only does something as trivial as in this
>> case. The reason is that a Scala for-loop is translated into
>> corresponding foreach/map/flatMap/withFilter calls. That's exactly why
>> Spark SQL avoids for-loops and other functional-style code on critical
>> paths (where every row is touched); it also uses reusable mutable row
>> objects instead of immutable ones to improve performance. You may check
>> HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
>> reference. Also, the `sum` calls in your SQL code are translated into
>> `o.a.s.s.execution.Aggregate` operators, which likewise use an
>> imperative while-loop and reusable mutable rows.
>>
>> Another thing to notice is that the `hrs` iterator physically points to
>> the underlying in-memory columnar byte buffers, and the
>> `for (r <- hrs) { ... }` loop actually decompresses and extracts values
>> from the required byte buffers (this is the "unwrapping" process you
>> mentioned below).
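>> For example, the loop in your first snippet could be rewritten roughly
>> like this (an untested sketch; the surrounding pipeline is unchanged):
>>
>> try2.mapPartitions { hrs =>
>>   val qtySum = new Array[Double](24)
>>   val salesSum = new Array[Double](24)
>>   // An imperative while-loop avoids the per-element closure calls a
>>   // for-comprehension desugars into. Primitive values are read
>>   // immediately, so reused mutable rows are safe here.
>>   while (hrs.hasNext) {
>>     val r = hrs.next()
>>     val hr = r.getInt(0)
>>     qtySum(hr) += r.getDouble(1)
>>     salesSum(hr) += r.getDouble(2)
>>   }
>>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>> }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
>>   .collect().foreach(println)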
>> Now this takes around ~49 seconds… even though the test1 table is 100%
>> cached and the number of partitions remains the same.
>>
>> Now if I create a simple RDD of a case class HourSum(hour: Int, qty:
>> Double, sales: Double) and convert the SchemaRDD:
>>
>> val rdd = sqlC.sql("select * from test1").map { r =>
>>   HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8))
>> }.cache() // cache all the data
>> rdd.count()
>>
>> ...then run basically the same mapPartitions query:
>>
>> rdd.mapPartitions { hrs =>
>>   val qtySum = new Array[Double](24)
>>   val salesSum = new Array[Double](24)
>>
>>   for (r <- hrs) {
>>     val hr = r.hour
>>     qtySum(hr) += r.qty
>>     salesSum(hr) += r.sales
>>   }
>>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>> }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
>>   .collect().foreach(println)
>>
>> ...it takes around 1.5 seconds! Albeit the memory footprint is much
>> larger.
>>
>> I guess this 1.5 seconds doesn't include the time spent on caching the
>> simple RDD? As I explained above, in the first `mapPartitions` snippet
>> the columnar byte buffer unwrapping happens within the `mapPartitions`
>> call. In this version, however, the unwrapping happens when the
>> `rdd.count()` action is performed: at that point, all values of all
>> columns are extracted from the underlying byte buffers, and the portion
>> of data you need is then manually selected and transformed into the
>> simple case class RDD via the `map` call.
>>
>> If you include the time spent on caching the simple case class RDD, it
>> should be even slower than the first `mapPartitions` version.
>>
>> My thinking is that because SparkSQL stores things in a columnar
>> format, there is some unwrapping to be done out of the column array
>> buffers, which takes time, and for some reason this just takes longer
>> when I switch out to mapPartitions (maybe it's unwrapping the entire
>> row even though I'm using just a subset of columns, or maybe there is
>> some object creation/autoboxing going on when calling getInt or
>> getDouble)…
>>
>> I've tried simpler cases too, like just summing sales. Running the sum
>> via SQL is fast (4.7 seconds), and running a mapPartitions sum on a
>> double RDD is even faster (2.6 seconds). But mapPartitions on the
>> SchemaRDD:
>>
>> sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
>>   Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum
>>
>> takes a long time (33 seconds). In all these examples everything is
>> fully cached in memory. And yes, for these kinds of operations I can
>> use SQL, but for more complex queries I'd much rather use a combination
>> of SparkSQL to select the data (so I get nice things like Parquet
>> pushdowns etc.) and functional Scala!
>>
>> Again, unfortunately, functional-style code like `Iterator.sum` and
>> `Iterator.foldLeft` can be really slow on critical paths.
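>> A rough while-loop equivalent of the sum above (untested sketch):
>>
>> sqlC.sql("select SalesInclGST from test1").mapPartitions { iter =>
>>   var total = 0.0
>>   while (iter.hasNext) total += iter.next().getDouble(0)
>>   Iterator(total) // one partial sum per partition
>> }.sum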
>> I think I'm doing something dumb… Is there something I should be doing
>> to get faster performance from mapPartitions on SchemaRDDs? Is there
>> some unwrapping going on in the background that Catalyst does in a
>> smart way that I'm missing?
>>
>> It makes sense that people use both Spark SQL and Spark core,
>> especially when Spark SQL lacks features users need (like window
>> functions, for now). The suggestion here is: if you really care about
>> performance (more than code readability and maintenance cost), then
>> avoid immutable, functional code whenever possible on any critical
>> path...
>>
>> Cheers,
>> ~N