Re: SQL group by on Parquet table slower when table cached
Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SQL group by on Parquet table slower when table cached
You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SQL group by on Parquet table slower when table cached
Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SQL group by on Parquet table slower when table cached
You could add a new ColumnType https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala . PRs welcome :) On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, As a test, I have same data loaded as another parquet - except with the 2 decimal(14,4) replaced by double. With this, the on disk size is ~345MB, the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the time of uncached query. Would it be possible for Spark to store in-memory decimal in some form of long with decoration ? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so other semantic does not changes ? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SQL group by on Parquet table slower when table cached
Hi Michael, As a test, I have same data loaded as another parquet - except with the 2 decimal(14,4) replaced by double. With this, the on disk size is ~345MB, the in-memory size is 2GB (v.s. 12 GB) and the cached query runs in 1/2 the time of uncached query. Would it be possible for Spark to store in-memory decimal in some form of long with decoration ? For the immediate future, is there any hook that we can use to provide custom caching / processing for the decimal type in RDD so other semantic does not changes ? Thanks, On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel manojsamelt...@gmail.com wrote: Could you share which data types are optimized in the in-memory storage and how are they optimized ? On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust mich...@databricks.com wrote: You'll probably only get good compression for strings when dictionary encoding works. We don't optimize decimals in the in-memory columnar storage, so you are paying expensive serialization there likely. On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel manojsamelt...@gmail.com wrote: Flat data of types String, Int and couple of decimal(14,4) On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust mich...@databricks.com wrote: Is this nested data or flat data? On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi Michael, The storage tab shows the RDD resides fully in memory (10 partitions) with zero disk usage. Tasks for subsequent select on this table in cache shows minimal overheads (GC, queueing, shuffle write etc. etc.), so overhead is not issue. However, it is still twice as slow as reading uncached table. I have spark.rdd.compress = true, spark.sql.inMemoryColumnarStorage.compressed = true, spark.serializer = org.apache.spark.serializer.KryoSerializer Something that may be of relevance ... The underlying table is Parquet, 10 partitions totaling ~350 MB. For mapPartition phase of query on uncached table shows input size of 351 MB. However, after the table is cached, the storage shows the cache size as 12GB. So the in-memory representation seems much bigger than on-disk, even with the compression options turned on. Any thoughts on this ? mapPartition phase same query for cache table shows input size of 12GB (full size of cache table) and takes twice the time as mapPartition for uncached query. Thanks, On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust mich...@databricks.com wrote: Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
SQL group by on Parquet table slower when table cached
Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,
Re: SQL group by on Parquet table slower when table cached
Check the storage tab. Does the table actually fit in memory? Otherwise you are rebuilding column buffers in addition to reading the data off of the disk. On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 Data stored in parquet table (large number of rows) Test 1 select a, sum(b), sum(c) from table Test sqlContext.cacheTable() select a, sum(b), sum(c) from table - seed cache First time slow since loading cache ? select a, sum(b), sum(c) from table - Second time it should be faster as it should be reading from cache, not HDFS. But it is slower than test1 Any thoughts? Should a different query be used to seed cache ? Thanks,