This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2ebea13  [SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
2ebea13 is described below

commit 2ebea135a1f4e4cb3187ffc8e77d3f52d3b3a91a
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Oct 14 08:30:03 2020 -0700

[SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved

### What changes were proposed in this pull request?

This PR fixes a bug in `V2WriteCommand.outputResolved`, where `DataType.equalsIgnoreCompatibleNullability` was called with its parameters mistakenly swapped. The parameters of `DataType.equalsIgnoreCompatibleNullability` are `from` and `to`, so the correct argument order here is `inAttr.dataType` followed by `outAttr.dataType`.

### Why are the changes needed?

Spark throws an AnalysisException for an unresolved operator in a v2 write; the operator fails to resolve because the arguments passed to `DataType.equalsIgnoreCompatibleNullability` in `outputResolved` were swapped.

### Does this PR introduce _any_ user-facing change?

Yes. End users no longer hit an unresolved-operator error in a v2 write when writing a DataFrame that contains non-nullable complex types to a table whose matching complex types are nullable.

### How was this patch tested?

New UT added.
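For illustration, a minimal sketch of the semantics the fix relies on (not part of the patch; it assumes access to `DataType.equalsIgnoreCompatibleNullability`, which is package-private to Spark's sql package, so the calls below would have to live under `org.apache.spark.sql`, e.g. in a test): the check accepts a non-nullable `from` flowing into a nullable `to`, but rejects the reverse.

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, LongType}

// Incoming data: array elements can never be null.
val from = ArrayType(LongType, containsNull = false)
// Target column: array elements may be null.
val to = ArrayType(LongType, containsNull = true)

// Writing non-nullable data into a nullable column is compatible...
assert(DataType.equalsIgnoreCompatibleNullability(from, to))
// ...but the reverse is not, which is why the swapped arguments
// made valid writes fail to resolve.
assert(!DataType.equalsIgnoreCompatibleNullability(to, from))
```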
Closes #30033 from HeartSaVioR/SPARK-33136.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 8e5cb1d276686ec428e4e6aa1c3cfd6bb99e4e9a)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/plans/logical/v2Commands.scala   |  2 +-
 .../apache/spark/sql/DataFrameWriterV2Suite.scala | 87 +++++++++++++++++++++-
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b4120d9..d2830a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -45,7 +45,7 @@ trait V2WriteCommand extends Command {
       case (inAttr, outAttr) =>
         // names and types must match, nullability must be compatible
         inAttr.name == outAttr.name &&
-          DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+          DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
           (outAttr.nullable || !inAttr.nullable)
     }
   }
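A hypothetical end-to-end repro of the symptom the test diff below covers (the catalog and table names are illustrative, and the sketch assumes a `spark` session with a v2 catalog registered as `testcat` whose target table declares `col` as `array<bigint>` with nullable elements): before this fix, the append failed with an AnalysisException about an unresolved operator, even though writing non-nullable data into nullable columns is safe.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType}

// DataFrame schema: array elements declared non-nullable.
val schema = StructType(Seq(
  StructField("col", ArrayType(LongType, containsNull = false))))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(1L, 2L)))), schema)

// Append into a (hypothetical) table whose matching column is nullable;
// this write resolves after the fix.
df.writeTo("testcat.target_table").append()
```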
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 508eefa..ff5c624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,16 +23,15 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamedRelation, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, V2WriteCommand}
 import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -101,6 +100,86 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(v2.catalog.exists(_ == catalogPlugin))
   }
 
+  case class FakeV2WriteCommand(table: NamedRelation, query: LogicalPlan) extends V2WriteCommand
+
+  test("SPARK-33136 output resolved on complex types for V2 write commands") {
+    val tableCatalog = catalog("testcat")
+
+    def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
+      val fromTableName = s"from_table_$name"
+      tableCatalog.createTable(
+        Identifier.of(Array(), fromTableName),
+        StructType(Array(StructField("col", fromType))),
+        Array.empty,
+        new java.util.HashMap[String, String]())
+
+      val toTable = tableCatalog.createTable(
+        Identifier.of(Array(), s"to_table_$name"),
+        StructType(Array(StructField("col", toType))),
+        Array.empty,
+        new java.util.HashMap[String, String]())
+
+      val df = spark.table(s"testcat.$fromTableName")
+
+      val relation = DataSourceV2Relation.create(toTable, Some(tableCatalog), None)
+      val writeCommand = FakeV2WriteCommand(relation, df.queryExecution.analyzed)
+
+      assert(writeCommand.outputResolved, s"Unable to write from type $fromType to type $toType.")
+    }
+
+    // The major difference between `from` and `to` is that `from` uses non-nullable
+    // complex types, whereas `to` uses the same types with nullability flipped.
+
+    // nested struct type
+    val fromStructType = StructType(Array(
+      StructField("s", StringType),
+      StructField("i_nonnull", IntegerType, nullable = false),
+      StructField("st", StructType(Array(
+        StructField("l", LongType),
+        StructField("s_nonnull", StringType, nullable = false))))))
+
+    val toStructType = StructType(Array(
+      StructField("s", StringType),
+      StructField("i_nonnull", IntegerType),
+      StructField("st", StructType(Array(
+        StructField("l", LongType),
+        StructField("s_nonnull", StringType))))))
+
+    assertTypeCompatibility("struct", fromStructType, toStructType)
+
+    // array type
+    assertTypeCompatibility("array", ArrayType(LongType, containsNull = false),
+      ArrayType(LongType, containsNull = true))
+
+    // array type with struct type
+    val fromArrayWithStructType = ArrayType(
+      StructType(Array(StructField("s", StringType, nullable = false))),
+      containsNull = false)
+
+    val toArrayWithStructType = ArrayType(
+      StructType(Array(StructField("s", StringType))),
+      containsNull = true)
+
+    assertTypeCompatibility("array_struct", fromArrayWithStructType, toArrayWithStructType)
+
+    // map type
+    assertTypeCompatibility("map", MapType(IntegerType, StringType, valueContainsNull = false),
+      MapType(IntegerType, StringType, valueContainsNull = true))
+
+    // map type with struct type
+    val fromMapWithStructType = MapType(
+      IntegerType,
+      StructType(Array(StructField("s", StringType, nullable = false))),
+      valueContainsNull = false)
+
+    val toMapWithStructType = MapType(
+      IntegerType,
+      StructType(Array(StructField("s", StringType))),
+      valueContainsNull = true)
+
+    assertTypeCompatibility("map_struct", fromMapWithStructType, toMapWithStructType)
+  }
+
   test("Append: basic append") {
     spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org