Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f8f522c01 -> 851c30386


[SPARK-23192][SQL] Keep the Hint after Using Cached Data

## What changes were proposed in this pull request?

The hint attached to a plan segment is lost if that segment is replaced by the
cached data.

```Scala
      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
      df2.cache()
      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
```

This PR is to fix it.

## How was this patch tested?
Added a test

Author: gatorsmile <gatorsm...@gmail.com>

Closes #20365 from gatorsmile/fixBroadcastHintloss.

(cherry picked from commit 613c290336e3826111164c24319f66774b1f65a3)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/851c3038
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/851c3038
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/851c3038

Branch: refs/heads/branch-2.3
Commit: 851c303867eb54405f6508919619debe84708933
Parents: f8f522c
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Jan 23 14:56:28 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Jan 23 14:56:37 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/CacheManager.scala  | 12 ++++++++----
 .../spark/sql/execution/joins/BroadcastJoinSuite.scala | 13 +++++++++++++
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/851c3038/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b05fe49..432eb59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -170,9 +170,13 @@ class CacheManager extends Logging {
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
     val newPlan = plan transformDown {
       case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
+        lookupCachedData(currentFragment).map { cached =>
+          val cachedPlan = 
cached.cachedRepresentation.withOutput(currentFragment.output)
+          currentFragment match {
+            case hint: ResolvedHint => ResolvedHint(cachedPlan, hint.hints)
+            case _ => cachedPlan
+          }
+        }.getOrElse(currentFragment)
     }
 
     newPlan transformAllExpressions {

http://git-wip-us.apache.org/repos/asf/spark/blob/851c3038/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 0bcd54e..1704bc8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -109,6 +109,19 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
     }
   }
 
+  test("broadcast hint is retained after using the cached data") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value")
+      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", 
"value")
+      df2.cache()
+      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+      val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+        case b: BroadcastHashJoinExec => b
+      }.size
+      assert(numBroadCastHashJoin === 1)
+    }
+  }
+
   test("broadcast hint isn't propagated after a join") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to