This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a90a49828f4 [SPARK-39211][SQL] Support JSON scans with DEFAULT values a90a49828f4 is described below commit a90a49828f4484fa6c3dcfe5183bd4181f7cfd91 Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Tue May 24 21:31:17 2022 +0800 [SPARK-39211][SQL] Support JSON scans with DEFAULT values ### What changes were proposed in this pull request? Support JSON scans when the table schema has associated DEFAULT column values. Example: ``` create table t(i int) using json; insert into t values(42); alter table t add column s string default concat('abc', 'def'); select * from t; > 42, 'abcdef' ``` Interesting note: by default, Spark's JSON writer omits NULL fields (see SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS), so the stored JSON data does not distinguish between NULL values and the absence of values. Therefore inserting NULL and then selecting back the same column yields the default value (if any), since the insert did not write anything to storage for that column. ### Why are the changes needed? This change makes it easier to build, query, and maintain tables backed by JSON data. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? This PR includes new test coverage. Closes #36583 from dtenedor/default-json. 
Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../spark/sql/catalyst/json/JacksonParser.scala | 5 +- .../catalyst/util/ResolveDefaultColumnsUtil.scala | 63 ++++++++++++++++++++++ .../org/apache/spark/sql/types/StructType.scala | 30 +++-------- .../apache/spark/sql/types/StructTypeSuite.scala | 14 ++--- .../org/apache/spark/sql/sources/InsertSuite.scala | 35 +++++++++--- 6 files changed, 110 insertions(+), 39 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index ff46672e67f..56ebfcc26c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -327,7 +327,7 @@ class UnivocityParser( case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) // Use the corresponding DEFAULT value associated with the column, if any. 
- row.update(i, requiredSchema.defaultValues(i)) + row.update(i, requiredSchema.existenceDefaultValues(i)) } i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index abcbdb83813..7004d2a8f16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -421,12 +422,14 @@ class JacksonParser( var skipRow = false structFilters.reset() + resetExistenceDefaultsBitmask(schema) while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => try { row.update(index, fieldConverters(index).apply(parser)) skipRow = structFilters.skipRow(row, index) + schema.existenceDefaultsBitmask(index) = false } catch { case e: SparkUpgradeException => throw e case NonFatal(e) if isRoot => @@ -437,10 +440,10 @@ class JacksonParser( parser.skipChildren() } } - if (skipRow) { None } else if (badRecordException.isEmpty) { + applyExistenceDefaultValuesToRow(schema, row) Some(row) } else { throw PartialResultException(row, badRecordException.get) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index d2963a60409..262150174ea 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala @@ -18,11 +18,14 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral} import org.apache.spark.sql.catalyst.optimizer.ConstantFolding import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -168,4 +171,64 @@ object ResolveDefaultColumns { str.toLowerCase() } } + + /** + * Parses the text representing constant-folded default column literal values. These are known as + * "existence" default values because each one is the constant-folded result of the original + * default value first assigned to the column at table/column creation time. When scanning a field + * from any data source, if the corresponding value is not present in storage, the output row + * returns this "existence" default value instead of NULL. + * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of + * Any type suitable for assigning into a row using the InternalRow.update method. 
+ */ + def getExistenceDefaultValues(schema: StructType): Array[Any] = { + schema.fields.map { field: StructField => + val defaultValue: Option[String] = field.getExistenceDefaultValue() + defaultValue.map { text: String => + val expr = try { + val expr = CatalystSqlParser.parseExpression(text) + expr match { + case _: ExprLiteral | _: AnsiCast | _: Cast => expr + } + } catch { + case _: ParseException | _: MatchError => + throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text) + } + // The expression should be a literal value by this point, possibly wrapped in a cast + // function. This is enforced by the execution of commands that assign default values. + expr.eval() + }.orNull + } + } + + /** + * Returns an array of boolean values equal in size to the result of [[getExistenceDefaultValues]] + * above, for convenience. + */ + def getExistenceDefaultsBitmask(schema: StructType): Array[Boolean] = { + Array.fill[Boolean](schema.existenceDefaultValues.size)(true) + } + + /** + * Resets the elements of the array initially returned from [[getExistenceDefaultsBitmask]] above. + * Afterwards, set element(s) to false before calling [[applyExistenceDefaultValuesToRow]] below. + */ + def resetExistenceDefaultsBitmask(schema: StructType): Unit = { + for (i <- 0 until schema.existenceDefaultValues.size) { + schema.existenceDefaultsBitmask(i) = (schema.existenceDefaultValues(i) != null) + } + } + + /** + * Updates a subset of columns in the row with default values from the metadata in the schema. 
+ */ + def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow): Unit = { + if (schema.hasExistenceDefaultValues) { + for (i <- 0 until schema.existenceDefaultValues.size) { + if (schema.existenceDefaultsBitmask(i)) { + row.update(i, schema.existenceDefaultValues(i)) + } + } + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 464d1ba1ef9..06460513c8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -25,10 +25,11 @@ import org.json4s.JsonDSL._ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Attribute, AttributeReference, Cast, InterpretedOrdering, Literal => ExprLiteral} -import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser, ParseException} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} +import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils} +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf @@ -513,28 +514,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru InterpretedOrdering.forSchema(this.fields.map(_.dataType)) /** - * Parses the text representing constant-folded default column literal values. 
- * @return a sequence of either (1) NULL, if the column had no default value, or (2) an object of - * Any type suitable for assigning into a row using the InternalRow.update method. + * These define and cache existence default values for the struct fields for efficiency purposes. */ - private [sql] lazy val defaultValues: Array[Any] = - fields.map { field: StructField => - val defaultValue: Option[String] = field.getExistenceDefaultValue() - defaultValue.map { text: String => - val expr = try { - val expr = CatalystSqlParser.parseExpression(text) - expr match { - case _: ExprLiteral | _: AnsiCast | _: Cast => expr - } - } catch { - case _: ParseException | _: MatchError => - throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text) - } - // The expression should be a literal value by this point, possibly wrapped in a cast - // function. This is enforced by the execution of commands that assign default values. - expr.eval() - }.getOrElse(null) - } + private[sql] lazy val existenceDefaultValues: Array[Any] = getExistenceDefaultValues(this) + private[sql] lazy val existenceDefaultsBitmask: Array[Boolean] = getExistenceDefaultsBitmask(this) + private[sql] lazy val hasExistenceDefaultValues = existenceDefaultValues.exists(_ != null) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala index 3aca7b1e52e..940a8e5e2ec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala @@ -457,10 +457,10 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'") .build()), StructField("c3", BooleanType))) - assert(source1.defaultValues.size == 3) - assert(source1.defaultValues(0) == 42) - assert(source1.defaultValues(1) == 
UTF8String.fromString("abc")) - assert(source1.defaultValues(2) == null) + assert(source1.existenceDefaultValues.size == 3) + assert(source1.existenceDefaultValues(0) == 42) + assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc")) + assert(source1.existenceDefaultValues(2) == null) // Negative test: StructType.defaultValues fails because the existence default value parses and // resolves successfully, but evaluates to a non-literal expression. @@ -472,7 +472,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { .build()))) val error = "fails to parse as a valid literal value" assert(intercept[AnalysisException] { - source2.defaultValues + source2.existenceDefaultValues }.getMessage.contains(error)) // Negative test: StructType.defaultValues fails because the existence default value fails to @@ -484,7 +484,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid") .build()))) assert(intercept[AnalysisException] { - source3.defaultValues + source3.existenceDefaultValues }.getMessage.contains(error)) // Negative test: StructType.defaultValues fails because the existence default value fails to @@ -500,7 +500,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper { "(SELECT 'abc' FROM missingtable)") .build()))) assert(intercept[AnalysisException] { - source4.defaultValues + source4.existenceDefaultValues }.getMessage.contains(error)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 1580a33a9eb..247c6bdb355 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1572,23 +1572,44 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } // This represents one test configuration over a data source. 
- case class Config(dataSource: String, sqlConf: Seq[(String, String)] = Seq()) + case class Config( + dataSource: String, + sqlConf: Seq[Option[(String, String)]] = Seq()) + // Run the test several times using each configuration. Seq( + Config(dataSource = "json", + Seq( + Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false"))), Config(dataSource = "csv", Seq( - SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")) + None, + Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))) ).foreach { config: Config => - // First run the test with default settings. - runTest(config.dataSource) - // Then run the test again with each pair of custom SQLConf values. - config.sqlConf.foreach { kv: (String, String) => - withSQLConf(kv) { + config.sqlConf.foreach { + _.map { kv: (String, String) => + withSQLConf(kv) { + // Run the test with the pair of custom SQLConf values. + runTest(config.dataSource) + } + }.getOrElse { + // Run the test with default settings. runTest(config.dataSource) } } } } + test("SPARK-39211 INSERT into JSON table, ADD COLUMNS with DEFAULTs, then SELECT them") { + // By default, INSERT commands into JSON tables do not store NULL values. Therefore, if such + // destination table columns have DEFAULT values, SELECTing out the same columns will return the + // default values (instead of NULL) since nothing is present in storage. + withTable("t") { + sql("create table t(a string default 'abc') using json") + sql("insert into t values(null)") + checkAnswer(spark.table("t"), Row("abc")) + } + } + test("Stop task set if FileAlreadyExistsException was thrown") { Seq(true, false).foreach { fastFail => withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org