This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 973283c33ad [SPARK-35739][SQL] Add Java-compatible Dataset.join 
overloads
973283c33ad is described below

commit 973283c33ad908d071550e9be92a4fca76a8a9df
Author: Brandon Dahler <b...@amazon.com>
AuthorDate: Thu Apr 28 18:52:26 2022 -0500

    [SPARK-35739][SQL] Add Java-compatible Dataset.join overloads
    
    ### What changes were proposed in this pull request?
    Adds 3 new syntactic sugar overloads to Dataset's join method as proposed 
in [SPARK-35739](https://issues.apache.org/jira/browse/SPARK-35739).
    
    ### Why are the changes needed?
    Improved development experience for developers using Spark SQL, 
specifically when coding in Java.
    
    Prior to changes the Seq overloads required developers to use less-known 
Java-to-Scala converter methods that made code less readable.  The overloads 
internalize those converter calls for two of the new methods and the third 
method adds a single-item overload that is useful for both Java and Scala.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the three new overloads technically constitute an API change to the 
Dataset class.  These overloads are net-new and have been commented 
appropriately in line with the existing methods.
    
    ### How was this patch tested?
    Test cases were not added because it is unclear to me where/how syntactic 
sugar overloads fit into the testing suites (if at all).  Happy to add them if 
I can be pointed in the correct direction.
    
    * Changes were tested in Scala via spark-shell.
    * Changes were tested in Java by modifying an example:
      ```
      diff --git 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
      index 86a9045d8a..342810c1e6 100644
      --- 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
      +++ 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
      @@ -124,6 +124,10 @@ public class JavaSparkSQLExample {
           // |-- age: long (nullable = true)
           // |-- name: string (nullable = true)
    
      +    df.join(df, new String[] {"age"}).show();
      +    df.join(df, "age", "left").show();
      +    df.join(df, new String[] {"age"}, "left").show();
      +
           // Select only the "name" column
           df.select("name").show();
           // +-------+
      ```
    
    #### Notes
    Re-opening of #33323 and #34923 with comments addressed.
    
    Closes #36343 from brandondahler/features/JavaCompatibleJoinOverloads.
    
    Authored-by: Brandon Dahler <b...@amazon.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 62 +++++++++++++++++++++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 55 +++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e66bfd87337..36b6d6b470d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -946,7 +946,21 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Inner equi-join with another `DataFrame` using the given columns.
+   * (Java-specific) Inner equi-join with another `DataFrame` using the given 
columns. See the
+   * Scala-specific overload for more details.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must 
exist on both sides.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Array[String]): DataFrame = {
+    join(right, usingColumns.toSeq)
+  }
+
+  /**
+   * (Scala-specific) Inner equi-join with another `DataFrame` using the given 
columns.
    *
    * Different from other join functions, the join columns will only appear 
once in the output,
    * i.e. similar to SQL's `JOIN USING` syntax.
@@ -971,10 +985,54 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Equi-join with another `DataFrame` using the given columns. A cross join 
with a predicate
+   * Equi-join with another `DataFrame` using the given column. A cross join 
with a predicate
    * is specified as an inner join. If you would explicitly like to perform a 
cross join use the
    * `crossJoin` method.
    *
+   * Different from other join functions, the join column will only appear 
once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumn Name of the column to join on. This column must exist 
on both sides.
+   * @param joinType Type of join to perform. Default `inner`. Must be one of:
+   *                 `inner`, `cross`, `outer`, `full`, `fullouter`, 
`full_outer`, `left`,
+   *                 `leftouter`, `left_outer`, `right`, `rightouter`, 
`right_outer`,
+   *                 `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`, 
`left_anti`.
+   *
+   * @note If you perform a self-join using this function without aliasing the 
input
+   * `DataFrame`s, you will NOT be able to reference any columns after the 
join, since
+   * there is no way to disambiguate which side of the join you would like to 
reference.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumn: String, joinType: String): 
DataFrame = {
+    join(right, Seq(usingColumn), joinType)
+  }
+
+  /**
+   * (Java-specific) Equi-join with another `DataFrame` using the given 
columns. See the
+   * Scala-specific overload for more details.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must 
exist on both sides.
+   * @param joinType Type of join to perform. Default `inner`. Must be one of:
+   *                 `inner`, `cross`, `outer`, `full`, `fullouter`, 
`full_outer`, `left`,
+   *                 `leftouter`, `left_outer`, `right`, `rightouter`, 
`right_outer`,
+   *                 `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`, 
`left_anti`.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Array[String], joinType: String): 
DataFrame = {
+    join(right, usingColumns.toSeq, joinType)
+  }
+
+  /**
+   * (Scala-specific) Equi-join with another `DataFrame` using the given 
columns. A cross join
+   * with a predicate is specified as an inner join. If you would explicitly 
like to perform a
+   * cross join use the `crossJoin` method.
+   *
    * Different from other join functions, the join columns will only appear 
once in the output,
    * i.e. similar to SQL's `JOIN USING` syntax.
    *
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index f317511639e..5286a70674e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -57,6 +57,15 @@ class DataFrameJoinSuite extends QueryTest
       Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
   }
 
+  test("join - join using multiple columns array") {
+    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", 
"str")
+    val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", 
"int2", "str")
+
+    checkAnswer(
+      df.join(df2, Array("int", "int2")),
+      Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
+  }
+
   test("join - sorted columns not in join's outputSet") {
     val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", 
"str_sort").as("df1")
     val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", 
"str").as("df2")
@@ -73,6 +82,15 @@ class DataFrameJoinSuite extends QueryTest
       Row(5, 5) :: Row(1, 1) :: Nil)
   }
 
+  test("join - join using specifying join type") {
+    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
+    val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")
+
+    checkAnswer(
+      df.join(df2, "int", "inner"),
+      Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
+  }
+
   test("join - join using multiple columns and specifying join type") {
     val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str")
     val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str")
@@ -110,6 +128,43 @@ class DataFrameJoinSuite extends QueryTest
       Row(3, "3", 4) :: Nil)
   }
 
+  test("join - join using multiple columns array and specifying join type") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str")
+    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str")
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "inner"),
+      Row(1, "1", 2, 3) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "left"),
+      Row(1, "1", 2, 3) :: Row(3, "3", 4, null) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "right"),
+      Row(1, "1", 2, 3) :: Row(5, "5", null, 6) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "outer"),
+      Row(1, "1", 2, 3) :: Row(3, "3", 4, null) :: Row(5, "5", null, 6) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "left_semi"),
+      Row(1, "1", 2) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "semi"),
+      Row(1, "1", 2) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "left_anti"),
+      Row(3, "3", 4) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Array("int", "str"), "anti"),
+      Row(3, "3", 4) :: Nil)
+  }
+
   test("join - cross join") {
     val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str")
     val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to