[
https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-22599.
----------------------------------
Resolution: Incomplete
> Avoid extra reading for cached table
> ------------------------------------
>
> Key: SPARK-22599
> URL: https://issues.apache.org/jira/browse/SPARK-22599
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Nan Zhu
> Priority: Major
> Labels: bulk-closed
>
> In the current implementation of Spark, InMemoryTableExec reads all data in a
> cached table, filters CachedBatches according to their stats, and passes the
> surviving data to the downstream operators. This makes it inefficient to keep
> a whole table resident in memory to serve queries that touch only some
> partitions of the table, a pattern that covers a certain portion of our
> users' scenarios.
> The following is an example of such a use case:
> store_sales is a 1TB-sized table in cloud storage, which is partitioned by
> 'location'. The first query, Q1, wants to output several metrics A, B, C for
> all stores in all locations. After that, a small team of 3 data scientists
> wants to do some causal analysis for the sales in different locations. To
> avoid unnecessary I/O and Parquet/ORC parsing overhead, they want to cache
> the whole table in memory in Q1.
> With the current implementation, even though each data scientist is
> interested in only one of the three locations, every query they submit to the
> Spark cluster still reads the full 1TB of cached data.
> The reason behind the extra reading is how CachedBatch is implemented:
> {code}
> case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
> {code}
> Because "stats" is embedded in every CachedBatch, we can only filter batches
> for the output of the InMemoryTableExec operator after reading all data in
> the in-memory table as input. The extra reading is even more unacceptable
> when some of the table's data has been evicted to disk.
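> As a rough sketch of the problem (plain Scala stand-ins for Spark's
> internals; the stats layout and the loadBatch helper here are hypothetical,
> not Spark's actual API), pruning with stats coupled to the data means every
> batch must be fetched before it can be discarded:
> {code}
> // Simplified current layout: stats travel inside each CachedBatch,
> // so pruning a batch requires reading the whole batch first.
> case class CachedBatch(numRows: Int,
>                        buffers: Array[Array[Byte]],
>                        stats: Map[String, (Long, Long)])
>
> // Hypothetical accessor standing in for fetching a batch via BlockManager.
> def loadBatch(id: Int, store: Map[Int, CachedBatch]): CachedBatch = store(id)
>
> // Every batch (buffers included) is read, even ones discarded immediately.
> def prune(ids: Seq[Int],
>           store: Map[Int, CachedBatch],
>           column: String,
>           value: Long): Seq[CachedBatch] =
>   ids.map(loadBatch(_, store)).filter { b =>
>     b.stats.get(column).forall { case (lo, hi) => lo <= value && value <= hi }
>   }
> {code}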
> We propose to introduce a new type of block, a metadata block, for the
> partitions of the RDD representing the data in the cached table. Every
> metadata block contains stats for all columns in a partition and is saved to
> the BlockManager when the compute() method is executed for the partition, so
> that batches can be pruned while reading a minimal number of bytes.
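> As a rough illustration of the proposed split (again with plain Scala
> stand-ins; CachedBatchMeta and its min/max layout are hypothetical names for
> this sketch, not the design doc's exact types), keeping the stats in a small
> separate block lets the operator decide which batches to fetch without
> touching any data buffers:
> {code}
> // Data half: heavy column buffers, fetched only if the batch survives pruning.
> case class CachedBatchData(buffers: Array[Array[Byte]])
>
> // Metadata half: per-batch row count and per-column min/max stats, small
> // enough to keep resident (the proposed "metadata block").
> case class CachedBatchMeta(numRows: Int, minMax: Map[String, (Long, Long)])
>
> // Prune using metadata only; return indices of the batches worth reading.
> def selectBatches(metas: Seq[CachedBatchMeta],
>                   column: String,
>                   value: Long): Seq[Int] =
>   metas.zipWithIndex.collect {
>     case (m, i) if m.minMax.get(column)
>       .forall { case (lo, hi) => lo <= value && value <= hi } => i
>   }
> {code}
> Only the surviving indices would then trigger a fetch of the corresponding
> data blocks from the BlockManager.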
> More details can be found in the design doc:
> https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing
> Performance test results:
> Environment: 6 executors, each with 16 cores and 90GB of memory
> Dataset: 1TB of TPC-DS data
> Queries: 4 queries (Q19, Q46, Q34, Q27) from
> https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala
> results:
> https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]