Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-20 Thread Cheng Lian

On 1/15/15 11:26 PM, Nathan McCarthy wrote:


Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which 
would allow me to load up the low level/baseRDD of just RDD[Row] so I 
could avoid the defensive copy (maybe lose out on columnar storage etc.).


We have parts of our pipeline using SparkSQL/SchemaRDDs and others 
using the core RDD api (mapPartitions etc.). Any tips?


(Michael has already answered this)



Out of curiosity, a lot of SparkSQL functions seem to run in a 
mapPartition (e.g. Distinct). Does a defensive copy happen there too?


Only if necessary. For example, `Sort` does a defensive copy because it needs 
to cache rows for sorting.




Keen to get the best performance and the best blend of SparkSQL and 
functional Spark.


Cheers,
Nathan

From: Cheng Lian lian.cs@gmail.com
Date: Monday, 12 January 2015 1:21 am
To: Nathan nathan.mccar...@quantium.com.au, Michael Armbrust mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance 
issues - columnar formats?



On 1/11/15 1:40 PM, Nathan McCarthy wrote:

Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while 
loops or tail recursive methods. I have noticed this in the spark 
code base.
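
For example, a tail-recursive version of the per-partition aggregation could look 
something like this (a sketch only; the org.apache.spark.sql.Row import and the 
column positions are assumptions based on the snippets further down, not our 
actual pipeline code):

  import org.apache.spark.sql.Row
  import scala.annotation.tailrec

  // Sketch: tail-recursive per-partition aggregation over (Hour, ItemQty, Sales) rows
  def sumPartition(rows: Iterator[Row]): Iterator[(Int, (Double, Double))] = {
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    @tailrec
    def loop(it: Iterator[Row]): Unit =
      if (it.hasNext) {
        val r = it.next()
        val hr = r.getInt(0)
        qtySum(hr) += r.getDouble(1)
        salesSum(hr) += r.getDouble(2)
        loop(it)          // tail call, compiled to a loop
      }

    loop(rows)
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }

  // e.g. rddPC.mapPartitions(sumPartition).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))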


I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and 
see how performance compares to the primitive objects. Would you 
expect to see similar runtimes vs the primitive objects? We do have 
the luxury of lots of memory at the moment so this might give us an 
additional performance boost.
Turning off compression should be faster, but still slower than 
directly using primitive objects, because Spark SQL also serializes 
all objects within a column into byte buffers in a compact format. 
However, this radically reduces the number of Java objects on the heap 
and is more GC friendly. When running large queries, the cost introduced 
by GC can be significant.
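
For reference, a minimal sketch of flipping that switch before caching, assuming 
the sqlC context and table from the snippets further down (the same setting can 
also be passed at submit time via --conf):

  sqlC.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
  sqlC.parquetFile("/user/n/sales-tran12m.parquet").registerTempTable("test1")
  sqlC.cacheTable("test1")   // cached column buffers are now built uncompressed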


Regarding the defensive copying of row objects. Can we switch this 
off and just be aware of the risks? Is MapPartitions on SchemaRDDs 
and operating on the Row object the most performant way to be 
flipping between SQL & Scala user code? Is there anything else I 
could be doing?
This can be very dangerous and error prone. Whenever an operator tries 
to cache row objects, turning off defensive copying can introduce 
wrong query results. For example, sort-based shuffle caches rows to do 
sorting. In some cases, the sample operator may also cache row objects. 
This is very implementation specific and may change between versions.
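
As a generic illustration of the risk (plain Scala, not Spark internals): an 
iterator that reuses one mutable record, the way scan operators reuse mutable 
rows, silently gives wrong answers once it is cached without copying.

  final class MutableRecord(var value: Int)
  val reused = new MutableRecord(0)

  // lazy iterator that mutates and hands back the same object each time
  def rows() = Iterator.tabulate(3) { i => reused.value = i; reused }

  val cachedWithoutCopy = rows().toArray          // three references to the same object
  println(cachedWithoutCopy.map(_.value).toList)  // List(2, 2, 2) -- wrong

  val cachedWithCopy = rows().map(r => new MutableRecord(r.value)).toArray
  println(cachedWithCopy.map(_.value).toList)     // List(0, 1, 2) -- correct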


Cheers,
~N

From: Michael Armbrust mich...@databricks.com

Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance 
issues - columnar formats?


The other thing to note here is that Spark SQL defensively copies 
rows when we switch into user code.  This probably explains the 
difference between 1 & 2.


The difference between 1 & 3 is likely the cost of decompressing the 
column buffers vs. accessing a bunch of uncompressed primitive objects.


On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:


Hey Nathan,

Thanks for sharing, this is a very interesting post :) My
comments are inlined below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:

Hi,

I’m trying to use a combination of SparkSQL and ‘normal'
Spark/Scala via rdd.mapPartitions(…). Using the latest release
1.2.0.

Simple example; load up some sample data from parquet on HDFS
(about 380m rows, 10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & 
quantities sold for each hour in the day so I choose 3 out of
the 10 possible columns...

sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group
by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes
around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t
production ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-16 Thread Michael Armbrust
You can get the internal RDD using: schemaRDD.queryExecution.toRDD.  This
is used internally and does not copy.  This is an unstable developer API.
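
A rough sketch of how that might be used (treat this as an assumption: the API is
unstable and may change between releases, in the 1.2 source the field appears as
queryExecution.toRdd, and the rows it yields can be reused mutable objects, so
copy anything you plan to hold on to):

  // Sketch only -- relies on the unstable developer API mentioned above.
  val plan = sqlC.sql("select Hour, ItemQty, Sales from test1")
  val rawRows = plan.queryExecution.toRdd          // RDD[Row], no defensive copies
  rawRows.mapPartitions { iter =>
    // rows may be reused in place here; extract primitives (or copy) before caching them
    iter.map(r => (r.getInt(0), (r.getDouble(1), r.getDouble(2))))
  }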

On Thu, Jan 15, 2015 at 11:26 PM, Nathan McCarthy 
nathan.mccar...@quantium.com.au wrote:

  Thanks Cheng!

  Is there any API I can get access to (e.g. ParquetTableScan) which
 would allow me to load up the low level/baseRDD of just RDD[Row] so I could
 avoid the defensive copy (maybe lose out on columnar storage etc.).

  We have parts of our pipeline using SparkSQL/SchemaRDDs and others using
 the core RDD api (mapPartitions etc.). Any tips?

  Out of curiosity, a lot of SparkSQL functions seem to run in a
 mapPartition (e.g. Distinct). Does a defensive copy happen there too?

  Keen to get the best performance and the best blend of SparkSQL and
 functional Spark.

  Cheers,
 Nathan

   From: Cheng Lian lian.cs@gmail.com
 Date: Monday, 12 January 2015 1:21 am
 To: Nathan nathan.mccar...@quantium.com.au, Michael Armbrust 
 mich...@databricks.com
 Cc: user@spark.apache.org user@spark.apache.org

 Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
 issues - columnar formats?


 On 1/11/15 1:40 PM, Nathan McCarthy wrote:

 Thanks Cheng & Michael! Makes sense. Appreciate the tips!

  Idiomatic scala isn't performant. I’ll definitely start using while
 loops or tail recursive methods. I have noticed this in the spark code
 base.

  I might try turning off columnar compression (via
 spark.sql.inMemoryColumnarStorage.compressed=false
 correct?) and see how performance compares to the primitive objects.
 Would you expect to see similar runtimes vs the primitive objects? We do
 have the luxury of lots of memory at the moment so this might give us an
 additional performance boost.

 Turning off compression should be faster, but still slower than directly
 using primitive objects, because Spark SQL also serializes all objects
 within a column into byte buffers in a compact format. However, this
 radically reduces the number of Java objects on the heap and is more GC
 friendly. When running large queries, the cost introduced by GC can be
 significant.


  Regarding the defensive copying of row objects. Can we switch this off
 and just be aware of the risks? Is MapPartitions on SchemaRDDs and
 operating on the Row object the most performant way to be flipping between
 SQL & Scala user code? Is there anything else I could be doing?

 This can be very dangerous and error prone. Whenever an operator tries to
 cache row objects, turning off defensive copying can introduce wrong query
 results. For example, sort-based shuffle caches rows to do sorting. In some
 cases, the sample operator may also cache row objects. This is very
 implementation specific and may change between versions.


  Cheers,
 ~N

   From: Michael Armbrust mich...@databricks.com
 Date: Saturday, 10 January 2015 3:41 am
 To: Cheng Lian lian.cs@gmail.com
 Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org 
 user@spark.apache.org
 Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
 issues - columnar formats?

   The other thing to note here is that Spark SQL defensively copies rows
 when we switch into user code.  This probably explains the difference
 between 1 & 2.

  The difference between 1 & 3 is likely the cost of decompressing the
 column buffers vs. accessing a bunch of uncompressed primitive objects.

 On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:

 Hey Nathan,

 Thanks for sharing, this is a very interesting post :) My comments are
 inlined below.

 Cheng

 On 1/7/15 11:53 AM, Nathan McCarthy wrote:

 Hi,

  I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala
 via rdd.mapPartitions(…). Using the latest release 1.2.0.

  Simple example; load up some sample data from parquet on HDFS (about
 380m rows, 10 columns) on a 7 node cluster.

   val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
   t.registerTempTable("test1")
   sqlC.cacheTable("test1")

  Now let's do some operations on it; I want the total sales & quantities
 sold for each hour in the day so I choose 3 out of the 10 possible
 columns...

   sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
 Hour").collect().foreach(println)

  After the table has been 100% cached in memory, this takes around 11
 seconds.

  Lets do the same thing but via a MapPartitions call (this isn’t
 production ready code but gets the job done).

   val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
   rddPC.mapPartitions { case hrs =>
     val qtySum = new Array[Double](24)
     val salesSum = new Array[Double](24)

     for (r <- hrs) {
       val hr = r.getInt(0)
       qtySum(hr) += r.getDouble(1)
       salesSum(hr) += r.getDouble(2)
     }
     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
   }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

 I believe the evil thing that makes this snippet much slower

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-15 Thread Nathan McCarthy
Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which would allow 
me to load up the low level/baseRDD of just RDD[Row] so I could avoid the 
defensive copy (maybe lose out on columnar storage etc.).

We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the 
core RDD api (mapPartitions etc.). Any tips?

Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartition 
(e.g. Distinct). Does a defensive copy happen there too?

Keen to get the best performance and the best blend of SparkSQL and functional 
Spark.

Cheers,
Nathan

From: Cheng Lian lian.cs@gmail.com
Date: Monday, 12 January 2015 1:21 am
To: Nathan nathan.mccar...@quantium.com.au, Michael Armbrust mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?


On 1/11/15 1:40 PM, Nathan McCarthy wrote:
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while loops or 
tail recursive methods. I have noticed this in the spark code base.

I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how 
performance compares to the primitive objects. Would you expect to see similar 
runtimes vs the primitive objects? We do have the luxury of lots of memory at 
the moment so this might give us an additional performance boost.
Turning off compression should be faster, but still slower than directly using 
primitive objects, because Spark SQL also serializes all objects within a 
column into byte buffers in a compact format. However, this radically reduces 
the number of Java objects on the heap and is more GC friendly. When running large 
queries, the cost introduced by GC can be significant.

Regarding the defensive copying of row objects. Can we switch this off and just 
be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row 
object the most performant way to be flipping between SQL & Scala user code? Is 
there anything else I could be doing?
This can be very dangerous and error prone. Whenever an operator tries to cache 
row objects, turning off defensive copying can introduce wrong query results. 
For example, sort-based shuffle caches rows to do sorting. In some cases, 
the sample operator may also cache row objects. This is very implementation 
specific and may change between versions.

Cheers,
~N

From: Michael Armbrust mich...@databricks.com
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we 
switch into user code.  This probably explains the difference between 1 & 2.

The difference between 1 & 3 is likely the cost of decompressing the column 
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are inlined 
below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for 
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production 
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the for-loop

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-11 Thread Cheng Lian


On 1/11/15 1:40 PM, Nathan McCarthy wrote:

Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while 
loops or tail recursive methods. I have noticed this in the spark code 
base.


I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see 
how performance compares to the primitive objects. Would you expect to 
see similar runtimes vs the primitive objects? We do have the luxury 
of lots of memory at the moment so this might give us an additional 
performance boost.
Turning off compression should be faster, but still slower than directly 
using primitive objects, because Spark SQL also serializes all objects 
within a column into byte buffers in a compact format. However, this 
radically reduces the number of Java objects on the heap and is more GC 
friendly. When running large queries, the cost introduced by GC can be 
significant.


Regarding the defensive copying of row objects. Can we switch this off 
and just be aware of the risks? Is MapPartitions on SchemaRDDs and 
operating on the Row object the most performant way to be flipping 
between SQL & Scala user code? Is there anything else I could be doing?
This can be very dangerous and error prone. Whenever an operator tries 
to cache row objects, turning off defensive copying can introduce wrong 
query results. For example, sort-based shuffle caches rows to do sorting. 
In some cases, the sample operator may also cache row objects. This is very 
implementation specific and may change between versions.


Cheers,
~N

From: Michael Armbrust mich...@databricks.com

Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance 
issues - columnar formats?


The other thing to note here is that Spark SQL defensively copies rows 
when we switch into user code.  This probably explains the difference 
between 1 & 2.


The difference between 1 & 3 is likely the cost of decompressing the 
column buffers vs. accessing a bunch of uncompressed primitive objects.


On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:


Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments
are inlined below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:

Hi,

I’m trying to use a combination of SparkSQL and ‘normal'
Spark/Scala via rdd.mapPartitions(…). Using the latest release
1.2.0.

Simple example; load up some sample data from parquet on HDFS
(about 380m rows, 10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & 
quantities sold for each hour in the day so I choose 3 out of the
10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1
group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around
11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t
production ready code but gets the job done).

    val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
    rddPC.mapPartitions { case hrs =>
      val qtySum = new Array[Double](24)
      val salesSum = new Array[Double](24)

      for (r <- hrs) {
        val hr = r.getInt(0)
        qtySum(hr) += r.getDouble(1)
        salesSum(hr) += r.getDouble(2)
      }
      (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
    }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
    b._2)).collect().foreach(println)

    I believe the evil thing that makes this snippet much slower is
    the for-loop. According to my early benchmark done with Scala 2.9,
    a for-loop can be orders of magnitude slower than a simple
    while-loop, especially when the body of the loop only does
    something as trivial as in this case. The reason is that a Scala
    for-loop is translated into the corresponding
    foreach/map/flatMap/withFilter function calls. That's exactly
    why Spark SQL tries to avoid for-loops or any other functional
    style code in critical paths (where every row is touched); we also
    use reusable mutable row objects instead of the immutable version
    to improve performance. You may check HiveTableScan,
    ParquetTableScan, InMemoryColumnarTableScan etc. for reference.
    Also, the `sum` function calls in your SQL code are translated
    into `o.a.s.s.execution.Aggregate` operators, which also use an
    imperative while-loop and reusable mutable rows.

Another thing to notice

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-10 Thread Nathan McCarthy
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while loops or 
tail recursive methods. I have noticed this in the spark code base.

I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how 
performance compares to the primitive objects. Would you expect to see similar 
runtimes vs the primitive objects? We do have the luxury of lots of memory at 
the moment so this might give us an additional performance boost.

Regarding the defensive copying of row objects. Can we switch this off and just 
be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row 
object the most performant way to be flipping between SQL & Scala user code? Is 
there anything else I could be doing?

Cheers,
~N

From: Michael Armbrust mich...@databricks.com
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we 
switch into user code.  This probably explains the difference between 1 & 2.

The difference between 1 & 3 is likely the cost of decompressing the column 
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are inlined 
below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for 
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production 
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the for-loop. 
According to my early benchmark done with Scala 2.9, a for-loop can be orders of 
magnitude slower than a simple while-loop, especially when the body of the loop 
only does something as trivial as in this case. The reason is that a Scala for-loop 
is translated into the corresponding foreach/map/flatMap/withFilter function calls. 
That's exactly why Spark SQL tries to avoid for-loops or any other 
functional style code in critical paths (where every row is touched); we also 
use reusable mutable row objects instead of the immutable version to improve 
performance. You may check HiveTableScan, ParquetTableScan, 
InMemoryColumnarTableScan etc. for reference. Also, the `sum` function calls in 
your SQL code are translated into `o.a.s.s.execution.Aggregate` operators, 
which also use an imperative while-loop and reusable mutable rows.

Another thing to notice is that the `hrs` iterator physically points to the 
underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ... }` 
loop actually decompresses and extracts values from the required byte buffers (this 
is the unwrapping process you mentioned below).

Now this takes around ~49 seconds… Even though test1 table is 100% cached. The 
number of partitions remains the same…

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, 
sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1), 
r.getDouble(7), r.getDouble(8)) }.cache()
//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-09 Thread Michael Armbrust
The other thing to note here is that Spark SQL defensively copies rows when
we switch into user code.  This probably explains the difference between 1
& 2.

The difference between 1 & 3 is likely the cost of decompressing the column
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Nathan,

 Thanks for sharing, this is a very interesting post :) My comments are
 inlined below.

 Cheng

 On 1/7/15 11:53 AM, Nathan McCarthy wrote:

 Hi,

  I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via
 rdd.mapPartitions(…). Using the latest release 1.2.0.

  Simple example; load up some sample data from parquet on HDFS (about
 380m rows, 10 columns) on a 7 node cluster.

   val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
   t.registerTempTable("test1")
   sqlC.cacheTable("test1")

  Now let's do some operations on it; I want the total sales & quantities
 sold for each hour in the day so I choose 3 out of the 10 possible
 columns...

   sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by
 Hour").collect().foreach(println)

  After the table has been 100% cached in memory, this takes around 11
 seconds.

  Lets do the same thing but via a MapPartitions call (this isn’t
 production ready code but gets the job done).

   val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
   rddPC.mapPartitions { case hrs =>
     val qtySum = new Array[Double](24)
     val salesSum = new Array[Double](24)

     for (r <- hrs) {
       val hr = r.getInt(0)
       qtySum(hr) += r.getDouble(1)
       salesSum(hr) += r.getDouble(2)
     }
     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
   }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

 I believe the evil thing that makes this snippet much slower is the
 for-loop. According to my early benchmark done with Scala 2.9, a for-loop can
 be orders of magnitude slower than a simple while-loop, especially when the
 body of the loop only does something as trivial as in this case. The reason is
 that a Scala for-loop is translated into the corresponding
 foreach/map/flatMap/withFilter function calls. That's exactly why Spark
 SQL tries to avoid for-loops or any other functional style code in critical
 paths (where every row is touched); we also use reusable mutable row
 objects instead of the immutable version to improve performance. You may
 check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
 reference. Also, the `sum` function calls in your SQL code are translated
 into `o.a.s.s.execution.Aggregate` operators, which also use an imperative
 while-loop and reusable mutable rows.

 Another thing to notice is that the `hrs` iterator physically points to the
 underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ...
 }` loop actually decompresses and extracts values from the required byte
 buffers (this is the unwrapping process you mentioned below).


  Now this takes around ~49 seconds… Even though test1 table is 100%
 cached. The number of partitions remains the same…

  Now if I create a simple RDD of a case class HourSum(hour: Int, qty:
 Double, sales: Double)

  Convert the SchemaRDD;
  val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1),
 r.getDouble(7), r.getDouble(8)) }.cache()
  //cache all the data
 rdd.count()

  Then run basically the same MapPartitions query;

  rdd.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.hour
      qtySum(hr) += r.qty
      salesSum(hr) += r.sales
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 +
  b._2)).collect().foreach(println)

  This takes around 1.5 seconds! Albeit the memory footprint is much
 larger.

 I guess this 1.5 seconds doesn't include the time spent on caching the
 simple RDD? As I've explained above, in the first `mapPartitions` style
 snippet, columnar byte buffer unwrapping happens within the `mapPartitions`
 call. However, in this version, the unwrapping process happens when the
 `rdd.count()` action is performed. At that point, all values of all columns
 are extracted from underlying byte buffers, and the portion of data you
 need is then manually selected and transformed into the simple case class
 RDD via the `map` call.

 If you include time spent on caching the simple case class RDD, it should
 be even slower than the first `mapPartitions` version.


  My thinking is that because SparkSQL does store things in a columnar
 format, there is some unwrapping to be done out of the column array buffers
 which takes time and for some reason this just takes longer when I switch
 out to map partitions (maybe it's unwrapping the entire row, even though I'm
 using just a subset of columns, or maybe there is some object
 creation/autoboxing going on when calling getInt or getDouble)…


Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-09 Thread Cheng Lian

Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are 
inlined below.


Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:

Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala 
via rdd.mapPartitions(…). Using the latest release 1.2.0.


Simple example; load up some sample data from parquet on HDFS (about 
380m rows, 10 columns) on a 7 node cluster.


  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities 
sold for each hour in the day so I choose 3 out of the 10 possible 
columns...


sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)


After the table has been 100% cached in memory, this takes around 11 
seconds.


Lets do the same thing but via a MapPartitions call (this isn’t 
production ready code but gets the job done).


  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + 
b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the 
for-loop. According to my early benchmark done with Scala 2.9, a for-loop 
can be orders of magnitude slower than a simple while-loop, especially 
when the body of the loop only does something as trivial as in this case. 
The reason is that a Scala for-loop is translated into the corresponding 
foreach/map/flatMap/withFilter function calls. That's exactly why 
Spark SQL tries to avoid for-loops or any other functional style code in 
critical paths (where every row is touched); we also use reusable 
mutable row objects instead of the immutable version to improve 
performance. You may check HiveTableScan, ParquetTableScan, 
InMemoryColumnarTableScan etc. for reference. Also, the `sum` function 
calls in your SQL code are translated into `o.a.s.s.execution.Aggregate` 
operators, which also use an imperative while-loop and reusable mutable rows.
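
For example, the quoted snippet rewritten with an imperative while-loop might 
look like this (a sketch only; the column positions and the sqlC/try2 names 
come from the quoted code, which defines try2 but then calls mapPartitions on 
rddPC):

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  try2.mapPartitions { hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    while (hrs.hasNext) {          // imperative loop: no per-element closure calls
      val r = hrs.next()
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)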


Another thing to notice is that the `hrs` iterator physically points to the 
underlying in-memory columnar byte buffers, and the `for (r <- hrs) { 
... }` loop actually decompresses and extracts values from the required byte 
buffers (this is the unwrapping process you mentioned below).


Now this takes around ~49 seconds… Even though test1 table is 100% 
cached. The number of partitions remains the same…


Now if I create a simple RDD of a case class HourSum(hour: Int, qty: 
Double, sales: Double)


Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => 
HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()

//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + 
b._2)).collect().foreach(println)


This takes around 1.5 seconds! Albeit the memory footprint is much larger.
I guess this 1.5 seconds doesn't include the time spent on caching the 
simple RDD? As I've explained above, in the first `mapPartitions` style 
snippet, columnar byte buffer unwrapping happens within the 
`mapPartitions` call. However, in this version, the unwrapping process 
happens when the `rdd.count()` action is performed. At that point, all 
values of all columns are extracted from underlying byte buffers, and 
the portion of data you need is then manually selected and transformed 
into the simple case class RDD via the `map` call.


If you include time spent on caching the simple case class RDD, it 
should be even slower than the first `mapPartitions` version.
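
One way to keep the comparison apples-to-apples is to time the two phases 
separately, e.g. (illustrative only: the time helper is not from the original 
post, HourSum is the case class defined above, and the simple map/reduceByKey 
stands in for the mapPartitions aggregation shown earlier):

  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e9}%.1f s")
    result
  }

  val rdd = sqlC.sql("select * from test1")
    .map(r => HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)))
    .cache()

  time("unwrap + cache")(rdd.count())     // pays the columnar unwrapping cost
  time("aggregate") {                     // roughly the ~1.5 s step measured above
    rdd.map(h => (h.hour, (h.qty, h.sales)))
       .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
       .collect()
  }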


My thinking is that because SparkSQL does store things in a columnar 
format, there is some unwrapping to be done out of the column array 
buffers which takes time and for some reason this just takes longer 
when I switch out to map partitions (maybe it's unwrapping the entire 
row, even though I’m using just a subset of columns, or maybe there is 
some object creation/autoboxing going on when calling getInt or 
getDouble)…


I’ve tried simpler cases too, like just summing sales. Running sum via 
SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD 
is even faster (2.6 seconds). But MapPartitions on the SchemaRDD;


sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => 
Iterator(iter.foldLeft(0.0)((t,r) => t + r.getDouble(0)))).sum


 takes a long time (33 seconds). In all these examples everything is 
fully cached in memory. And yes for these kinds 

Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-08 Thread Nathan McCarthy
Any ideas? :)

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 7 January 2015 2:53 pm
To: user@spark.apache.org
Subject: SparkSQL schemaRDD & MapPartitions calls - performance issues - 
columnar formats?

Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for 
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production 
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

Now this takes around ~49 seconds… Even though test1 table is 100% cached. The 
number of partitions remains the same…

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, 
sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1), 
r.getDouble(7), r.getDouble(8)) }.cache()
//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

This takes around 1.5 seconds! Albeit the memory footprint is much larger.

My thinking is that because SparkSQL does store things in a columnar format, 
there is some unwrapping to be done out of the column array buffers which takes 
time and for some reason this just takes longer when I switch out to map 
partitions (maybe it's unwrapping the entire row, even though I'm using just a 
subset of columns, or maybe there is some object creation/autoboxing going on 
when calling getInt or getDouble)…

I’ve tried simpler cases too, like just summing sales. Running sum via SQL is 
fast (4.7 seconds), running a mapPartition sum on a double RDD is even faster 
(2.6 seconds). But MapPartitions on the SchemaRDD;

sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => 
Iterator(iter.foldLeft(0.0)((t,r) => t + r.getDouble(0)))).sum

 takes a long time (33 seconds). In all these examples everything is fully 
cached in memory. And yes for these kinds of operations I can use SQL, but for 
more complex queries I’d much rather be using a combo of SparkSQL to select the 
data (so I get nice things like Parquet pushdowns etc.) & functional Scala!

I think I’m doing something dumb… Is there something I should be doing to get 
faster performance on MapPartitions on SchemaRDDs? Is there some unwrapping 
going on in the background that catalyst does in a smart way that I’m missing?

Cheers,
~N



SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-06 Thread Nathan McCarthy
Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for 
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production 
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

Now this takes around ~49 seconds… Even though test1 table is 100% cached. The 
number of partitions remains the same…

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, 
sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1), 
r.getDouble(7), r.getDouble(8)) }.cache()
//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

This takes around 1.5 seconds! Albeit the memory footprint is much larger.

My thinking is that because SparkSQL does store things in a columnar format, 
there is some unwrapping to be done out of the column array buffers which takes 
time and for some reason this just takes longer when I switch out to map 
partitions (maybe it's unwrapping the entire row, even though I'm using just a 
subset of columns, or maybe there is some object creation/autoboxing going on 
when calling getInt or getDouble)…

I’ve tried simpler cases too, like just summing sales. Running sum via SQL is 
fast (4.7 seconds), running a mapPartition sum on a double RDD is even faster 
(2.6 seconds). But MapPartitions on the SchemaRDD;

sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => 
Iterator(iter.foldLeft(0.0)((t,r) => t + r.getDouble(0)))).sum

 takes a long time (33 seconds). In all these examples everything is fully 
cached in memory. And yes for these kinds of operations I can use SQL, but for 
more complex queries I’d much rather be using a combo of SparkSQL to select the 
data (so I get nice things like Parquet pushdowns etc.) & functional Scala!

I think I’m doing something dumb… Is there something I should be doing to get 
faster performance on MapPartitions on SchemaRDDs? Is there some unwrapping 
going on in the background that catalyst does in a smart way that I’m missing?

Cheers,
~N

Nathan McCarthy
QUANTIUM
Level 25, 8 Chifley, 8-12 Chifley Square
Sydney NSW 2000

T: +61 2 8224 8922
F: +61 2 9292 6444

W: www.quantium.com.au

www.linkedin.com/company/quantium

www.facebook.com/QuantiumAustralia

www.twitter.com/QuantiumAU


The contents of this email, including attachments, may be confidential 
information. If you are not the intended recipient, any use, disclosure or 
copying of the information is unauthorised. If you have received this email in 
error, we would be grateful if you would notify us immediately by email reply, 
phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message from 
your system.