This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3e6598e8d4f  [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
3e6598e8d4f is described below

commit 3e6598e8d4fbfd7db595d991f6ebad92eb2fa33f
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Fri Jun 3 11:44:55 2022 -0700

[SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values

### What changes were proposed in this pull request?

Support vectorized Parquet scans when the table schema has associated DEFAULT column values.

Example:

```
create table t(i int) using parquet;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```

### Why are the changes needed?

This change makes it easier to build, query, and maintain tables backed by Parquet data.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR includes new test coverage.

Closes #36672 from dtenedor/default-parquet-vectorized.

Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../datasources/parquet/ParquetColumnVector.java   | 31 +++++++------
 .../parquet/SpecificParquetRecordReaderBase.java   |  5 ++-
 .../parquet/VectorizedParquetRecordReader.java     |  6 ++-
 .../execution/vectorized/WritableColumnVector.java | 52 ++++++++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala | 10 +++--
 .../sql/internal/BaseSessionStateBuilder.scala     |  2 +-
 .../sql/sources/DataSourceAnalysisSuite.scala      |  3 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 17 ++++---
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |  2 +-
 9 files changed, 97 insertions(+), 31 deletions(-)
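For readers who want to try the feature end to end, here is a minimal, hypothetical spark-shell style sketch of the scenario from the description above, with the vectorized Parquet reader (the code path this patch extends) explicitly enabled. The local session setup and the expected-output comment are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any existing SparkSession works the same way.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Ensure the vectorized Parquet reader is used (it is on by default).
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

spark.sql("create table t(i int) using parquet")
spark.sql("insert into t values (42)")
// The new column is absent from the Parquet file written above, so the scan must
// materialize it from the column's DEFAULT expression rather than returning NULL.
spark.sql("alter table t add column s string default concat('abc', 'def')")
spark.sql("select * from t").show()  // expected row: 42, abcdef
```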
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index c8399d9137f..2ad8cdfcca6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -55,22 +55,14 @@ final class ParquetColumnVector {
   /** Reader for this column - only set if 'isPrimitive' is true */
   private VectorizedColumnReader columnReader;
 
-  ParquetColumnVector(
-      ParquetColumn column,
-      WritableColumnVector vector,
-      int capacity,
-      MemoryMode memoryMode,
-      Set<ParquetColumn> missingColumns) {
-    this(column, vector, capacity, memoryMode, missingColumns, true);
-  }
-
   ParquetColumnVector(
       ParquetColumn column,
       WritableColumnVector vector,
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns,
-      boolean isTopLevel) {
+      boolean isTopLevel,
+      Object defaultValue) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -83,8 +75,21 @@ final class ParquetColumnVector {
     this.isPrimitive = column.isPrimitive();
 
     if (missingColumns.contains(column)) {
-      vector.setAllNull();
-      return;
+      if (defaultValue == null) {
+        vector.setAllNull();
+        return;
+      }
+      // For Parquet tables whose columns have associated DEFAULT values, this reader must return
+      // those values instead of NULL when the corresponding columns are not present in storage.
+      // Here we write the 'defaultValue' to each element in the new WritableColumnVector using
+      // the appendObjects method. This delegates to some specific append* method depending on the
+      // type of 'defaultValue'; for example, if 'defaultValue' is a Float, then we call the
+      // appendFloats method.
+      if (!vector.appendObjects(capacity, defaultValue).isPresent()) {
+        throw new IllegalArgumentException("Cannot assign default column value to result " +
+          "column batch in vectorized Parquet reader because the data type is not supported: " +
+          defaultValue);
+      }
     }
 
     if (isPrimitive) {
@@ -101,7 +106,7 @@ final class ParquetColumnVector {
     for (int i = 0; i < column.children().size(); i++) {
       ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-        vector.getChild(i), capacity, memoryMode, missingColumns, false);
+        vector.getChild(i), capacity, memoryMode, missingColumns, false, null);
       children.add(childCv);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 61aab6c5398..6ea1a0c37b1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -71,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
   protected StructType sparkSchema;
+  protected StructType sparkRequestedSchema;
 
   // Keep track of the version of the parquet writer. An older version wrote
   // corrupt delta byte arrays, and the version check is needed to detect that.
   protected ParsedVersion writerVersion;
@@ -113,10 +114,10 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    this.sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
     this.parquetColumn = converter.convertParquetColumn(requestedSchema,
-      Option.apply(sparkRequestedSchema));
+      Option.apply(this.sparkRequestedSchema));
     this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = fileReader.getFilteredRecordCount();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 80f6f88810a..6a30876a3bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -259,8 +259,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
 
     for (int i = 0; i < columnVectors.length; i++) {
+      Object defaultValue = null;
+      if (sparkRequestedSchema != null) {
+        defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+      }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
-        vectors[i], capacity, memMode, missingColumns);
+        vectors[i], capacity, memMode, missingColumns, true, defaultValue);
     }
 
     if (partitionColumns != null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 88930d509cf..5debc1adacd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -690,6 +691,57 @@ public abstract class WritableColumnVector extends ColumnVector {
     return elementsAppended;
   }
 
+  /**
+   * Appends multiple copies of a Java Object to the vector using the corresponding append* method
+   * above.
+   * @param length: The number of instances to append
+   * @param value value to append to the vector
+   * @return the number of values appended if the value maps to one of the append* methods above,
+   * or Optional.empty() otherwise.
+   */
+  public Optional<Integer> appendObjects(int length, Object value) {
+    if (value instanceof Boolean) {
+      return Optional.of(appendBooleans(length, (Boolean) value));
+    }
+    if (value instanceof Byte) {
+      return Optional.of(appendBytes(length, (Byte) value));
+    }
+    if (value instanceof Decimal) {
+      Decimal decimal = (Decimal) value;
+      long unscaled = decimal.toUnscaledLong();
+      if (decimal.precision() < 10) {
+        return Optional.of(appendInts(length, (int) unscaled));
+      } else {
+        return Optional.of(appendLongs(length, unscaled));
+      }
+    }
+    if (value instanceof Double) {
+      return Optional.of(appendDoubles(length, (Double) value));
+    }
+    if (value instanceof Float) {
+      return Optional.of(appendFloats(length, (Float) value));
+    }
+    if (value instanceof Integer) {
+      return Optional.of(appendInts(length, (Integer) value));
+    }
+    if (value instanceof Long) {
+      return Optional.of(appendLongs(length, (Long) value));
+    }
+    if (value instanceof Short) {
+      return Optional.of(appendShorts(length, (Short) value));
+    }
+    if (value instanceof UTF8String) {
+      UTF8String utf8 = (UTF8String) value;
+      byte[] bytes = utf8.getBytes();
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        result += appendByteArray(bytes, 0, bytes.length);
+      }
+      return Optional.of(result);
+    }
+    return Optional.empty();
+  }
+
   // `WritableColumnVector` puts the data of array in the first child column vector, and puts the
   // array offsets and lengths in the current column vector.
   @Override
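To make the contract of the new `WritableColumnVector.appendObjects` helper above concrete, here is a small sketch (not part of the patch) that fills a vector with repeated copies of a default value and reads it back. The use of `OnHeapColumnVector`, the capacity, and the values are illustrative assumptions:

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType

// Fill a fresh on-heap vector with four copies of the default value 42.
val vector = new OnHeapColumnVector(4, IntegerType)
val appended = vector.appendObjects(4, Int.box(42))  // delegates to appendInts
assert(appended.isPresent && appended.get.intValue == 4)
assert((0 until 4).forall(vector.getInt(_) == 42))

// A value type with no matching append* method yields Optional.empty(), which
// ParquetColumnVector turns into an IllegalArgumentException.
assert(!vector.appendObjects(4, new Object()).isPresent)
vector.close()
```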
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bf5882bd63f..429b7072cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -61,7 +61,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * Note that, this rule must be run after `PreprocessTableCreation` and
  * `PreprocessTableInsertion`.
  */
-object DataSourceAnalysis extends Rule[LogicalPlan] {
+case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
 
   def resolver: Resolver = conf.resolver
 
@@ -147,7 +147,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
-      CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+      val newTableDesc: CatalogTable =
+        tableDesc.copy(schema =
+          ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+            analyzer, tableDesc.schema, "CREATE TABLE"))
+      CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 92725cda49c..2271990741d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -195,7 +195,7 @@ abstract class BaseSessionStateBuilder(
       DetectAmbiguousSelfJoin +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion +:
-        DataSourceAnalysis +:
+        DataSourceAnalysis(this) +:
         ReplaceCharWithVarchar +:
         customPostHocResolutionRules
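The `DataSourceAnalysis` change above threads the analyzer into the rule so that, at CREATE TABLE time, each column's CURRENT default expression can be constant-folded into an "existence" default, which readers later substitute for columns that are missing from files written before the column existed. A hedged sketch of the resulting schema metadata follows; the literal metadata keys `CURRENT_DEFAULT` and `EXISTS_DEFAULT` are assumptions about the DEFAULT-column design and are not spelled out in this diff:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

// Column as declared by the user: only the unevaluated DEFAULT expression is recorded.
// NOTE: the metadata key names here are assumptions for illustration only.
val declared = StructField("s", StringType, nullable = true,
  new MetadataBuilder().putString("CURRENT_DEFAULT", "concat('abc', 'def')").build())

// Column as stored after constant folding: the pre-evaluated "existence" default is the
// value the vectorized reader picks up via existenceDefaultValues().
val stored = declared.copy(metadata = new MetadataBuilder()
  .withMetadata(declared.metadata)
  .putString("EXISTS_DEFAULT", "'abcdef'")
  .build())

val schema = StructType(Seq(stored))
```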
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index 2df79e3da80..152d096afdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -70,7 +71,7 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll with
         Cast(e, dt, Option(SQLConf.get.sessionLocalTimeZone))
       }
     }
-    val rule = DataSourceAnalysis
+    val rule = DataSourceAnalysis(SimpleAnalyzer)
     testRule(
       "convertStaticPartitions only handle INSERT having at least static partitions",
       caseSensitive) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c5313a642f9..35a6f8f8a0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1029,24 +1029,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value fails to analyze.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default badvalue) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default badvalue) using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value analyzes to a table not in the catalog.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default (select min(x) from badtable)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
+          "using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value parses but refers to a table from the catalog.
     withTable("t", "other") {
       sql("create table other(x string) using parquet")
-      sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
      }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
@@ -1083,10 +1081,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value parses but the type is not coercible.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default false) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
-      }.getMessage.contains("provided a value of incompatible type"))
+        sql("create table t(i boolean, s bigint default false) using parquet")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The number of columns in the INSERT INTO statement is greater than the number of columns in
     // the table.
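The rewritten `InsertSuite` cases above capture a behavior shift that accompanies this change: an invalid DEFAULT expression is now rejected while analyzing the CREATE TABLE statement itself, so no INSERT is needed to surface the error. A minimal hedged sketch of that behavior outside the test harness, reusing the `spark` session assumed in the earlier sketch:

```scala
import org.apache.spark.sql.AnalysisException

// The CREATE TABLE statement itself fails analysis because 'badvalue' cannot be resolved;
// previously the error only appeared once a row relying on the default was inserted.
try {
  spark.sql("create table t(i boolean, s bigint default badvalue) using parquet")
} catch {
  case e: AnalysisException =>
    println(s"rejected at CREATE TABLE time: ${e.getMessage}")
}
```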
@@ -1617,6 +1614,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       TestCase(
         dataSource = "parquet",
         Seq(
+          Config(
+            None),
           Config(
             Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
             insertNullsToStorage = false)))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 510d4c13117..b554958572a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -100,7 +100,7 @@ class HiveSessionStateBuilder(
       RelationConversions(catalog) +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion +:
-        DataSourceAnalysis +:
+        DataSourceAnalysis(this) +:
         HiveAnalysis +:
         ReplaceCharWithVarchar +:
         customPostHocResolutionRules

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org