Repository: spark Updated Branches: refs/heads/branch-2.2 b1a732fea -> f0e80aa2d
[SPARK-20576][SQL] Support generic hint function in Dataset/DataFrame ## What changes were proposed in this pull request? We allow users to specify hints (currently only "broadcast" is supported) in SQL and DataFrame. However, while SQL has a standard hint format (/*+ ... */), DataFrame doesn't have one and sometimes users are confused that they can't find how to apply a broadcast hint. This ticket adds a generic hint function on DataFrame that allows using the same hint on DataFrames as well as SQL. As an example, after this patch, the following will apply a broadcast hint on a DataFrame using the new hint function: ``` df1.join(df2.hint("broadcast")) ``` ## How was this patch tested? Added a test case in DataFrameJoinSuite. Author: Reynold Xin <r...@databricks.com> Closes #17839 from rxin/SPARK-20576. (cherry picked from commit 527fc5d0c990daaacad4740f62cfe6736609b77b) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e80aa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e80aa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e80aa2 Branch: refs/heads/branch-2.2 Commit: f0e80aa2ddee80819ef33ee24eb6a15a73bc02d5 Parents: b1a732f Author: Reynold Xin <r...@databricks.com> Authored: Wed May 3 09:22:25 2017 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed May 3 09:22:41 2017 -0700 ---------------------------------------------------------------------- .../sql/catalyst/analysis/ResolveHints.scala | 8 +++++++- .../main/scala/org/apache/spark/sql/Dataset.scala | 16 ++++++++++++++++ .../org/apache/spark/sql/DataFrameJoinSuite.scala | 18 +++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index c4827b8..df688fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -86,7 +86,13 @@ object ResolveHints { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => - applyBroadcastHint(h.child, h.parameters.toSet) + if (h.parameters.isEmpty) { + // If there is no table alias specified, turn the entire subtree into a BroadcastHint. + BroadcastHint(h.child) + } else { + // Otherwise, find within the subtree query plans that should be broadcasted. + applyBroadcastHint(h.child, h.parameters.toSet) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 06dd550..5f602dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1074,6 +1074,22 @@ class Dataset[T] private[sql]( def apply(colName: String): Column = col(colName) /** + * Specifies some hint on the current Dataset. As an example, the following code specifies + * that one of the plan can be broadcasted: + * + * {{{ + * df1.join(df2.hint("broadcast")) + * }}} + * + * @group basic + * @since 2.2.0 + */ + @scala.annotation.varargs + def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { + Hint(name, parameters, logicalPlan) + } + + /** * Selects column based on the column name and return it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. http://git-wip-us.apache.org/repos/asf/spark/blob/f0e80aa2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 541ffb5..4a52af6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil) } - test("broadcast join hint") { + test("broadcast join hint using broadcast function") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value") @@ -174,6 +174,22 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { } } + test("broadcast join hint using Dataset.hint") { + // make sure a giant join is not broadcastable + val plan1 = + spark.range(10e10.toLong) + .join(spark.range(10e10.toLong), "id") + .queryExecution.executedPlan + assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0) + + // now with a hint it should be broadcasted + val plan2 = + spark.range(10e10.toLong) + .join(spark.range(10e10.toLong).hint("broadcast"), "id") + .queryExecution.executedPlan + assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) + } + test("join - outer join conversion") { val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org