[ 
https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15129534#comment-15129534
 ] 

Wenchen Fan commented on SPARK-11838:
-------------------------------------

Can the new `StreamFrame` satisfy this requirement (avoiding re-computation 
for slowly changing tables)? cc [~zsxwing]

> Spark SQL query fragment RDD reuse
> ----------------------------------
>
>                 Key: SPARK-11838
>                 URL: https://issues.apache.org/jira/browse/SPARK-11838
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Mikhail Bautin
>
> With many analytical Spark SQL workloads against slowly changing tables, 
> successive queries frequently share fragments that produce the same result. 
> Instead of re-computing those fragments for every query, it makes sense to 
> detect similar fragments and substitute RDDs previously created for matching 
> SparkPlan fragments into every new SparkPlan at execution time whenever 
> possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory, 
> many stages can still be skipped due to map output files being present on 
> executor nodes.
> The implementation involves the following steps:
> (1) Logical plan "canonicalization". 
> Logical plans mapping to the same "canonical" logical plan should always 
> produce the same results (except for possible output column reordering), 
> although the converse won't always be true. 
>   - Re-mapping expression ids to "canonical expression ids" (successively 
> increasing numbers always starting with 1).
>   - Eliminating alias names that are unimportant after analysis completion. 
> Only the names that are necessary to determine the Hive table columns to be 
> scanned are retained.
>   - Reordering columns in projections, grouping/aggregation expressions, etc. 
> This can be done e.g. by using the string representation as a sort key. Union 
> inputs always have to be reordered the same way.
>   - Tree traversal has to happen starting from leaves and progressing towards 
> the root, because we need to already have identified canonical expression ids 
> for children of a node before we can come up with sort keys that would allow 
> us to reorder expressions in a node deterministically. This is a bit more 
> complicated for Union nodes.
>   - Special handling for MetastoreRelations. We replace MetastoreRelation 
> with a special class CanonicalMetastoreRelation that uses attributes and 
> partitionKeys as part of its equals() and hashCode() implementation, but the 
> visible attributes and partitionKeys are restricted to expression ids that 
> the rest of the query actually needs from that MetastoreRelation.
> An example of logical plans and corresponding canonical logical plans: 
> https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw
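
A minimal sketch of the canonicalization in step (1), written in Python against a toy plan model rather than Catalyst classes. `Expr`, `Plan`, and `canonicalize` are hypothetical names introduced only for illustration; Union reordering and the CanonicalMetastoreRelation handling are not modeled. It shows why traversal goes from the leaves up: child ids are remapped first, and only then can the rewritten string form serve as a sort key.

```python
import re
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Expr:
    body: str                     # references attributes as "#<id>", e.g. "#11 + #10"
    out_id: Optional[int] = None  # id of the attribute this expression defines, if any
    name: str = ""                # column name or alias


@dataclass(frozen=True)
class Plan:
    op: str                       # "Relation", "Project", ...
    exprs: Tuple[Expr, ...]
    children: Tuple["Plan", ...] = ()
    table: str = ""               # only set for Relation leaves


def canonicalize(plan: Plan, ids: Optional[Dict[int, int]] = None) -> Plan:
    """Return a plan with canonical ids, no alias names, and sorted expressions."""
    ids = {} if ids is None else ids
    # Leaves first: children need canonical ids before this node can be sorted.
    children = tuple(canonicalize(child, ids) for child in plan.children)
    keep_names = plan.op == "Relation"  # names are still needed to pick table columns

    rewritten = []
    for e in plan.exprs:
        body = re.sub(r"#(\d+)", lambda m: f"#{ids[int(m.group(1))]}", e.body)
        rewritten.append((body, e.name if keep_names else "", e.out_id))
    # The rewritten string form is the sort key, so the order ignores original ids.
    rewritten.sort(key=lambda item: (item[0], item[1]))

    exprs = []
    for body, name, old_id in rewritten:
        new_id = None
        if old_id is not None:
            new_id = ids.setdefault(old_id, len(ids) + 1)  # 1, 2, 3, ...
        exprs.append(Expr(body, new_id, name))
    return Plan(plan.op, tuple(exprs), children, plan.table)


# Two queries that differ only in expression ids, alias names, and column order.
q1 = Plan("Project", (Expr("#11 + #10", 20, "total"), Expr("#10")),
          (Plan("Relation", (Expr("", 10, "a"), Expr("", 11, "b")), table="t"),))
q2 = Plan("Project", (Expr("#31"), Expr("#32 + #31", 40, "sum_ab")),
          (Plan("Relation", (Expr("", 32, "b"), Expr("", 31, "a")), table="t"),))
same_fragment = canonicalize(q1) == canonicalize(q2)
```

Because the toy classes are frozen dataclasses, a canonical plan is hashable and could be used directly as a cache key. Note that this sketch does not normalize commutative operands, so `#1 + #2` and `#2 + #1` would still be treated as different fragments.
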
> (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When 
> generating a SparkPlan, we keep an optional reference to a LogicalPlan 
> instance in every node. This allows us to populate the cache with mappings 
> from canonical logical plans of query fragments to the corresponding RDDs 
> generated as part of query execution. Note that no new work is necessary to 
> generate the RDDs; we are merely reusing the RDDs that would have been 
> produced as part of SparkPlan execution anyway.
> (3) SparkPlan fragment substitution. After generating a SparkPlan and before 
> calling prepare() or execute() on it, we check if any of its nodes have an 
> associated LogicalPlan that maps to a canonical logical plan matching a cache 
> entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD 
> wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of 
> the current query fragment. If the expected column order differs from what 
> the current SparkPlan fragment produces, we add a projection to reorder the 
> columns. We also add safe/unsafe row conversions as necessary to match the 
> row type that is expected by the parent of the current SparkPlan fragment.
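
The substitution in step (3) can be sketched in a few lines of Python, assuming a plain dict as the cache and lists of tuples standing in for RDDs. `execute_fragment` and its parameters are hypothetical; the cache key is any hashable value that identifies the canonical logical plan. The only point illustrated is the extra projection that reorders columns when the cached fragment was stored in a different column order; safe/unsafe row conversion is not modeled.

```python
from typing import Callable, Dict, Hashable, List, Tuple

Rows = List[tuple]
# canonical plan key -> (column ids in stored order, materialized rows)
FragmentCache = Dict[Hashable, Tuple[Tuple[int, ...], Rows]]


def execute_fragment(cache: FragmentCache, key: Hashable,
                     expected_columns: Tuple[int, ...],
                     compute: Callable[[], Rows]) -> Rows:
    """Reuse a cached fragment if present, reordering columns when needed."""
    hit = cache.get(key)
    if hit is None:
        rows = compute()                       # work the query would do anyway
        cache[key] = (expected_columns, rows)
        return rows
    cached_columns, rows = hit
    if cached_columns == expected_columns:
        return rows                            # same order: scan the cached rows
    # Different order: add a projection that maps cached positions to expected ones.
    positions = [cached_columns.index(c) for c in expected_columns]
    return [tuple(row[i] for i in positions) for row in rows]


computations = []


def scan() -> Rows:
    computations.append("scan")
    return [(10, "x"), (20, "y")]


cache: FragmentCache = {}
first = execute_fragment(cache, "fragment-key", (1, 2), scan)
second = execute_fragment(cache, "fragment-key", (2, 1), scan)  # cache hit, reordered
```
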
> (4) The execute() method of SparkPlan also needs to perform the cache lookup 
> and substitution described above before producing a new RDD for the current 
> SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava) 
> allows query fragments to be reused between simultaneously submitted queries: 
> whichever query runs execute() for a particular fragment's canonical logical 
> plan starts producing an RDD first, and if another query has a fragment with 
> the same canonical logical plan, it waits for the RDD to be produced by the 
> first query and inserts it in its SparkPlan instead.
> This kind of query fragment caching will mostly be useful for slowly-changing 
> or static tables. Even with slowly-changing tables, the cache needs to be 
> invalidated whenever the underlying data changes. One of the following 
> approaches could be used:
> - Application logic could explicitly invalidate the cache when it detects a 
> change
> - We could add a key that encodes the set of files in HDFS present at the 
> moment of LogicalPlan creation to CanonicalMetastoreRelation
> - We could append to each table name a version number that is incremented 
> whenever the table is updated. This version number stays in the LogicalPlan 
> but gets removed before doing a Hive table lookup. It could also be used to 
> filter the set of files to scan from the Hive table.
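
The last two invalidation options both amount to putting something that changes with the data into the cache key. A small Python sketch under stated assumptions: `file_set_fingerprint` and `split_versioned_name` are hypothetical helpers, the `(path, size, mtime)` tuples stand in for whatever file metadata is available, and the `table@vN` suffix is an invented syntax, since the issue does not specify one.

```python
import hashlib
from typing import Iterable, Tuple


def file_set_fingerprint(files: Iterable[Tuple[str, int, int]]) -> str:
    """Digest of (path, size, mtime) entries, independent of listing order."""
    digest = hashlib.sha256()
    for path, size, mtime in sorted(files):
        digest.update(f"{path}\0{size}\0{mtime}\n".encode("utf-8"))
    return digest.hexdigest()


def split_versioned_name(name: str) -> Tuple[str, int]:
    """Split a hypothetical 'table@vN' name into (table, N); version 0 if absent."""
    base, sep, version = name.rpartition("@v")
    if sep and base and version.isdigit():
        return base, int(version)
    return name, 0


before = file_set_fingerprint([("/warehouse/t/part-0", 100, 1), ("/warehouse/t/part-1", 80, 1)])
after = file_set_fingerprint([("/warehouse/t/part-0", 100, 1), ("/warehouse/t/part-1", 80, 1),
                              ("/warehouse/t/part-2", 40, 2)])
cache_is_stale = before != after            # a new file yields a new cache key
table, version = split_versioned_name("sales@v3")
```

With either approach, stale entries are never looked up again once the key changes, so they still need to be evicted separately (for example by size or age) to reclaim space.
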



