Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b1a732fea -> f0e80aa2d


[SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame

## What changes were proposed in this pull request?
We allow users to specify hints (currently only "broadcast" is supported) in 
SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), 
DataFrame does not, and users are sometimes confused because they cannot figure 
out how to apply a broadcast hint. This ticket adds a generic hint function to 
DataFrame so that the same hints can be used on DataFrames as well as in SQL.

As an example, after this patch, the following will apply a broadcast hint on a 
DataFrame using the new hint function:

```
df1.join(df2.hint("broadcast"))
```
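
For context, a hedged sketch (assuming an active SparkSession named `spark`; the view names `t1` and `t2` are illustrative) showing the new DataFrame-side hint next to the equivalent SQL hint comment:

```
// Sketch only: assumes an active SparkSession `spark`; t1/t2 are illustrative names.
val df1 = spark.range(100).toDF("id")
val df2 = spark.range(100).toDF("id")

// New DataFrame API: hint the right-hand side of the join.
val viaApi = df1.join(df2.hint("broadcast"), "id")

// Equivalent SQL form, using the standard hint comment syntax.
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
val viaSql = spark.sql("SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2 ON t1.id = t2.id")
```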

## How was this patch tested?
Added a test case in DataFrameJoinSuite.

Author: Reynold Xin <r...@databricks.com>

Closes #17839 from rxin/SPARK-20576.

(cherry picked from commit 527fc5d0c990daaacad4740f62cfe6736609b77b)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e80aa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e80aa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e80aa2

Branch: refs/heads/branch-2.2
Commit: f0e80aa2ddee80819ef33ee24eb6a15a73bc02d5
Parents: b1a732f
Author: Reynold Xin <r...@databricks.com>
Authored: Wed May 3 09:22:25 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 3 09:22:41 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/ResolveHints.scala      |  8 +++++++-
 .../main/scala/org/apache/spark/sql/Dataset.scala | 16 ++++++++++++++++
 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +++++++++++++++++-
 3 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index c4827b8..df688fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -86,7 +86,13 @@ object ResolveHints {
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
-        applyBroadcastHint(h.child, h.parameters.toSet)
+        if (h.parameters.isEmpty) {
+          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
+          BroadcastHint(h.child)
+        } else {
+          // Otherwise, find within the subtree query plans that should be broadcasted.
+          applyBroadcastHint(h.child, h.parameters.toSet)
+        }
     }
   }
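
To make the two branches above concrete, a hedged sketch (assuming an active SparkSession `spark` and Spark 2.2 catalyst internals): a parameterless `hint("broadcast")` wraps the entire child plan in a `BroadcastHint`, which can be observed in the analyzed plan.

```
// Sketch only: assumes an active SparkSession `spark` and Spark 2.2 internals.
import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint

val small = spark.range(100).toDF("id")
val big   = spark.range(1000000L).toDF("id")

// No parameters were given to the hint, so the whole hinted subtree
// (here just `small`) is wrapped in a BroadcastHint node after analysis.
val joined = big.join(small.hint("broadcast"), "id")
assert(joined.queryExecution.analyzed.collect { case b: BroadcastHint => b }.nonEmpty)
```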
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 06dd550..5f602dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1074,6 +1074,22 @@ class Dataset[T] private[sql](
   def apply(colName: String): Column = col(colName)
 
   /**
+   * Specifies some hint on the current Dataset. As an example, the following code
+   * specifies that one of the plans can be broadcast:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+    Hint(name, parameters, logicalPlan)
+  }
+
+  /**
    * Selects column based on the column name and return it as a [[Column]].
    *
    * @note The column name can also reference to a nested column like `a.b`.
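
As a hedged usage note for the method added above (assuming an active SparkSession `spark`): the hint name is upper-cased by ResolveHints, so the lookup is case-insensitive, and the extra varargs parameters are simply passed through to the resolver. A quick way to see the effect is `explain()`:

```
// Sketch only: assumes an active SparkSession `spark`.
val small = spark.range(100).toDF("id")
val big   = spark.range(1000000L).toDF("id")

// The hint name is upper-cased by ResolveHints, so the lookup is case-insensitive.
// explain() prints the physical plan, where a BroadcastHashJoin should appear.
big.join(small.hint("BROADCAST"), "id").explain()
```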

http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb5..4a52af6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
   }
 
-  test("broadcast join hint") {
+  test("broadcast join hint using broadcast function") {
     val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
     val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
 
@@ -174,6 +174,22 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("broadcast join hint using Dataset.hint") {
+    // make sure a giant join is not broadcastable
+    val plan1 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong), "id")
+        .queryExecution.executedPlan
+    assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0)
+
+    // now with a hint it should be broadcasted
+    val plan2 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong).hint("broadcast"), "id")
+        .queryExecution.executedPlan
+    assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
+  }
+
   test("join - outer join conversion") {
     val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
     val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")

