Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Hi Michael,

The storage tab shows the RDD resides fully in memory (10 partitions) with
zero disk usage. Tasks for the subsequent select on this cached table show
minimal overheads (GC, queueing, shuffle write, etc.), so overhead is not
the issue. However, it is still twice as slow as reading the uncached table.

I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, and spark.serializer = org.apache.spark.serializer.KryoSerializer.
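
(For reference, a minimal sketch of one way these settings can be applied when building the contexts; the app name is a placeholder, and the same properties could instead go in spark-defaults.conf or on the submit command line.)

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Sketch only: set the compression / serializer properties before the
    // SparkContext is created, since spark.serializer and spark.rdd.compress
    // are read at context startup.
    val conf = new SparkConf()
      .setAppName("cached-parquet-groupby")   // placeholder app name
      .set("spark.rdd.compress", "true")
      .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)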

Something that may be of relevance ...

The underlying table is Parquet, 10 partitions totaling ~350 MB. The
mapPartitions phase of the query on the uncached table shows an input size of
351 MB. However, after the table is cached, the storage tab shows the cache
size as 12 GB. So the in-memory representation seems much bigger than the
on-disk one, even with the compression options turned on. Any thoughts on this?

The mapPartitions phase of the same query on the cached table shows an input
size of 12 GB (the full size of the cached table) and takes twice as long as
the mapPartitions phase of the uncached query.

Thanks,






On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com
wrote:

 Check the storage tab.  Does the table actually fit in memory? Otherwise
 you are rebuilding column buffers in addition to reading the data off of
 the disk.



Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Michael Armbrust
You'll probably only get good compression for strings when dictionary
encoding works. We don't optimize decimals in the in-memory columnar
storage, so you are likely paying for expensive serialization there.

On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Flat data of types String, Int, and a couple of decimal(14,4) columns

 On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Is this nested data or flat data?




Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Could you share which data types are optimized in the in-memory storage and
how they are optimized?

On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com
wrote:

 You'll probably only get good compression for strings when dictionary
 encoding works. We don't optimize decimals in the in-memory columnar
 storage, so you are likely paying for expensive serialization there.



Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Michael Armbrust
You could add a new ColumnType:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala

PRs welcome :)

On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Hi Michael,

 As a test, I have the same data loaded as another Parquet table, except with
 the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is
 ~345 MB, the in-memory size is 2 GB (vs. 12 GB), and the cached query runs in
 half the time of the uncached query.

 Would it be possible for Spark to store in-memory decimals in some form of
 long with decoration?

 For the immediate future, is there any hook that we can use to provide custom
 caching / processing for the decimal type in the RDD so that other semantics
 do not change?

 Thanks,







Re: SQL group by on Parquet table slower when table cached

2015-02-09 Thread Manoj Samel
Hi Michael,

As a test, I have the same data loaded as another Parquet table, except with
the 2 decimal(14,4) columns replaced by double. With this, the on-disk size is
~345 MB, the in-memory size is 2 GB (vs. 12 GB), and the cached query runs in
half the time of the uncached query.

Would it be possible for Spark to store in-memory decimals in some form of
long with decoration?

For the immediate future, is there any hook that we can use to provide custom
caching / processing for the decimal type in the RDD so that other semantics
do not change?
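
(Not a Spark hook, but as an interim workaround in the spirit of the double
test above, a minimal sketch; the table names are placeholders for the real
ones, the decimal columns are assumed to be b and c as in the queries in this
thread, and double of course gives up the exact decimal(14,4) semantics.)

    // Derive a copy of the table with the decimal columns cast to double,
    // register it under a new placeholder name, and cache that instead.
    val asDouble = sqlContext.sql(
      "SELECT a, CAST(b AS DOUBLE) AS b, CAST(c AS DOUBLE) AS c FROM my_table")
    asDouble.registerTempTable("my_table_dbl")
    sqlContext.cacheTable("my_table_dbl")

    // The aggregation then runs against the double-typed cached copy.
    sqlContext.sql("SELECT a, sum(b), sum(c) FROM my_table_dbl GROUP BY a").collect()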

Thanks,




On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Could you share which data types are optimized in the in-memory storage and
 how they are optimized?



SQL group by on Parquet table slower when table cached

2015-02-06 Thread Manoj Samel
Spark 1.2

Data stored in parquet table (large number of rows)

Test 1

select a, sum(b), sum(c) from table

Test 2

sqlContext.cacheTable("table")
select a, sum(b), sum(c) from table  - seeds the cache; the first run is slow
since it is loading the cache?
select a, sum(b), sum(c) from table  - the second run should be faster, as it
should read from the cache, not HDFS. But it is slower than Test 1.
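
(For reference, a minimal sketch of the test flow using the Spark 1.2 API; the
Parquet path and table name are placeholders, and the GROUP BY a is assumed
from the subject line since it is elided in the queries above.)

    // Register the Parquet data as a table (path and name are placeholders).
    val parquetData = sqlContext.parquetFile("/path/to/parquet")
    parquetData.registerTempTable("my_table")

    // Test 1: query straight off the Parquet files.
    sqlContext.sql("SELECT a, sum(b), sum(c) FROM my_table GROUP BY a").collect()

    // Test 2: cache the table, then run the same query twice; the first run
    // seeds the in-memory columnar cache, the second should hit it.
    sqlContext.cacheTable("my_table")
    sqlContext.sql("SELECT a, sum(b), sum(c) FROM my_table GROUP BY a").collect()
    sqlContext.sql("SELECT a, sum(b), sum(c) FROM my_table GROUP BY a").collect()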

Any thoughts? Should a different query be used to seed the cache?

Thanks,


Re: SQL group by on Parquet table slower when table cached

2015-02-06 Thread Michael Armbrust
Check the storage tab.  Does the table actually fit in memory? Otherwise
you are rebuilding column buffers in addition to reading the data off of
the disk.

On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2

 Data stored in parquet table (large number of rows)

 Test 1

 select a, sum(b), sum(c) from table

 Test 2

 sqlContext.cacheTable("table")
 select a, sum(b), sum(c) from table  - seeds the cache; the first run is slow
 since it is loading the cache?
 select a, sum(b), sum(c) from table  - the second run should be faster, as it
 should read from the cache, not HDFS. But it is slower than Test 1.

 Any thoughts? Should a different query be used to seed the cache?

 Thanks,