This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8f610d1b4ce [SPARK-39265][SQL] Support non-vectorized Parquet scans with DEFAULT values
8f610d1b4ce is described below

commit 8f610d1b4ce532705c528f3c085b0289b2b17a94
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu May 26 22:38:32 2022 +0800

    [SPARK-39265][SQL] Support non-vectorized Parquet scans with DEFAULT values

    ### What changes were proposed in this pull request?

    Support non-vectorized Parquet scans when the table schema has associated DEFAULT column values.

    Example:

    ```
    create table t(i int) using parquet;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```

    ### Why are the changes needed?

    This change makes it easier to build, query, and maintain tables backed by Parquet data.

    ### Does this PR introduce _any_ user-facing change?

    Yes.

    ### How was this patch tested?

    This PR includes new test coverage.

    Closes #36643 from dtenedor/default-parquet.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../datasources/parquet/ParquetRowConverter.scala |  65 +++++++++++-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 115 ++++++++++++---------
 2 files changed, 128 insertions(+), 52 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 7bfc294a2d4..6d64349cc3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -172,7 +173,7 @@ private[parquet] class ParquetRowConverter(
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
    */
-  private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
+  private class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
     override def set(value: Any): Unit = row(ordinal) = value
     override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
     override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -183,12 +184,58 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
+  /**
+   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
+   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
+   * been written at least once.
+   */
+  private final class RowUpdaterWithBitmask(
+      row: InternalRow,
+      ordinal: Int,
+      bitmask: Array[Boolean]) extends RowUpdater(row, ordinal) {
+    override def set(value: Any): Unit = {
+      bitmask(ordinal) = false
+      super.set(value)
+    }
+    override def setBoolean(value: Boolean): Unit = {
+      bitmask(ordinal) = false
+      super.setBoolean(value)
+    }
+    override def setByte(value: Byte): Unit = {
+      bitmask(ordinal) = false
+      super.setByte(value)
+    }
+    override def setShort(value: Short): Unit = {
+      bitmask(ordinal) = false
+      super.setShort(value)
+    }
+    override def setInt(value: Int): Unit = {
+      bitmask(ordinal) = false
+      super.setInt(value)
+    }
+    override def setLong(value: Long): Unit = {
+      bitmask(ordinal) = false
+      super.setLong(value)
+    }
+    override def setDouble(value: Double): Unit = {
+      bitmask(ordinal) = false
+      super.setDouble(value)
+    }
+    override def setFloat(value: Float): Unit = {
+      bitmask(ordinal) = false
+      super.setFloat(value)
+    }
+  }
+
   private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
-  def currentRecord: InternalRow = currentRow
+  def currentRecord: InternalRow = {
+    applyExistenceDefaultValuesToRow(catalystType, currentRow)
+    currentRow
+  }
 
   private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead(
     datetimeRebaseSpec.mode, "Parquet")
@@ -232,9 +279,19 @@ private[parquet] class ParquetRowConverter(
         catalystFieldIdxByName(parquetField.getName)
       }
       val catalystField = catalystType(catalystFieldIndex)
+      // Create a RowUpdater instance for converting Parquet objects to Catalyst rows. If any fields
+      // in the Catalyst result schema have associated existence default values, maintain a boolean
+      // array to track which fields have been explicitly assigned for each row.
+      val rowUpdater: RowUpdater =
+        if (catalystType.hasExistenceDefaultValues) {
+          resetExistenceDefaultsBitmask(catalystType)
+          new RowUpdaterWithBitmask(
+            currentRow, catalystFieldIndex, catalystType.existenceDefaultsBitmask)
+        } else {
+          new RowUpdater(currentRow, catalystFieldIndex)
+        }
       // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
-      newConverter(parquetField,
-        catalystField.dataType, new RowUpdater(currentRow, catalystFieldIndex))
+      newConverter(parquetField, catalystField.dataType, rowUpdater)
     }.toArray
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 247c6bdb355..35d5096e603 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1516,100 +1516,119 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them: Positive tests") {
-    def runTest(dataSource: String): Unit = {
-      val createTableIntCol = s"create table t(a string, i int) using $dataSource"
+  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
+    case class Config(
+        sqlConf: Option[(String, String)],
+        insertNullsToStorage: Boolean = true)
+    def runTest(dataSource: String, config: Config): Unit = {
+      def withTableT(f: => Unit): Unit = {
+        sql(s"create table t(a string, i int) using $dataSource")
+        sql("insert into t values('xyz', 42)")
+        withTable("t") { f }
+      }
+      // Positive tests:
       // Adding a column with a valid default value into a table containing existing data works
       // successfully. Querying data from the altered table returns the new value.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
+      withTableT {
         sql("alter table t add column (s string default concat('abc', 'def'))")
         checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
         checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
-      }
-      // Same as above, but a following command alters the column to change the default value.
-      // This returns the previous value, not the new value, since the behavior semantics are
-      // the same as if the first command had performed a backfill of the new default value in
-      // the existing rows.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
-        sql("alter table t add column (s string default concat('abc', 'def'))")
+        // Now alter the column to change the default value. This still returns the previous value,
+        // not the new value, since the behavior semantics are the same as if the first command had
+        // performed a backfill of the new default value in the existing rows.
         sql("alter table t alter column s set default concat('ghi', 'jkl')")
-        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
         checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
       }
       // Adding a column with a default value and then inserting explicit NULL values works.
       // Querying data back from the table differentiates between the explicit NULL values and
       // default values.
- withTable("t") { - sql(createTableIntCol) - sql("insert into t values('xyz', 42)") + withTableT { sql("alter table t add column (s string default concat('abc', 'def'))") sql("insert into t values(null, null, null)") sql("alter table t add column (x boolean default true)") + // By default, INSERT commands into some tables (such as JSON) do not store NULL values. + // Therefore, if such destination columns have DEFAULT values, SELECTing the same columns + // will return the default values (instead of NULL) since nothing is present in storage. + val insertedSColumn = if (config.insertNullsToStorage) null else "abcdef" checkAnswer(spark.table("t"), Seq( Row("xyz", 42, "abcdef", true), - Row(null, null, null, true))) + Row(null, null, insertedSColumn, true))) checkAnswer(sql("select i, s, x from t"), Seq( Row(42, "abcdef", true), - Row(null, null, true))) + Row(null, insertedSColumn, true))) } // Adding two columns where only the first has a valid default value works successfully. // Querying data from the altered table returns the default value as well as NULL for the // second column. - withTable("t") { - sql(createTableIntCol) - sql("insert into t values('xyz', 42)") + withTableT { sql("alter table t add column (s string default concat('abc', 'def'))") sql("alter table t add column (x string)") checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null)) checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null)) } + // Test other supported data types. + withTableT { + sql("alter table t add columns (" + + "s boolean default true, " + + "t byte default cast('a' as byte), " + + "u short default cast(42 as short), " + + "v float default 0, " + + "w double default 0, " + + "x date default date'0000', " + + "y timestamp default timestamp'0000', " + + "z timestamp_ntz default cast(timestamp'0000' as timestamp_ntz), " + + "a1 timestamp_ltz default cast(timestamp'0000' as timestamp_ltz), " + + "a2 decimal(5, 2) default 123.45)") + checkAnswer(sql("select s, t, u, v, w, x is not null, " + + "y is not null, z is not null, a1 is not null, a2 is not null from t"), + Row(true, null, 42, 0.0f, 0.0d, true, true, true, true, true)) + } } // This represents one test configuration over a data source. - case class Config( - dataSource: String, - sqlConf: Seq[Option[(String, String)]] = Seq()) + case class TestCase( + dataSource: String, + configs: Seq[Config]) // Run the test several times using each configuration. Seq( - Config(dataSource = "json", + TestCase( + dataSource = "csv", Seq( - Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false"))), - Config(dataSource = "csv", + Config( + None), + Config( + Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))), + TestCase( + dataSource = "json", Seq( - None, - Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false"))) - ).foreach { config: Config => - config.sqlConf.foreach { - _.map { kv: (String, String) => + Config( + None, + insertNullsToStorage = false), + Config( + Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false")))), + TestCase( + dataSource = "parquet", + Seq( + Config( + Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"), + insertNullsToStorage = false))) + ).foreach { testCase: TestCase => + testCase.configs.foreach { config: Config => + config.sqlConf.map { kv: (String, String) => withSQLConf(kv) { // Run the test with the pair of custom SQLConf values. - runTest(config.dataSource) + runTest(testCase.dataSource, config) } }.getOrElse { // Run the test with default settings. 
-          runTest(config.dataSource)
+          runTest(testCase.dataSource, config)
        }
      }
    }
  }
 
-  test("SPARK-39211 INSERT into JSON table, ADD COLUMNS with DEFAULTs, then SELECT them") {
-    // By default, INSERT commands into JSON tables do not store NULL values. Therefore, if such
-    // destination table columns have DEFAULT values, SELECTing out the same columns will return the
-    // default values (instead of NULL) since nothing is present in storage.
-    withTable("t") {
-      sql("create table t(a string default 'abc') using json")
-      sql("insert into t values(null)")
-      checkAnswer(spark.table("t"), Row("abc"))
-    }
-  }
-
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
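The gist of the change, for readers who do not want to trace the diff: the row converter keeps a per-record bitmask with one slot per output column, every setter call in RowUpdaterWithBitmask clears the slot for its ordinal, and once the record is fully converted, any slot still set gets that column's existence DEFAULT value filled in. Below is a simplified, standalone Scala sketch of that idea; the object, method, and field names (ExistenceDefaultsSketch, Schema, convertRecord) are hypothetical and it does not use Spark's internal classes.

```scala
// Standalone sketch of the existence-defaults bitmask idea; not Spark's actual API.
object ExistenceDefaultsSketch {
  // One slot per column; `defaults` holds the "existence" default for columns that were
  // added to the table after this file was written, or None for columns with no default.
  final case class Schema(defaults: Array[Option[Any]])

  // Converts one record. `writes` is the list of (ordinal -> value) pairs actually present
  // in the file for this record. The bitmask starts as "needs default" for every column
  // that has a default, and is cleared on every write, mirroring how RowUpdaterWithBitmask
  // clears bitmask(ordinal) in each setter.
  def convertRecord(schema: Schema, writes: Seq[(Int, Any)]): Array[Any] = {
    val row = new Array[Any](schema.defaults.length)
    val needsDefault: Array[Boolean] = schema.defaults.map(_.isDefined) // reset per record
    writes.foreach { case (ordinal, value) =>
      needsDefault(ordinal) = false // this column was written at least once
      row(ordinal) = value
    }
    // After conversion, fill in the existence default for every column that was never
    // written, analogous to applying defaults when currentRecord is returned.
    needsDefault.zipWithIndex.foreach { case (missing, i) =>
      if (missing) row(i) = schema.defaults(i).get
    }
    row
  }

  def main(args: Array[String]): Unit = {
    // An old Parquet file only contains column 0 (i = 42); column 1 (s) was added later
    // with DEFAULT concat('abc', 'def').
    val schema = Schema(Array(None, Some("abcdef")))
    println(convertRecord(schema, Seq(0 -> 42)).mkString(", "))        // 42, abcdef
    println(convertRecord(schema, Seq(0 -> 7, 1 -> "xyz")).mkString(", ")) // 7, xyz
  }
}
```

To exercise this code path end to end, the new InsertSuite configuration runs the same scenarios against a Parquet table with spark.sql.parquet.enableVectorizedReader set to false, so the row-based ParquetRowConverter, rather than the vectorized reader, is the component applying the defaults.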