Repository: spark
Updated Branches:
  refs/heads/branch-2.0 343c28504 -> b3c491217


[SPARK-15109][SQL] Accept Dataset[_] in joins

## What changes were proposed in this pull request?
This patch changes the join APIs in Dataset so they can accept any Dataset, rather than just DataFrames.
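
For illustration, a minimal sketch of what the new signatures allow (hypothetical
case classes, data, and local session; not part of this patch):

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

    // Hypothetical example types; illustrative only.
    case class Person(name: String, deptId: Int)
    case class Dept(deptId: Int, deptName: String)

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val people: Dataset[Person] = Seq(Person("alice", 1), Person("bob", 2)).toDS()
    val depts: Dataset[Dept] = Seq(Dept(1, "eng"), Dept(2, "sales")).toDS()

    // Previously the right-hand side had to be a DataFrame; with this patch any
    // Dataset[_] is accepted. The result is still an untyped DataFrame.
    val joined: DataFrame = people.join(depts, "deptId")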

## How was this patch tested?
N/A.

Author: Reynold Xin <r...@databricks.com>

Closes #12886 from rxin/SPARK-15109.

(cherry picked from commit d864c55cf8c92466336e796d0c98d83230e330af)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3c49121
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3c49121
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3c49121

Branch: refs/heads/branch-2.0
Commit: b3c4912173a5d03f104c748b2bb7ea6b148b43c9
Parents: 343c285
Author: Reynold Xin <r...@databricks.com>
Authored: Wed May 4 10:38:27 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 4 10:38:35 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   | 12 ++++++------
 .../src/main/scala/org/apache/spark/sql/functions.scala |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3c49121/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 31dd64e..c77b138 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -564,7 +564,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame): DataFrame = withPlan {
+  def join(right: Dataset[_]): DataFrame = withPlan {
     Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }
 
@@ -589,7 +589,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame, usingColumn: String): DataFrame = {
+  def join(right: Dataset[_], usingColumn: String): DataFrame = {
     join(right, Seq(usingColumn))
   }
 
@@ -614,7 +614,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame = {
     join(right, usingColumns, "inner")
   }
 
@@ -635,7 +635,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
+  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branches.
     val joined = sparkSession.executePlan(
@@ -663,7 +663,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
+  def join(right: Dataset[_], joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
 
   /**
   * Join with another [[DataFrame]], using the given join expression. The following performs
@@ -686,7 +686,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
-  def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
+  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
     // Note that in this function, we introduce a hack in the case of self-join to automatically
     // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
     // Consider this case: df.join(df, df("key") === df("key"))

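For context, a sketch of the ambiguity that comment describes (hypothetical df
and local session; illustrative only):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("key", "value")

    // As written, both df("key") references resolve to the same attribute, so
    // the condition would be trivially true. The analyzer (SPARK-6231) rewrites
    // one side to refer to the right-hand copy of the plan, producing the
    // equi-join the caller almost certainly intended.
    val selfJoined = df.join(df, df("key") === df("key"))
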
http://git-wip-us.apache.org/repos/asf/spark/blob/b3c49121/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index fe63c80..3e295c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -931,8 +931,8 @@ object functions {
    * @group normal_funcs
    * @since 1.5.0
    */
-  def broadcast(df: DataFrame): DataFrame = {
-    Dataset.ofRows(df.sparkSession, BroadcastHint(df.logicalPlan))
+  def broadcast[T](df: Dataset[T]): Dataset[T] = {
+    Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.unresolvedTEncoder)
   }
 
   /**

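As a usage sketch of the now-generic broadcast signature (hypothetical types,
data, and session; illustrative only), the element type survives the hint:

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.sql.functions.broadcast

    // Hypothetical example type; illustrative only.
    case class Dept(deptId: Int, deptName: String)

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val large: DataFrame = Seq((1, "x"), (2, "y")).toDF("deptId", "payload")
    val small: Dataset[Dept] = Seq(Dept(1, "eng")).toDS()

    // Before this commit, broadcast() took and returned a DataFrame, erasing
    // the element type; now Dataset[Dept] stays Dataset[Dept].
    val hinted: Dataset[Dept] = broadcast(small)
    val joined: DataFrame = large.join(hinted, "deptId")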
