[spark] branch branch-2.4 updated: [SPARK-32693][SQL][2.4] Compare two dataframes with same schema except nullable property

2020-08-29 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ef42225  [SPARK-32693][SQL][2.4] Compare two dataframes with same 
schema except nullable property
ef42225 is described below

commit ef422257fe47ec0e8a0e94c6671c5d6316944620
Author: Liang-Chi Hsieh 
AuthorDate: Sun Aug 30 06:52:15 2020 +0900

[SPARK-32693][SQL][2.4] Compare two dataframes with same schema except 
nullable property

### What changes were proposed in this pull request?

This PR changes the key data type check in `HashJoin` to use `sameType`. It
backports #29555 to branch-2.4.

### Why are the changes needed?

Looking at the resolution condition of `SetOperation`, it only requires each
left data type to be `sameType` as the corresponding right one. Logically, the
`EqualTo` expression in an equi-join also only requires the left data type to be
`sameType` as the right data type. It therefore seems unreasonable for
`HashJoin` to require the left key data types to be exactly equal to the right
key data types.

This leads to inconsistent results when doing `except` between two dataframes.

If two dataframes don't have nested fields, `HashJoin` passes the key type
check even when their fields differ in nullability, because it checks each
field individually and so the field's nullable property is ignored.

If two dataframes have nested fields such as structs, `HashJoin` fails the key
type check, because it then compares two struct types and the nullable property
of their fields affects the comparison.
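
The difference between exact equality and `sameType` can be sketched without
Spark. The block below is a minimal, hypothetical model: `DType`, `IntT`,
`Field`, `StructT`, and this `sameType` are stand-ins invented for
illustration, not Spark's classes. It only assumes that `sameType` compares
types while ignoring nullability, as the description above implies.

```scala
// Hypothetical stand-ins for Spark's data types; not the real API.
sealed trait DType
case object IntT extends DType
final case class Field(name: String, dataType: DType, nullable: Boolean = true)
final case class StructT(fields: List[Field]) extends DType

object NullabilityDemo {
  // Assumed semantics: same structure and field names, nullability ignored.
  def sameType(a: DType, b: DType): Boolean = (a, b) match {
    case (StructT(fa), StructT(fb)) =>
      fa.length == fb.length &&
        fa.zip(fb).forall { case (x, y) =>
          x.name == y.name && sameType(x.dataType, y.dataType)
        }
    case _ => a == b
  }

  val strictField: Field = Field("a", IntT, nullable = false)
  val looseField: Field = Field("a", IntT)

  // Flat keys: only the key's data type is compared, so the nullable flag
  // on the field never enters the comparison.
  val flatKeysEqual: Boolean = strictField.dataType == looseField.dataType

  // Struct keys: the nullable flag is part of the struct type itself,
  // so plain equality sees a difference while sameType does not.
  val strictStruct: DType = StructT(List(strictField))
  val looseStruct: DType = StructT(List(looseField))
  val structKeysEqual: Boolean = strictStruct == looseStruct
  val structKeysSameType: Boolean = sameType(strictStruct, looseStruct)
}
```

In this model `flatKeysEqual` and `structKeysSameType` are true while
`structKeysEqual` is false, which mirrors the inconsistency described above.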

### Does this PR introduce _any_ user-facing change?

Yes. It makes the `except` operation consistent between dataframes.

### How was this patch tested?

Unit test.

Closes #29576 from viirya/SPARK-32693-2.4.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Takeshi Yamamuro 
---
 .../spark/sql/execution/joins/HashJoin.scala   |  7 ++--
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 39 ++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index b197bf6..141f388a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -62,8 +62,11 @@ trait HashJoin {
   }
 
   protected lazy val (buildKeys, streamedKeys) = {
-    require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
-      "Join keys from two sides should have same types")
+    require(leftKeys.length == rightKeys.length &&
+      leftKeys.map(_.dataType)
+        .zip(rightKeys.map(_.dataType))
+        .forall(types => types._1.sameType(types._2)),
+      "Join keys from two sides should have same length and types")
     val lkeys = HashJoin.rewriteKeyExpr(leftKeys).map(BindReferences.bindReference(_, left.output))
     val rkeys = HashJoin.rewriteKeyExpr(rightKeys)
       .map(BindReferences.bindReference(_, right.output))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e6b30f9..9f77314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 
 class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -295,4 +298,40 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-32693: Compare two dataframes with same schema except nullable property") {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", IntegerType, false) ::
+        StructField("c", IntegerType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("c", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(10, 1, 1))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    checkAnswer(d



