This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 69946bb5c70 [SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS 69946bb5c70 is described below commit 69946bb5c707657bf0840b21356fbe95b8524ab9 Author: Koray Beyaz <koraybeya...@gmail.com> AuthorDate: Mon Apr 24 11:30:11 2023 +0300 [SPARK-42317][SQL] Assign name to _LEGACY_ERROR_TEMP_2247: CANNOT_MERGE_SCHEMAS ### What changes were proposed in this pull request? This PR proposes to assign name to _LEGACY_ERROR_TEMP_2247 as "CANNOT_MERGE_SCHEMAS". Also proposes to display both left and right schemas in the exception so that one can compare them. Please let me know if you prefer the old error message with a single schema. This is the stack trace after the changes: ``` scala> spark.read.option("mergeSchema", "true").parquet(path) org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas: Initial schema: "STRUCT<id: BIGINT>" Schema that cannot be merged with the initial schema: "STRUCT<id: INT>". 
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingSchemaError(QueryExecutionErrors.scala:2355) at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:104) at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:100) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:100) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:496) at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:78) at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208) at scala.Option.orElse(Option.scala:447) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229) at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563) at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:548) ... 49 elided Caused by: org.apache.spark.SparkException: [CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE] Failed to merge incompatible data types "BIGINT" and "INT". 
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotMergeIncompatibleDataTypesError(QueryExecutionErrors.scala:1326) at org.apache.spark.sql.types.StructType$.$anonfun$merge$3(StructType.scala:610) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:602) at org.apache.spark.sql.types.StructType$.$anonfun$merge$2$adapted(StructType.scala:599) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:599) at org.apache.spark.sql.types.StructType$.mergeInternal(StructType.scala:647) at org.apache.spark.sql.types.StructType$.merge(StructType.scala:593) at org.apache.spark.sql.types.StructType.merge(StructType.scala:498) at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:102) ... 67 more ``` ### Why are the changes needed? We should assign a proper name to each _LEGACY_ERROR_TEMP_* error class. ### Does this PR introduce _any_ user-facing change? Yes, the users will see an improved error message. ### How was this patch tested? Changed an existing test case to test the new error class with `checkError` utility. Closes #40810 from kori73/assign-name-2247. 
Lead-authored-by: Koray Beyaz <koraybeya...@gmail.com> Co-authored-by: kori73 <koray.beya...@gmail.com> Co-authored-by: Koray Beyaz <koray.beya...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 16 ++++++++------ .../spark/sql/errors/QueryExecutionErrors.scala | 10 +++++---- .../execution/datasources/SchemaMergeUtils.scala | 4 ++-- .../datasources/parquet/ParquetSchemaSuite.scala | 25 +++++++++++++--------- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 11b280efad8..370508b70a8 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -94,6 +94,16 @@ ], "sqlState" : "42825" }, + "CANNOT_MERGE_SCHEMAS" : { + "message" : [ + "Failed merging schemas:", + "Initial schema:", + "<left>", + "Schema that cannot be merged with the initial schema:", + "<right>." + ], + "sqlState" : "42KD9" + }, "CANNOT_MODIFY_CONFIG" : { "message" : [ "Cannot modify the value of the Spark config: <key>.", @@ -4817,12 +4827,6 @@ "Table does not support dynamic partition overwrite: <table>." ] }, - "_LEGACY_ERROR_TEMP_2247" : { - "message" : [ - "Failed merging schema:", - "<schema>." - ] - }, "_LEGACY_ERROR_TEMP_2248" : { "message" : [ "Cannot broadcast the table over <maxBroadcastTableRows> rows: <numRows> rows." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 11fe84990c1..6c4066e638c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2345,11 +2345,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { cause = null) } - def failedMergingSchemaError(schema: StructType, e: SparkException): Throwable = { + def failedMergingSchemaError( + leftSchema: StructType, + rightSchema: StructType, + e: SparkException): Throwable = { new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2247", - messageParameters = Map( - "schema" -> schema.treeString), + errorClass = "CANNOT_MERGE_SCHEMAS", + messageParameters = Map("left" -> toSQLType(leftSchema), "right" -> toSQLType(rightSchema)), cause = e) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index babecfc1f38..35d9b5d6034 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -86,7 +86,7 @@ object SchemaMergeUtils extends Logging { try { mergedSchema = mergedSchema.merge(schema) } catch { case cause: SparkException => - throw QueryExecutionErrors.failedMergingSchemaError(schema, cause) + throw QueryExecutionErrors.failedMergingSchemaError(mergedSchema, schema, cause) } } Iterator.single(mergedSchema) @@ -101,7 +101,7 @@ object SchemaMergeUtils extends Logging { try { finalSchema = finalSchema.merge(schema) } catch { case cause: SparkException => - throw QueryExecutionErrors.failedMergingSchemaError(schema, cause) + throw 
QueryExecutionErrors.failedMergingSchemaError(finalSchema, schema, cause) } } Some(finalSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 5589c61be7a..80c27049e5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -28,6 +28,7 @@ import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc import org.apache.spark.sql.internal.SQLConf @@ -980,20 +981,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest { } } - test("schema merging failure error message") { + test("CANNOT_MERGE_SCHEMAS: Failed merging schemas") { import testImplicits._ withTempPath { dir => val path = dir.getCanonicalPath - spark.range(3).write.parquet(s"$path/p=1") - spark.range(3).select($"id" cast IntegerType as Symbol("id")) - .write.parquet(s"$path/p=2") - - val message = intercept[SparkException] { - spark.read.option("mergeSchema", "true").parquet(path).schema - }.getMessage - - assert(message.contains("Failed merging schema")) + val df1 = spark.range(3) + df1.write.parquet(s"$path/p=1") + val df2 = spark.range(3).select($"id" cast IntegerType as Symbol("id")) + df2.write.parquet(s"$path/p=2") + checkError( + exception = intercept[SparkException] { + spark.read.option("mergeSchema", "true").parquet(path) + }, + errorClass = "CANNOT_MERGE_SCHEMAS", + sqlState = "42KD9", + parameters = Map( + "left" -> toSQLType(df1.schema), + "right" -> toSQLType(df2.schema))) } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org