This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f610d1b4ce [SPARK-39265][SQL] Support non-vectorized Parquet scans with DEFAULT values
8f610d1b4ce is described below

commit 8f610d1b4ce532705c528f3c085b0289b2b17a94
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu May 26 22:38:32 2022 +0800

    [SPARK-39265][SQL] Support non-vectorized Parquet scans with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support non-vectorized Parquet scans when the table schema has associated DEFAULT column values.
    
    Example:
    
    ```
    create table t(i int) using parquet;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```
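
    To exercise the non-vectorized read path targeted here, the vectorized Parquet reader can be disabled via the `spark.sql.parquet.enableVectorizedReader` conf. A minimal spark-shell sketch (illustrative only, not part of this patch; it reuses the table `t` from the example above):

    ```scala
    // Force the row-based (non-vectorized) Parquet reader so this code path runs.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

    spark.sql("create table t(i int) using parquet")
    spark.sql("insert into t values(42)")
    // The row written before the column existed should surface the default value.
    spark.sql("alter table t add column (s string default concat('abc', 'def'))")
    spark.sql("select * from t").show()  // expected: 42, abcdef
    ```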
    
    ### Why are the changes needed?
    
    This change makes it easier to build, query, and maintain tables backed by Parquet data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Scans of Parquet tables that use the non-vectorized reader now return the DEFAULT values of columns added after the existing rows were written, as shown in the example above.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36643 from dtenedor/default-parquet.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../datasources/parquet/ParquetRowConverter.scala  |  65 +++++++++++-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 115 ++++++++++++---------
 2 files changed, 128 insertions(+), 52 deletions(-)
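
For orientation before the diff: the ParquetRowConverter change below routes each field through a bitmask-tracking updater (RowUpdaterWithBitmask) that clears its ordinal's bit whenever a value is written, and currentRecord then back-fills any still-set ordinals with the column's existence default via applyExistenceDefaultValuesToRow. A rough, self-contained Scala sketch of that pattern follows; every name in it is illustrative rather than a Spark internal.

```scala
// Illustrative sketch only: each per-field "updater" clears a "never written"
// bit when it assigns a value, and a post-pass fills the remaining fields with
// their existence defaults.
object ExistenceDefaultSketch {
  final case class Field(name: String, existenceDefault: Any)

  def convertRecord(schema: Seq[Field], presentValues: Map[Int, Any]): Array[Any] = {
    val row = new Array[Any](schema.length)
    // true = "not yet written for this record", akin to resetExistenceDefaultsBitmask.
    val bitmask = Array.fill(schema.length)(true)

    // Writing a value clears the corresponding bit, as RowUpdaterWithBitmask does.
    presentValues.foreach { case (ordinal, value) =>
      bitmask(ordinal) = false
      row(ordinal) = value
    }

    // Post-pass, akin to applyExistenceDefaultValuesToRow: fields that were never
    // written receive their existence default instead of staying null.
    schema.indices.foreach { i =>
      if (bitmask(i)) row(i) = schema(i).existenceDefault
    }
    row
  }

  def main(args: Array[String]): Unit = {
    // Column `s` was added with DEFAULT 'abcdef' after the file containing only
    // column `i` was written, so the Parquet record carries no value for it.
    val schema = Seq(Field("i", null), Field("s", "abcdef"))
    println(convertRecord(schema, Map(0 -> 42)).mkString(", "))  // 42, abcdef
  }
}
```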

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 7bfc294a2d4..6d64349cc3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -172,7 +173,7 @@ private[parquet] class ParquetRowConverter(
    * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
    */
-  private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
+  private class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
     override def set(value: Any): Unit = row(ordinal) = value
     override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
     override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -183,12 +184,58 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
+  /**
+   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
+   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
+   * been written at least once.
+   */
+  private final class RowUpdaterWithBitmask(
+      row: InternalRow,
+      ordinal: Int,
+      bitmask: Array[Boolean]) extends RowUpdater(row, ordinal) {
+    override def set(value: Any): Unit = {
+      bitmask(ordinal) = false
+      super.set(value)
+    }
+    override def setBoolean(value: Boolean): Unit = {
+      bitmask(ordinal) = false
+      super.setBoolean(value)
+    }
+    override def setByte(value: Byte): Unit = {
+      bitmask(ordinal) = false
+      super.setByte(value)
+    }
+    override def setShort(value: Short): Unit = {
+      bitmask(ordinal) = false
+      super.setShort(value)
+    }
+    override def setInt(value: Int): Unit = {
+      bitmask(ordinal) = false
+      super.setInt(value)
+    }
+    override def setLong(value: Long): Unit = {
+      bitmask(ordinal) = false
+      super.setLong(value)
+    }
+    override def setDouble(value: Double): Unit = {
+      bitmask(ordinal) = false
+      super.setDouble(value)
+    }
+    override def setFloat(value: Float): Unit = {
+      bitmask(ordinal) = false
+      super.setFloat(value)
+    }
+  }
+
   private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
-  def currentRecord: InternalRow = currentRow
+  def currentRecord: InternalRow = {
+    applyExistenceDefaultValuesToRow(catalystType, currentRow)
+    currentRow
+  }
 
   private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead(
     datetimeRebaseSpec.mode, "Parquet")
@@ -232,9 +279,19 @@ private[parquet] class ParquetRowConverter(
         catalystFieldIdxByName(parquetField.getName)
       }
       val catalystField = catalystType(catalystFieldIndex)
+      // Create a RowUpdater instance for converting Parquet objects to Catalyst rows. If any fields
+      // in the Catalyst result schema have associated existence default values, maintain a boolean
+      // array to track which fields have been explicitly assigned for each row.
+      val rowUpdater: RowUpdater =
+        if (catalystType.hasExistenceDefaultValues) {
+          resetExistenceDefaultsBitmask(catalystType)
+          new RowUpdaterWithBitmask(
+            currentRow, catalystFieldIndex, catalystType.existenceDefaultsBitmask)
+        } else {
+          new RowUpdater(currentRow, catalystFieldIndex)
+        }
       // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
-      newConverter(parquetField,
-        catalystField.dataType, new RowUpdater(currentRow, catalystFieldIndex))
+      newConverter(parquetField, catalystField.dataType, rowUpdater)
     }.toArray
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 247c6bdb355..35d5096e603 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1516,100 +1516,119 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them: Positive tests") {
-    def runTest(dataSource: String): Unit = {
-      val createTableIntCol = s"create table t(a string, i int) using $dataSource"
+  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
+    case class Config(
+        sqlConf: Option[(String, String)],
+        insertNullsToStorage: Boolean = true)
+    def runTest(dataSource: String, config: Config): Unit = {
+      def withTableT(f: => Unit): Unit = {
+        sql(s"create table t(a string, i int) using $dataSource")
+        sql("insert into t values('xyz', 42)")
+        withTable("t") { f }
+      }
+      // Positive tests:
       // Adding a column with a valid default value into a table containing existing data works
       // successfully. Querying data from the altered table returns the new value.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
+      withTableT {
         sql("alter table t add column (s string default concat('abc', 'def'))")
         checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
         checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
-      }
-      // Same as above, but a following command alters the column to change the default value.
-      // This returns the previous value, not the new value, since the behavior semantics are
-      // the same as if the first command had performed a backfill of the new default value in
-      // the existing rows.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
-        sql("alter table t add column (s string default concat('abc', 'def'))")
+        // Now alter the column to change the default value. This still returns the previous value,
+        // not the new value, since the behavior semantics are the same as if the first command had
+        // performed a backfill of the new default value in the existing rows.
         sql("alter table t alter column s set default concat('ghi', 'jkl')")
-        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
         checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
       }
       // Adding a column with a default value and then inserting explicit NULL values works.
       // Querying data back from the table differentiates between the explicit NULL values and
       // default values.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
+      withTableT {
         sql("alter table t add column (s string default concat('abc', 'def'))")
         sql("insert into t values(null, null, null)")
         sql("alter table t add column (x boolean default true)")
+        // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
+        // Therefore, if such destination columns have DEFAULT values, SELECTing the same columns
+        // will return the default values (instead of NULL) since nothing is present in storage.
+        val insertedSColumn = if (config.insertNullsToStorage) null else "abcdef"
         checkAnswer(spark.table("t"),
           Seq(
             Row("xyz", 42, "abcdef", true),
-            Row(null, null, null, true)))
+            Row(null, null, insertedSColumn, true)))
         checkAnswer(sql("select i, s, x from t"),
           Seq(
             Row(42, "abcdef", true),
-            Row(null, null, true)))
+            Row(null, insertedSColumn, true)))
       }
       // Adding two columns where only the first has a valid default value works successfully.
       // Querying data from the altered table returns the default value as well as NULL for the
       // second column.
-      withTable("t") {
-        sql(createTableIntCol)
-        sql("insert into t values('xyz', 42)")
+      withTableT {
         sql("alter table t add column (s string default concat('abc', 'def'))")
         sql("alter table t add column (x string)")
         checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null))
         checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null))
       }
+      // Test other supported data types.
+      withTableT {
+        sql("alter table t add columns (" +
+          "s boolean default true, " +
+          "t byte default cast('a' as byte), " +
+          "u short default cast(42 as short), " +
+          "v float default 0, " +
+          "w double default 0, " +
+          "x date default date'0000', " +
+          "y timestamp default timestamp'0000', " +
+          "z timestamp_ntz default cast(timestamp'0000' as timestamp_ntz), " +
+          "a1 timestamp_ltz default cast(timestamp'0000' as timestamp_ltz), " +
+          "a2 decimal(5, 2) default 123.45)")
+        checkAnswer(sql("select s, t, u, v, w, x is not null, " +
+          "y is not null, z is not null, a1 is not null, a2 is not null from t"),
+          Row(true, null, 42, 0.0f, 0.0d, true, true, true, true, true))
+      }
     }
 
     // This represents one test configuration over a data source.
-    case class Config(
-      dataSource: String,
-      sqlConf: Seq[Option[(String, String)]] = Seq())
+    case class TestCase(
+        dataSource: String,
+        configs: Seq[Config])
     // Run the test several times using each configuration.
     Seq(
-      Config(dataSource = "json",
+      TestCase(
+        dataSource = "csv",
         Seq(
-          Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false"))),
-      Config(dataSource = "csv",
+          Config(
+            None),
+          Config(
+            Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))),
+      TestCase(
+        dataSource = "json",
         Seq(
-          None,
-          Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))
-    ).foreach { config: Config =>
-      config.sqlConf.foreach {
-        _.map { kv: (String, String) =>
+          Config(
+            None,
+            insertNullsToStorage = false),
+          Config(
+            Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false")))),
+      TestCase(
+        dataSource = "parquet",
+        Seq(
+          Config(
+            Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
+            insertNullsToStorage = false)))
+    ).foreach { testCase: TestCase =>
+      testCase.configs.foreach { config: Config =>
+        config.sqlConf.map { kv: (String, String) =>
           withSQLConf(kv) {
             // Run the test with the pair of custom SQLConf values.
-            runTest(config.dataSource)
+            runTest(testCase.dataSource, config)
           }
         }.getOrElse {
           // Run the test with default settings.
-          runTest(config.dataSource)
+          runTest(testCase.dataSource, config)
         }
       }
     }
   }
 
-  test("SPARK-39211 INSERT into JSON table, ADD COLUMNS with DEFAULTs, then SELECT them") {
-    // By default, INSERT commands into JSON tables do not store NULL values. Therefore, if such
-    // destination table columns have DEFAULT values, SELECTing out the same columns will return the
-    // default values (instead of NULL) since nothing is present in storage.
-    withTable("t") {
-      sql("create table t(a string default 'abc') using json")
-      sql("insert into t values(null)")
-      checkAnswer(spark.table("t"), Row("abc"))
-    }
-  }
-
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,
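
A note on the insertNullsToStorage = false cases above (and the SPARK-39211 test they absorb): with default writer settings, JSON simply omits NULL fields on write, so a column that carries a DEFAULT reads the default back instead of NULL. A quick spark-shell sketch of that behavior, using an arbitrary table name `j`:

```scala
// With the default JSON writer settings, the explicit NULL below is not stored,
// so the read path falls back to the column's DEFAULT value.
spark.sql("create table j(a string default 'abc') using json")
spark.sql("insert into j values(null)")
spark.sql("select a from j").show()  // abc: the NULL was never persisted
```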


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
