This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c37d7dec8aa [SPARK-44271][SQL] Move default values functions from StructType to ResolveDefaultColumns
c37d7dec8aa is described below

commit c37d7dec8aa4d703b6dac9b9d60ff25d9d5dc665
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Mon Jul 10 06:23:03 2023 -0400

    [SPARK-44271][SQL] Move default values functions from StructType to ResolveDefaultColumns
    
    ### What changes were proposed in this pull request?
    
    Move default values functions from StructType to ResolveDefaultColumns.
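    
    As a rough sketch of the call-site change (the `defaultsFor` helper below is illustrative only and not part of this PR), callers now go through the `ResolveDefaultColumns` utility object instead of the lazy vals previously cached on `StructType`:
    
    ```scala
    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.StructType
    
    def defaultsFor(requiredSchema: StructType): Array[Any] = {
      // Before this change: requiredSchema.existenceDefaultValues (a lazy val on StructType).
      // After this change: the same values are obtained from the utility object.
      ResolveDefaultColumns.existenceDefaultValues(requiredSchema)
    }
    ```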
    
    ### Why are the changes needed?
    
    To simplify the DataType interface.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test
    
    Closes #41820 from amaliujia/clean_up_left_errors.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |  7 +++--
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 32 ++++++++++++++++------
 .../org/apache/spark/sql/types/StructType.scala    |  8 ------
 .../apache/spark/sql/types/StructTypeSuite.scala   | 16 +++++------
 .../datasources/orc/OrcColumnarBatchReader.java    |  3 +-
 .../parquet/VectorizedParquetRecordReader.java     |  3 +-
 .../datasources/orc/OrcDeserializer.scala          | 15 ++++++----
 .../datasources/parquet/ParquetRowConverter.scala  | 15 ++++++----
 9 files changed, 58 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index b58649da61c..a02d57c0bc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -353,7 +353,7 @@ class UnivocityParser(
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
          // Use the corresponding DEFAULT value associated with the column, if any.
-          row.update(i, requiredSchema.existenceDefaultValues(i))
+          row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
       }
       i += 1
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5286e16b088..03dce431837 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -448,14 +448,15 @@ class JacksonParser(
     var skipRow = false
 
     structFilters.reset()
-    resetExistenceDefaultsBitmask(schema)
+    lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(schema)
+    resetExistenceDefaultsBitmask(schema, bitmask)
     while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
           try {
             row.update(index, fieldConverters(index).apply(parser))
             skipRow = structFilters.skipRow(row, index)
-            schema.existenceDefaultsBitmask(index) = false
+            bitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
             case NonFatal(e) if isRoot || enablePartialResults =>
@@ -469,7 +470,7 @@ class JacksonParser(
     if (skipRow) {
       None
     } else if (badRecordException.isEmpty) {
-      applyExistenceDefaultValuesToRow(schema, row)
+      applyExistenceDefaultValuesToRow(schema, row, bitmask)
       Some(row)
     } else {
       throw PartialResultException(row, badRecordException.get)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 26efa8c8df2..6489fb9aaaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -396,27 +396,30 @@ object ResolveDefaultColumns extends QueryErrorsBase {
    * above, for convenience.
    */
   def getExistenceDefaultsBitmask(schema: StructType): Array[Boolean] = {
-    Array.fill[Boolean](schema.existenceDefaultValues.size)(true)
+    Array.fill[Boolean](existenceDefaultValues(schema).size)(true)
   }
 
   /**
   * Resets the elements of the array initially returned from [[getExistenceDefaultsBitmask]] above.
   * Afterwards, set element(s) to false before calling [[applyExistenceDefaultValuesToRow]] below.
    */
-  def resetExistenceDefaultsBitmask(schema: StructType): Unit = {
-    for (i <- 0 until schema.existenceDefaultValues.size) {
-      schema.existenceDefaultsBitmask(i) = (schema.existenceDefaultValues(i) != null)
+  def resetExistenceDefaultsBitmask(schema: StructType, bitmask: Array[Boolean]): Unit = {
+    val defaultValues = existenceDefaultValues(schema)
+    for (i <- 0 until defaultValues.size) {
+      bitmask(i) = (defaultValues(i) != null)
     }
   }
 
   /**
   * Updates a subset of columns in the row with default values from the metadata in the schema.
    */
-  def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow): Unit = {
-    if (schema.hasExistenceDefaultValues) {
-      for (i <- 0 until schema.existenceDefaultValues.size) {
-        if (schema.existenceDefaultsBitmask(i)) {
-          row.update(i, schema.existenceDefaultValues(i))
+  def applyExistenceDefaultValuesToRow(schema: StructType, row: InternalRow,
+      bitmask: Array[Boolean]): Unit = {
+    val existingValues = existenceDefaultValues(schema)
+    if (hasExistenceDefaultValues(schema)) {
+      for (i <- 0 until existingValues.size) {
+        if (bitmask(i)) {
+          row.update(i, existingValues(i))
         }
       }
     }
@@ -437,6 +440,17 @@ object ResolveDefaultColumns extends QueryErrorsBase {
     rows.toSeq
   }
 
+  /**
+   * These define existence default values for the struct fields for efficiency purposes.
+   * The caller should avoid using such methods in a loop for efficiency.
+   */
+  def existenceDefaultValues(schema: StructType): Array[Any] =
+    getExistenceDefaultValues(schema)
+  def existenceDefaultsBitmask(schema: StructType): Array[Boolean] =
+    getExistenceDefaultsBitmask(schema)
+  def hasExistenceDefaultValues(schema: StructType): Boolean =
+    existenceDefaultValues(schema).exists(_ != null)
+
   /**
   * This is an Analyzer for processing default column values using built-in functions only.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 56e1356aec7..5eea207d15b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.parser.{DataTypeParser, LegacyTypeStringPar
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{SparkStringUtils, StringConcat}
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.util.collection.Utils
 
@@ -498,13 +497,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
     f(this) || fields.exists(field => field.dataType.existsRecursively(f))
   }
-
-  /**
-   * These define and cache existence default values for the struct fields for efficiency purposes.
-   */
-  private[sql] lazy val existenceDefaultValues: Array[Any] = getExistenceDefaultValues(this)
-  private[sql] lazy val existenceDefaultsBitmask: Array[Boolean] = getExistenceDefaultsBitmask(this)
-  private[sql] lazy val hasExistenceDefaultValues = existenceDefaultValues.exists(_ != null)
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 1f4d8311540..6eedd7f9b6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -538,10 +538,10 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
           .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
           .build()),
       StructField("c3", BooleanType)))
-    assert(source1.existenceDefaultValues.size == 3)
-    assert(source1.existenceDefaultValues(0) == 42)
-    assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc"))
-    assert(source1.existenceDefaultValues(2) == null)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1).size == 3)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(0) == 42)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(1) == UTF8String.fromString("abc"))
+    assert(ResolveDefaultColumns.existenceDefaultValues(source1)(2) == null)
 
     // Positive test: StructType.defaultValues works because the existence default value parses and
     // resolves successfully, then evaluates to a non-literal expression: this is constant-folded at
@@ -553,8 +553,8 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
           .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
           .build())))
     val error = "fails to parse as a valid literal value"
-    assert(source2.existenceDefaultValues.size == 1)
-    assert(source2.existenceDefaultValues(0) == 2)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source2).size == 1)
+    assert(ResolveDefaultColumns.existenceDefaultValues(source2)(0) == 2)
 
     // Negative test: StructType.defaultValues fails because the existence default value fails to
     // parse.
@@ -565,7 +565,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
           .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "invalid")
           .build())))
     assert(intercept[AnalysisException] {
-      source3.existenceDefaultValues
+      ResolveDefaultColumns.existenceDefaultValues(source3)
     }.getMessage.contains(error))
 
     // Negative test: StructType.defaultValues fails because the existence default value fails to
@@ -581,7 +581,7 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
             "(SELECT 'abc' FROM missingtable)")
           .build())))
     assert(intercept[AnalysisException] {
-      source4.existenceDefaultValues
+      ResolveDefaultColumns.existenceDefaultValues(source4)
     }.getMessage.contains(error))
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 97f9d47d095..b6184baa2e0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -32,6 +32,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcInputFormat;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -179,7 +180,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
           OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
          // Check if the missing column has an associated default value in the schema metadata.
          // If so, fill the corresponding column vector with the value.
-          Object defaultValue = requiredSchema.existenceDefaultValues()[i];
+          Object defaultValue = ResolveDefaultColumns.existenceDefaultValues(requiredSchema)[i];
          if (defaultValue == null) {
            missingCol.putNulls(0, capacity);
          } else if (!missingCol.appendObjects(capacity, defaultValue).isPresent()) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 25712367c23..0f0455c0d8e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -280,7 +281,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columnVectors.length; i++) {
       Object defaultValue = null;
       if (sparkRequestedSchema != null) {
-        defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+        defaultValue = ResolveDefaultColumns.existenceDefaultValues(sparkRequestedSchema)[i];
       }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
         (WritableColumnVector) vectors[i], capacity, memMode, missingColumns, true, defaultValue);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 5bac404fd53..795c2618af1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -37,6 +37,8 @@ class OrcDeserializer(
 
   private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
 
+  private lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(requiredSchema)
+
   // `fieldWriters(index)` is
   // - null if the respective source column is missing, since the output value
   //   is always null in this case
@@ -46,13 +48,14 @@ class OrcDeserializer(
     // ADD COLUMN c DEFAULT <value>" on the Orc table, this adds one field to the Catalyst schema.
     // Then if we query the old files with the new Catalyst schema, we should only apply the
     // existence default value to the columns whose IDs are not explicitly requested.
-    if (requiredSchema.hasExistenceDefaultValues) {
-      for (i <- 0 until requiredSchema.existenceDefaultValues.size) {
-        requiredSchema.existenceDefaultsBitmask(i) =
+    val existingValues = ResolveDefaultColumns.existenceDefaultValues(requiredSchema)
+    if (ResolveDefaultColumns.hasExistenceDefaultValues(requiredSchema)) {
+      for (i <- 0 until existingValues.size) {
+        bitmask(i) =
           if (requestedColIds(i) != -1) {
             false
           } else {
-            requiredSchema.existenceDefaultValues(i) != null
+            existingValues(i) != null
           }
       }
     }
@@ -81,7 +84,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
-    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow, bitmask)
     resultRow
   }
 
@@ -98,7 +101,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
-    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow, bitmask)
     resultRow
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ba0314f3be3..e257be3d189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{PhysicalByteType, PhysicalShortType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -188,11 +188,13 @@ private[parquet] class ParquetRowConverter(
 
   private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
+  private[this] lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(catalystType)
+
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
   def currentRecord: InternalRow = {
-    applyExistenceDefaultValuesToRow(catalystType, currentRow)
+    applyExistenceDefaultValuesToRow(catalystType, currentRow, bitmask)
     currentRow
   }
 
@@ -230,9 +232,10 @@ private[parquet] class ParquetRowConverter(
       }
     // If any fields in the Catalyst result schema have associated existence default values,
     // maintain a boolean array to track which fields have been explicitly assigned for each row.
-    if (catalystType.hasExistenceDefaultValues) {
-      for (i <- 0 until catalystType.existenceDefaultValues.size) {
-        catalystType.existenceDefaultsBitmask(i) =
+    if (ResolveDefaultColumns.hasExistenceDefaultValues(catalystType)) {
+      val existingValues = ResolveDefaultColumns.existenceDefaultValues(catalystType)
+      for (i <- 0 until existingValues.size) {
+       bitmask(i) =
          // Assume the schema for a Parquet file-based table contains N fields. Then if we later
          // run a command "ALTER TABLE t ADD COLUMN c DEFAULT <value>" on the Parquet table, this
          // adds one field to the Catalyst schema. Then if we query the old files with the new
@@ -240,7 +243,7 @@ private[parquet] class ParquetRowConverter(
           if (i < parquetType.getFieldCount) {
             false
           } else {
-            catalystType.existenceDefaultValues(i) != null
+            existingValues(i) != null
           }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
