This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3e30a98  [SPARK-27674][SQL] the hint should not be dropped after cache lookup
3e30a98 is described below

commit 3e30a988102e162f2702ae223312763a0bdc15eb
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed May 15 15:47:52 2019 -0700

    [SPARK-27674][SQL] the hint should not be dropped after cache lookup

    ## What changes were proposed in this pull request?

    This is a followup of https://github.com/apache/spark/pull/20365 .

    #20365 fixed this problem when the hint node is a root node. This PR fixes this problem for all the cases.

    ## How was this patch tested?

    a new test

    Closes #24580 from cloud-fan/bug.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../catalyst/optimizer/EliminateResolvedHint.scala |  2 +-
 .../apache/spark/sql/execution/CacheManager.scala  | 22 +++++----
 .../org/apache/spark/sql/CachedTableSuite.scala    | 56 ++++++++++++++++------
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
index 5586690..aebd660 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
@@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
    * in this method will be cleaned up later by this rule, and may emit warnings depending on the
    * configurations.
    */
-  private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
+  private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
     plan match {
       case h: ResolvedHint =>
         val (plan, hints) = extractHintsFromPlan(h.child)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b3c253b..a13e6ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -212,17 +213,18 @@ class CacheManager extends Logging {
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
     val newPlan = plan transformDown {
       case command: IgnoreCachedData => command
-      // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
-      // canonicalizing plans, so that plans which are same except hint can hit the same cache.
-      // However, we also want to keep the hint info after cache lookup. Here we skip the hint
-      // node, so that the returned caching plan won't replace the hint node and drop the hint info
-      // from the original plan.
-      case hint: ResolvedHint => hint
 
       case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
+        lookupCachedData(currentFragment).map { cached =>
+          // After cache lookup, we should still keep the hints from the input plan.
+          val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2
+          val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+          // The returned hint list is in top-down order, we should create the hint nodes from
+          // right to left.
+          hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
+            ResolvedHint(p, hint)
+          }
+        }.getOrElse(currentFragment)
     }
 
     newPlan transformAllExpressions {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 76350ad..62e77bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
   }
 
-  test("Cache should respect the broadcast hint") {
-    val df = broadcast(spark.range(1000)).cache()
-    val df2 = spark.range(1000).cache()
-    df.count()
-    df2.count()
+  test("Cache should respect the hint") {
+    def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
+      val df2 = spark.range(2000).cache()
+      df2.count()
 
-    // Test the broadcast hint.
-    val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
-    val hint = joinPlan.collect {
-      case Join(_, _, _, _, hint) => hint
+      def checkHintExists(): Unit = {
+        // Test the broadcast hint.
+        val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
+        val joinHints = joinPlan.collect {
+          case Join(_, _, _, _, hint) => hint
+        }
+        assert(joinHints.size == 1)
+        assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
+        assert(joinHints(0).rightHint.isEmpty)
+      }
+
+      // Make sure the hint does exist when `df` is not cached.
+      checkHintExists()
+
+      df.cache()
+      try {
+        df.count()
+        // Make sure the hint still exists when `df` is cached.
+        checkHintExists()
+      } finally {
+        // Clean-up
+        df.unpersist()
+      }
     }
-    assert(hint.size == 1)
-    assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
-    assert(hint(0).rightHint.isEmpty)
-    // Clean-up
-    df.unpersist()
+
+    // The hint is the root node
+    testHint(broadcast(spark.range(1000)), BROADCAST)
+    // The hint is under subquery alias
+    testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
+    // The hint is under filter
+    testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
+    // If there are 2 adjacent hints, the top one takes effect.
+    testHint(
+      spark.range(1000)
+        .hint("SHUFFLE_MERGE")
+        .hint("SHUFFLE_HASH")
+        .as("df"),
+      SHUFFLE_HASH)
   }
 
   test("analyzes column statistics in cached query") {
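For readers of the CacheManager change above, here is a minimal, self-contained sketch of why the extracted hints are re-applied with foldRight. The patch's comment states that extractHintsFromPlan returns hints in top-down order, so folding from the right wraps the cached plan with the innermost hint first and keeps the outermost hint outermost. The Plan, Hint and CachedPlan types below are made-up stand-ins for illustration only; the real code works on LogicalPlan, ResolvedHint and HintInfo.

    // Hypothetical stand-in types, not the real Spark classes.
    sealed trait Plan
    case class Hint(child: Plan, info: String) extends Plan
    case object CachedPlan extends Plan

    object HintRewrapSketch {
      def main(args: Array[String]): Unit = {
        // extractHintsFromPlan returns hints in top-down order: for a fragment like
        // Hint(Hint(scan, "inner"), "outer") it yields Seq("outer", "inner").
        val hints = Seq("outer", "inner")

        // Folding from the right applies "inner" first, then "outer", so the hint
        // nesting of the original fragment is rebuilt on top of the cached plan.
        val rewrapped = hints.foldRight[Plan](CachedPlan) { case (hint, p) => Hint(p, hint) }

        assert(rewrapped == Hint(Hint(CachedPlan, "inner"), "outer"))
        println(rewrapped) // Hint(Hint(CachedPlan,inner),outer)
      }
    }

A foldLeft with the same wrapping step would reverse the nesting (the inner hint would end up on top), which is why the patch creates the hint nodes from right to left.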