[ https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15129534#comment-15129534 ]
Wenchen Fan commented on SPARK-11838: ------------------------------------- Can the new `StreamFrame` satisfy this requirement(avoid re-computing for slowly changing tables)? cc [~zsxwing] > Spark SQL query fragment RDD reuse > ---------------------------------- > > Key: SPARK-11838 > URL: https://issues.apache.org/jira/browse/SPARK-11838 > Project: Spark > Issue Type: Improvement > Components: SQL > Reporter: Mikhail Bautin > > With many analytical Spark SQL workloads against slowly changing tables, > successive queries frequently share fragments that produce the same result. > Instead of re-computing those fragments for every query, it makes sense to > detect similar fragments and substitute RDDs previously created for matching > SparkPlan fragments into every new SparkPlan at the execution time whenever > possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory, > many stages can still be skipped due to map output files being present on > executor nodes. > The implementation involves the following steps: > (1) Logical plan "canonicalization". > Logical plans mapping to the same "canonical" logical plan should always > produce the same results (except for possible output column reordering), > although the inverse statement won't always be true. > - Re-mapping expression ids to "canonical expression ids" (successively > increasing numbers always starting with 1). > - Eliminating alias names that are unimportant after analysis completion. > Only the names that are necessary to determine the Hive table columns to be > scanned are retained. > - Reordering columns in projections, grouping/aggregation expressions, etc. > This can be done e.g. by using the string representation as a sort key. Union > inputs always have to be reordered the same way. > - Tree traversal has to happen starting from leaves and progressing towards > the root, because we need to already have identified canonical expression ids > for children of a node before we can come up with sort keys that would allow > to reorder expressions in a node deterministically. This is a bit more > complicated for Union nodes. > - Special handling for MetastoreRelations. We replace MetastoreRelation > with a special class CanonicalMetastoreRelation that uses attributes and > partitionKeys as part of its equals() and hashCode() implementation, but the > visible attributes and aprtitionKeys are restricted to expression ids that > the rest of the query actually needs from that MetastoreRelation. > An example of logical plans and corresponding canonical logical plans: > https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw > (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When > generating a SparkPlan, we keep an optional reference to a LogicalPlan > instance in every node. This allows us to populate the cache with mappings > from canonical logical plans of query fragments to the corresponding RDDs > generated as part of query execution. Note that there is no new work > necessary to generate the RDDs, we are merely utilizing the RDDs that would > have been produced as part of SparkPlan execution anyway. > (3) SparkPlan fragment substitution. After generating a SparkPlan and before > calling prepare() or execute() on it, we check if any of its nodes have an > associated LogicalPlan that maps to a canonical logical plan matching a cache > entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD > wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of > the current query fragment. If the expected column order differs from what > the current SparkPlan fragment produces, we add a projection to reorder the > columns. We also add safe/unsafe row conversions as necessary to match the > row type that is expected by the parent of the current SparkPlan fragment. > (4) The execute() method of SparkPlan also needs to perform the cache lookup > and substitution described above before producing a new RDD for the current > SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava) > allows to reuse query fragments between simultaneously submitted queries: > whichever query runs execute() for a particular fragment's canonical logical > plan starts producing an RDD first, and if another query has a fragment with > the same canonical logical plan, it waits for the RDD to be produced by the > first query and inserts it in its SparkPlan instead. > This kind of query fragment caching will mostly be useful for slowly-changing > or static tables. Even with slowly-changing tables, the cache needs to be > invalidated when those data set changes take place. One of the following > approaches could be used: > - Application logic could explicitly invalidate the cache when it detects a > change > - We could add a key that encodes the set of files in HDFS present at the > moment of LogicalPlan creation to CanonicalMetastoreRelation > - We could append version numbers to table names that are increased whenever > a table is updated. This version number stays in the LogicalPlan but gets > removed before doing a Hive table lookup. It could also be used to filter the > set of files to scan from the Hive table. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org