This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 973283c33ad [SPARK-35739][SQL] Add Java-compatible Dataset.join overloads 973283c33ad is described below commit 973283c33ad908d071550e9be92a4fca76a8a9df Author: Brandon Dahler <b...@amazon.com> AuthorDate: Thu Apr 28 18:52:26 2022 -0500 [SPARK-35739][SQL] Add Java-compatible Dataset.join overloads ### What changes were proposed in this pull request? Adds 3 new syntactic sugar overloads to Dataset's join method as proposed in [SPARK-35739](https://issues.apache.org/jira/browse/SPARK-35739). ### Why are the changes needed? Improved development experience for developers using Spark SQL, specifically when coding in Java. Prior to changes the Seq overloads required developers to use less-known Java-to-Scala converter methods that made code less readable. The overloads internalize those converter calls for two of the new methods and the third method adds a single-item overload that is useful for both Java and Scala. ### Does this PR introduce _any_ user-facing change? Yes, the three new overloads technically constitute an API change to the Dataset class. These overloads are net-new and have been commented appropriately in line with the existing methods. ### How was this patch tested? Test cases were not added because it is unclear to me where/how syntactic sugar overloads fit into the testing suites (if at all). Happy to add them if I can be pointed in the correct direction. * Changes were tested in Scala via spark-shell. 
* Changes were tested in Java by modifying an example: ``` diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java index 86a9045d8a..342810c1e6 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java -124,6 +124,10 public class JavaSparkSQLExample { // |-- age: long (nullable = true) // |-- name: string (nullable = true) + df.join(df, new String[] {"age"}).show(); + df.join(df, "age", "left").show(); + df.join(df, new String[] {"age"}, "left").show(); + // Select only the "name" column df.select("name").show(); // +-------+ ``` #### Notes Re-opening of #33323 and #34923 with comments addressed. Closes #36343 from brandondahler/features/JavaCompatibleJoinOverloads. Authored-by: Brandon Dahler <b...@amazon.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 62 +++++++++++++++++++++- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 55 +++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e66bfd87337..36b6d6b470d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -946,7 +946,21 @@ class Dataset[T] private[sql]( } /** - * Inner equi-join with another `DataFrame` using the given columns. + * (Java-specific) Inner equi-join with another `DataFrame` using the given columns. See the + * Scala-specific overload for more details. + * + * @param right Right side of the join operation. + * @param usingColumns Names of the columns to join on. These columns must exist on both sides. 
+ * + * @group untypedrel + * @since 3.4.0 + */ + def join(right: Dataset[_], usingColumns: Array[String]): DataFrame = { + join(right, usingColumns.toSeq) + } + + /** + * (Scala-specific) Inner equi-join with another `DataFrame` using the given columns. * * Different from other join functions, the join columns will only appear once in the output, * i.e. similar to SQL's `JOIN USING` syntax. @@ -971,10 +985,54 @@ class Dataset[T] private[sql]( } /** - * Equi-join with another `DataFrame` using the given columns. A cross join with a predicate + * Equi-join with another `DataFrame` using the given column. A cross join with a predicate * is specified as an inner join. If you would explicitly like to perform a cross join use the * `crossJoin` method. * + * Different from other join functions, the join column will only appear once in the output, + * i.e. similar to SQL's `JOIN USING` syntax. + * + * @param right Right side of the join operation. + * @param usingColumn Name of the column to join on. This column must exist on both sides. + * @param joinType Type of join to perform. Default `inner`. Must be one of: + * `inner`, `cross`, `outer`, `full`, `fullouter`, `full_outer`, `left`, + * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`, + * `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`, `left_anti`. + * + * @note If you perform a self-join using this function without aliasing the input + * `DataFrame`s, you will NOT be able to reference any columns after the join, since + * there is no way to disambiguate which side of the join you would like to reference. + * + * @group untypedrel + * @since 3.4.0 + */ + def join(right: Dataset[_], usingColumn: String, joinType: String): DataFrame = { + join(right, Seq(usingColumn), joinType) + } + + /** + * (Java-specific) Equi-join with another `DataFrame` using the given columns. See the + * Scala-specific overload for more details. + * + * @param right Right side of the join operation. 
+ * @param usingColumns Names of the columns to join on. These columns must exist on both sides. + * @param joinType Type of join to perform. Default `inner`. Must be one of: + * `inner`, `cross`, `outer`, `full`, `fullouter`, `full_outer`, `left`, + * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`, + * `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`, `left_anti`. + * + * @group untypedrel + * @since 3.4.0 + */ + def join(right: Dataset[_], usingColumns: Array[String], joinType: String): DataFrame = { + join(right, usingColumns.toSeq, joinType) + } + + /** + * (Scala-specific) Equi-join with another `DataFrame` using the given columns. A cross join + * with a predicate is specified as an inner join. If you would explicitly like to perform a + * cross join use the `crossJoin` method. + * * Different from other join functions, the join columns will only appear once in the output, * i.e. similar to SQL's `JOIN USING` syntax. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index f317511639e..5286a70674e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -57,6 +57,15 @@ class DataFrameJoinSuite extends QueryTest Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil) } + test("join - join using multiple columns array") { + val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str") + val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str") + + checkAnswer( + df.join(df2, Array("int", "int2")), + Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil) + } + test("join - sorted columns not in join's outputSet") { val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort").as("df1") val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", 
"int2", "str").as("df2") @@ -73,6 +82,15 @@ class DataFrameJoinSuite extends QueryTest Row(5, 5) :: Row(1, 1) :: Nil) } + test("join - join using specifying join type") { + val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str") + val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str") + + checkAnswer( + df.join(df2, "int", "inner"), + Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil) + } + test("join - join using multiple columns and specifying join type") { val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str") val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str") @@ -110,6 +128,43 @@ class DataFrameJoinSuite extends QueryTest Row(3, "3", 4) :: Nil) } + test("join - join using multiple columns array and specifying join type") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str") + + checkAnswer( + df.join(df2, Array("int", "str"), "inner"), + Row(1, "1", 2, 3) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "left"), + Row(1, "1", 2, 3) :: Row(3, "3", 4, null) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "right"), + Row(1, "1", 2, 3) :: Row(5, "5", null, 6) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "outer"), + Row(1, "1", 2, 3) :: Row(3, "3", 4, null) :: Row(5, "5", null, 6) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "left_semi"), + Row(1, "1", 2) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "semi"), + Row(1, "1", 2) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "left_anti"), + Row(3, "3", 4) :: Nil) + + checkAnswer( + df.join(df2, Array("int", "str"), "anti"), + Row(3, "3", 4) :: Nil) + } + test("join - cross join") { val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str") val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str") --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org