This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new ef42225 [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable property ef42225 is described below commit ef422257fe47ec0e8a0e94c6671c5d6316944620 Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sun Aug 30 06:52:15 2020 +0900 [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable property ### What changes were proposed in this pull request? This PR changes the key data type check in `HashJoin` to use `sameType`. This backports #29555 to branch-2.4. ### Why are the changes needed? Looking at the resolving condition of `SetOperation`, it only requires that each left data type be the `sameType` as the corresponding right one. Logically, the `EqualTo` expression in an equi-join also only requires that the left data type be the `sameType` as the right data type. It therefore does not look reasonable for `HashJoin` to require the left key data types to be exactly the same as the right key data types. This produces inconsistent results when doing `except` between two dataframes. If the two dataframes don't have nested fields, then even if the nullable properties of their fields differ, `HashJoin` passes the key type check, because it checks each field individually and so the field's nullable property is ignored. If the two dataframes have nested fields such as structs, `HashJoin` fails the key type check, because it now compares two struct types and the nullable property now affects the comparison. ### Does this PR introduce _any_ user-facing change? Yes. It makes the `except` operation between dataframes consistent. ### How was this patch tested? Unit test. Closes #29576 from viirya/SPARK-32693-2.4. 
Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../spark/sql/execution/joins/HashJoin.scala | 7 ++-- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 39 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index b197bf6..141f388a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -62,8 +62,11 @@ trait HashJoin { } protected lazy val (buildKeys, streamedKeys) = { - require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), - "Join keys from two sides should have same types") + require(leftKeys.length == rightKeys.length && + leftKeys.map(_.dataType) + .zip(rightKeys.map(_.dataType)) + .forall(types => types._1.sameType(types._2)), + "Join keys from two sides should have same length and types") val lkeys = HashJoin.rewriteKeyExpr(leftKeys).map(BindReferences.bindReference(_, left.output)) val rkeys = HashJoin.rewriteKeyExpr(rightKeys) .map(BindReferences.bindReference(_, right.output)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index e6b30f9..9f77314 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -295,4 +298,40 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan } } + + test("SPARK-32693: Compare two dataframes with same schema except nullable property") { + val schema1 = StructType( + StructField("a", IntegerType, false) :: + StructField("b", IntegerType, false) :: + StructField("c", IntegerType, false) :: Nil) + val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2)) + val df1 = spark.createDataFrame(rowSeq1.asJava, schema1) + + val schema2 = StructType( + StructField("a", IntegerType) :: + StructField("b", IntegerType) :: + StructField("c", IntegerType) :: Nil) + val rowSeq2: List[Row] = List(Row(10, 1, 1)) + val df2 = spark.createDataFrame(rowSeq2.asJava, schema2) + + checkAnswer(df1.except(df2), Row(10, 50, 2)) + + val schema3 = StructType( + StructField("a", IntegerType, false) :: + StructField("b", IntegerType, false) :: + StructField("c", IntegerType, false) :: + StructField("d", schema1, false) :: Nil) + val rowSeq3: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)), Row(10, 50, 2, Row(10, 50, 2))) + val df3 = spark.createDataFrame(rowSeq3.asJava, schema3) + + val schema4 = StructType( + StructField("a", IntegerType) :: + StructField("b", IntegerType) :: + StructField("b", IntegerType) :: + StructField("d", schema2) :: Nil) + val rowSeq4: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1))) + val df4 = spark.createDataFrame(rowSeq4.asJava, schema4) + + checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org