Nan Zhu created SPARK-22599:
-------------------------------

             Summary: Avoid extra reading for cached table
                 Key: SPARK-22599
                 URL: https://issues.apache.org/jira/browse/SPARK-22599
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Nan Zhu


In the current implementation of Spark, InMemoryTableExec reads all data in a 
cached table, filters CachedBatches according to their stats, and passes the 
surviving data to the downstream operators. This implementation makes it 
inefficient to keep a whole table in memory to serve various queries against 
different partitions of the table, which is a common scenario among our users.

The following is an example of such a use case:

store_sales is a 1TB table in cloud storage, partitioned by 'location'. The 
first query, Q1, outputs several metrics A, B, and C for all stores in all 
locations. After that, a small team of 3 data scientists wants to do some 
causal analysis of the sales in different locations. To avoid unnecessary I/O 
and parquet/orc parsing overhead, they want to cache the whole table in memory 
during Q1.

With the current implementation, even though each data scientist is interested 
in only one of the three locations, the queries they submit to the Spark 
cluster still read the full 1TB of data.
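
A minimal sketch of the workflow just described, assuming a spark-shell 
session; the exact queries and aggregations are illustrative, not from the 
actual workload:

{code:scala}
import org.apache.spark.sql.functions._

// Q1: cache the whole table while computing metrics over all locations.
val storeSales = spark.table("store_sales").cache()
storeSales.groupBy("location").agg(sum("A"), avg("B"), max("C")).collect()

// Follow-up: a data scientist's query restricted to a single location.
// Under the current implementation this still scans the entire cached table.
storeSales.filter(col("location") === "CA").count()
{code}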

The reason for the extra reading is that CachedBatch is implemented as

{code:scala}
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
{code}

where "stats" is a part of every CachedBatch, so we can only filter batches for 
output of InMemoryTableExec operator by reading all data in in-memory table as 
input. The extra reading would be even more unacceptable when some of the 
table's data is evicted to disks.
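
For reference, a simplified sketch (not the actual Spark source) of why 
pruning can only happen after the batches are materialized; cachedRDD and 
statsPredicate are stand-ins for the cached RDD[CachedBatch] and the predicate 
compiled from the pushed-down filters:

{code:scala}
// Simplified sketch, assuming:
//   cachedRDD: RDD[CachedBatch]              -- the cached, serialized batches
//   statsPredicate: InternalRow => Boolean   -- compiled from the query filters
// Because `stats` lives inside each CachedBatch, a batch must already be
// loaded from the block store before its stats can be inspected.
val survivingBatches = cachedRDD.mapPartitions { batches =>
  batches.filter(batch => statsPredicate(batch.stats))
}
{code}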

We propose to introduce a new type of block, the metadata block, for the 
partitions of the RDD representing the data in the cached table. Every metadata 
block contains stats info for all columns in a partition and is saved to the 
BlockManager when the compute() method is executed for the partition. To 
minimize the number of bytes to read, InMemoryTableExec can then consult the 
metadata blocks first and fetch the data of only those partitions whose stats 
match the query's filters, as sketched below.
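
A rough sketch of what the proposed two-phase read could look like; the names 
below are hypothetical illustrations, not from an actual patch:

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical metadata block: per-partition stats stored separately in the
// BlockManager, so they can be read without touching the data blocks.
case class CachedBatchMetadata(partitionId: Int, numRows: Int, stats: InternalRow)

// Phase 1: read only the small metadata blocks and prune partitions.
def selectPartitions(meta: Seq[CachedBatchMetadata],
                     statsPredicate: InternalRow => Boolean): Seq[Int] =
  meta.filter(m => statsPredicate(m.stats)).map(_.partitionId)

// Phase 2: fetch data blocks only for the partitions that survive pruning.
{code}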

More details can be found in the design doc: 
https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing




