This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2ebea13  [SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
2ebea13 is described below

commit 2ebea135a1f4e4cb3187ffc8e77d3f52d3b3a91a
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Oct 14 08:30:03 2020 -0700

    [SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix a bug in `V2WriteCommand.outputResolved` where
    `DataType.equalsIgnoreCompatibleNullability` is called with mistakenly
    swapped parameters. The parameter order for
    `DataType.equalsIgnoreCompatibleNullability` is `from` and `to`, which means
    the correct order of the matching variables is `inAttr` and `outAttr`.
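
    To see why the order matters, here is a minimal sketch (not part of this
    patch) mirroring the `ArrayType` branch of the check; the real
    `DataType.equalsIgnoreCompatibleNullability` is `private[sql]`, so the
    helper below is a hypothetical stand-in for illustration:

    ```scala
    import org.apache.spark.sql.types.{ArrayType, LongType}

    // Mirrors the ArrayType branch of equalsIgnoreCompatibleNullability(from, to):
    // a write is compatible when `to` accepts nulls or `from` never produces them.
    def compatibleArray(from: ArrayType, to: ArrayType): Boolean =
      (to.containsNull || !from.containsNull) && from.elementType == to.elementType

    val inAttr = ArrayType(LongType, containsNull = false)  // incoming data
    val outAttr = ArrayType(LongType, containsNull = true)  // table column

    compatibleArray(inAttr, outAttr)  // true: the fixed order (inAttr, outAttr)
    compatibleArray(outAttr, inAttr)  // false: the swapped order that caused the bug
    ```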
    
    ### Why are the changes needed?
    
    Spark throws an AnalysisException due to an unresolved operator in a v2
    write, but the operator is only unresolved because the parameters in the
    call to `DataType.equalsIgnoreCompatibleNullability` in `outputResolved`
    were swapped.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. End users no longer hit an unresolved operator error in a v2 write
    when they write a dataframe containing non-nullable complex types to a
    table whose matching complex types are nullable.
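
    For example (illustrative only: it assumes a v2 catalog named `testcat`
    and a provider `foo` are configured, as in the test suite), an append like
    this used to fail analysis and now resolves:

    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType}

    // A dataframe whose column has non-nullable array elements.
    val schema = StructType(Seq(
      StructField("arr", ArrayType(LongType, containsNull = false))))
    val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    // The target table declares the matching column with nullable elements.
    spark.sql("CREATE TABLE testcat.t (arr array<bigint>) USING foo")

    // Previously this raised AnalysisException (unresolved AppendData operator);
    // with the fix the write is considered resolved and succeeds.
    df.writeTo("testcat.t").append()
    ```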
    
    ### How was this patch tested?
    
    New UT added.
    
    Closes #30033 from HeartSaVioR/SPARK-33136.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 8e5cb1d276686ec428e4e6aa1c3cfd6bb99e4e9a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/plans/logical/v2Commands.scala    |  2 +-
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 87 +++++++++++++++++++++-
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b4120d9..d2830a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -45,7 +45,7 @@ trait V2WriteCommand extends Command {
           case (inAttr, outAttr) =>
             // names and types must match, nullability must be compatible
             inAttr.name == outAttr.name &&
-              DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+              DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
               (outAttr.nullable || !inAttr.nullable)
         }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 508eefa..ff5c624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,16 +23,15 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamedRelation, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, V2WriteCommand}
 import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -101,6 +100,86 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(v2.catalog.exists(_ == catalogPlugin))
   }
 
+  case class FakeV2WriteCommand(table: NamedRelation, query: LogicalPlan) extends V2WriteCommand
+
+  test("SPARK-33136 output resolved on complex types for V2 write commands") {
+    val tableCatalog = catalog("testcat")
+
+    def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
+      val fromTableName = s"from_table_$name"
+      tableCatalog.createTable(
+        Identifier.of(Array(), fromTableName),
+        StructType(Array(StructField("col", fromType))),
+        Array.empty,
+        new java.util.HashMap[String, String]())
+
+      val toTable = tableCatalog.createTable(
+        Identifier.of(Array(), s"to_table_$name"),
+        StructType(Array(StructField("col", toType))),
+        Array.empty,
+        new java.util.HashMap[String, String]())
+
+      val df = spark.table(s"testcat.$fromTableName")
+
+      val relation = DataSourceV2Relation.create(toTable, Some(tableCatalog), None)
+      val writeCommand = FakeV2WriteCommand(relation, df.queryExecution.analyzed)
+
+      assert(writeCommand.outputResolved, s"Unable to write from type $fromType to type $toType.")
+    }
+
+    // The major difference between `from` and `to` is that `from` uses non-nullable
+    // complex types, whereas `to` is the same data type with nullability flipped.
+
+    // nested struct type
+    val fromStructType = StructType(Array(
+      StructField("s", StringType),
+      StructField("i_nonnull", IntegerType, nullable = false),
+      StructField("st", StructType(Array(
+        StructField("l", LongType),
+        StructField("s_nonnull", StringType, nullable = false))))))
+
+    val toStructType = StructType(Array(
+      StructField("s", StringType),
+      StructField("i_nonnull", IntegerType),
+      StructField("st", StructType(Array(
+        StructField("l", LongType),
+        StructField("s_nonnull", StringType))))))
+
+    assertTypeCompatibility("struct", fromStructType, toStructType)
+
+    // array type
+    assertTypeCompatibility("array", ArrayType(LongType, containsNull = false),
+      ArrayType(LongType, containsNull = true))
+
+    // array type with struct type
+    val fromArrayWithStructType = ArrayType(
+      StructType(Array(StructField("s", StringType, nullable = false))),
+      containsNull = false)
+
+    val toArrayWithStructType = ArrayType(
+      StructType(Array(StructField("s", StringType))),
+      containsNull = true)
+
+    assertTypeCompatibility("array_struct", fromArrayWithStructType, 
toArrayWithStructType)
+
+    // map type
+    assertTypeCompatibility("map", MapType(IntegerType, StringType, 
valueContainsNull = false),
+      MapType(IntegerType, StringType, valueContainsNull = true))
+
+    // map type with struct type
+    val fromMapWithStructType = MapType(
+      IntegerType,
+      StructType(Array(StructField("s", StringType, nullable = false))),
+      valueContainsNull = false)
+
+    val toMapWithStructType = MapType(
+      IntegerType,
+      StructType(Array(StructField("s", StringType))),
+      valueContainsNull = true)
+
+    assertTypeCompatibility("map_struct", fromMapWithStructType, 
toMapWithStructType)
+  }
+
   test("Append: basic append") {
     spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING 
foo")
 

