This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f715e8a02e8 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954) f715e8a02e8 is described below commit f715e8a02e8ee5561274ad38bdda5e863317b240 Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Fri Apr 12 13:29:12 2024 -0400 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954) Co-authored-by: Jonathan Vexler <=> --- .../datasources/parquet/SparkParquetReader.scala | 44 ++++ .../org/apache/spark/sql/hudi/SparkAdapter.scala | 18 +- .../parquet/SparkParquetReaderBase.scala | 96 +++++++ .../parquet/TestSparkParquetReaderFormat.scala | 56 ++++ .../hudi/functional/TestSparkParquetReader.java | 48 ++++ .../org/apache/hudi/util/JavaConversions.scala | 22 +- .../apache/spark/sql/adapter/Spark2Adapter.scala | 20 +- .../datasources/parquet/Spark24ParquetReader.scala | 225 ++++++++++++++++ .../apache/spark/sql/adapter/Spark3_0Adapter.scala | 20 +- .../datasources/parquet/Spark30ParquetReader.scala | 229 +++++++++++++++++ .../apache/spark/sql/adapter/Spark3_1Adapter.scala | 19 +- .../datasources/parquet/Spark31ParquetReader.scala | 242 ++++++++++++++++++ .../apache/spark/sql/adapter/Spark3_2Adapter.scala | 20 +- .../datasources/parquet/Spark32ParquetReader.scala | 267 +++++++++++++++++++ .../apache/spark/sql/adapter/Spark3_3Adapter.scala | 20 +- .../datasources/parquet/Spark33ParquetReader.scala | 268 +++++++++++++++++++ .../apache/spark/sql/adapter/Spark3_4Adapter.scala | 20 +- .../datasources/parquet/Spark34ParquetReader.scala | 277 ++++++++++++++++++++ .../apache/spark/sql/adapter/Spark3_5Adapter.scala | 20 +- .../datasources/parquet/Spark35ParquetReader.scala | 284 +++++++++++++++++++++ 20 files changed, 2206 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala new file mode 100644 index 00000000000..920e4cb0e0b --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +trait SparkParquetReader extends Serializable { + /** + * Read an individual parquet file + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + def read(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 1c6111afe47..91fe6dabc2e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.table.HoodieTableMetaClient @@ -33,7 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, StructType} @@ -214,4 +216,18 @@ trait SparkAdapter extends Serializable { * Tries to translate a Catalyst Expression into data source Filter */ def translateFilter(predicate: Expression, supportNestedPredicatePushdown: Boolean = false): Option[Filter] + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala new file mode 100644 index 00000000000..2b47da76456 --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +abstract class SparkParquetReaderBase(enableVectorizedReader: Boolean, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReader { + /** + * Read an individual parquet file + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + final def read(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + val conf = new Configuration(sharedConf) + conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json) + conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json) + ParquetWriteSupport.setSchema(requiredSchema, conf) + doRead(file, requiredSchema, partitionSchema, filters, conf) + } + + /** + * Implemented for each spark version + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] + +} + +trait SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala new file mode 100644 index 00000000000..bf513847cfc --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hudi.SparkAdapterSupport +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +/** + * Class used to test [[SparkParquetReader]] + * This class should have the same functionality as [[ParquetFileFormat]] + */ +class TestSparkParquetReaderFormat extends ParquetFileFormat with SparkAdapterSupport { + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + //reader must be created outside of the lambda.
This happens on the driver + val reader = sparkAdapter.createParquetFileReader(supportBatch(sparkSession, + StructType(partitionSchema.fields ++ requiredSchema.fields)), + sparkSession.sqlContext.conf, options, hadoopConf) + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + (file: PartitionedFile) => { + //code inside the lambda will run on the executor + reader.read(file, requiredSchema, partitionSchema, filters, broadcastedHadoopConf.value.value) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java new file mode 100644 index 00000000000..1c755ce3c8f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.util.JavaConversions; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +public class TestSparkParquetReader extends TestBootstrapReadBase { + + @Test + public void testReader() { + dataGen = new HoodieTestDataGenerator(dashPartitionPaths); + int n = 10; + Dataset<Row> inserts = makeInsertDf("000", n); + inserts.write().format("parquet").save(bootstrapBasePath); + Dataset<Row> parquetReadRows = JavaConversions.createTestDataFrame(sparkSession, bootstrapBasePath); + Dataset<Row> datasourceReadRows = sparkSession.read().format("parquet").load(bootstrapBasePath); + assertEquals(datasourceReadRows.count(), n); + assertEquals(parquetReadRows.count(), n); + assertEquals(datasourceReadRows.except(parquetReadRows).count(), 0); + assertEquals(parquetReadRows.except(datasourceReadRows).count(), 0); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala index c9abe090975..52333e72628 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala @@ -18,9 +18,14 @@ package org.apache.hudi.util +import org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat +import org.apache.hudi.SparkAdapterSupport +import org.apache.spark.sql.execution.datasources.DataSource +import 
org.apache.spark.sql.{DataFrame, SparkSession} + import java.util.function.Predicate -object JavaConversions { +object JavaConversions extends SparkAdapterSupport { def getPredicate[T](function1: (T) => Boolean): Predicate[T] = { new Predicate[T] { override def test(t: T): Boolean = function1.apply(t) @@ -34,4 +39,19 @@ object JavaConversions { } } } + + /** + * Read parquet files using [[TestSparkParquetReaderFormat]] + * + * @param sparkSession the spark session + * @param paths comma-separated list of parquet files or directories containing parquet files + * @return dataframe containing the data from the input paths + */ + def createTestDataFrame(sparkSession: SparkSession, paths: String): DataFrame = { + sparkSession.sqlContext.baseRelationToDataFrame(DataSource.apply( + sparkSession = sparkSession, + className = "org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat", + paths = paths.split(",").toSeq + ).resolveRelation()) + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 932e3dd05f0..7b5f5847562 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.table.HoodieTableMetaClient @@ -31,10 +32,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable} import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat, Spark24ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.vectorized.MutableColumnarRow import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} @@ -205,4 +207,20 @@ class Spark2Adapter extends SparkAdapter { batch.setNumRows(numRows) batch } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark24ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala new file mode 100644 index 00000000000..7fa30a36222 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader} +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +import java.net.URI + +class Spark24ParquetReader(enableVectorizedReader: Boolean, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringStartWith: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v2.4.8 adapted here + * + * @param file parquet file to 
read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val fileSplit = + new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty) + val filePath = fileSplit.getPath + + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + fileSplit.getStart, + fileSplit.getStart + fileSplit.getLength, + fileSplit.getLength, + fileSplit.getLocations, + null) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, + pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(parquetSchema, _)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion lister before `initialization`. 
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } else { + // ParquetRecordReader returns UnsafeRow + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter) + } else { + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz)) + } + val iter = new RecordReaderIterator(reader) + // SPARK-23457 Register a task completion lister before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + // This is a horrible erasure hack... if we type the iterator above, then it actually check + // the type in next() and we get a class cast exception. If we make that function return + // Object, then we can defer the cast until later! + if (partitionSchema.length == 0) { + // There is no partition columns + iter.asInstanceOf[Iterator[InternalRow]] + } else { + iter.asInstanceOf[Iterator[InternalRow]] + .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues))) + } + } + } +} + +object Spark24ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + + new Spark24ParquetReader( + enableVectorizedReader = vectorized, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = sqlConf.parquetVectorizedReaderEnabled, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId 
= Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala index 22a9f090fb3..8fbcf5a060b 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark30HoodieFileScanRDD import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_0AvroDeserializer, HoodieSpark3_0AvroSerializer} @@ -29,10 +30,11 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat, Spark30ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_0ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarUtils @@ -118,4 +120,20 @@ class Spark3_0Adapter extends BaseSpark3Adapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark30ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala new file mode 100644 index 00000000000..9088676c335 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop._ +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import java.net.URI + +class Spark30ParquetReader(enableVectorizedReader: Boolean, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringStartWith: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.0.3 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + file.start, + file.start + file.length, + file.length, + Array.empty, + null) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, + pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. 
+ iter.asInstanceOf[Iterator[InternalRow]] + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, enableVectorizedReader = false, datetimeRebaseMode) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } + } + +} + +object Spark30ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + + new Spark30ParquetReader( + enableVectorizedReader = vectorized, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = sqlConf.parquetVectorizedReaderEnabled, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 8ca072333d0..21f897afe1c 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark31HoodieFileScanRDD import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} @@ -30,10 +31,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat, Spark31ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile} import org.apache.spark.sql.hudi.SparkAdapter +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_1ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarUtils @@ -121,4 +123,19 @@ class Spark3_1Adapter extends BaseSpark3Adapter { case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark31ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala new file mode 100644 index 00000000000..94a5efaee61 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop._ +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import java.net.URI + +class Spark31ParquetReader(enableVectorizedReader: Boolean, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringStartWith: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.1.3 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + file.start, + file.start + file.length, + file.length, + Array.empty, + null) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val int96RebaseMode = DataSourceUtils.int96RebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + int96RebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion listener before `initialization`. 
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseMode, + int96RebaseMode) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } + } +} + +object Spark31ParquetReader extends SparkParquetReaderBuilder { + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + new Spark31ParquetReader( + enableVectorizedReader = vectorized, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = sqlConf.parquetVectorizedReaderEnabled, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index 3a5812a5faa..ea486c7383b 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark32HoodieFileScanRDD import org.apache.spark.sql._ import org.apache.spark.sql.avro._ @@ -30,10 +31,11 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan} import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32LegacyHoodieParquetFileFormat, Spark32ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_2ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarUtils @@ -123,4 +125,20 @@ class Spark3_2Adapter extends BaseSpark3Adapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark32ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala new file mode 100644 index 00000000000..5437a18cd4b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop._ +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import java.net.URI + +class Spark32ParquetReader(enableVectorizedReader: Boolean, + datetimeRebaseModeInRead: String, + int96RebaseModeInRead: String, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringStartWith: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.2.4 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + + lazy val footerFileMetaData = + ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. 
+ // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + try { + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } + } +} + +object Spark32ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + // Using string value of this conf to preserve compatibility across spark versions. 
See [HUDI-5868] + hadoopConf.setBoolean( + "spark.sql.legacy.parquet.nanosAsLong", + sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean + ) + + val parquetOptions = new ParquetOptions(options, sqlConf) + new Spark32ParquetReader( + enableVectorizedReader = vectorized, + datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead, + int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = sqlConf.parquetVectorizedReaderEnabled, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala index e3d2cc9cd18..c11c404c33a 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark33HoodieFileScanRDD import org.apache.spark.sql._ import org.apache.spark.sql.avro._ @@ -30,10 +31,11 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33LegacyHoodieParquetFileFormat, Spark33ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_3ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarBatchRow @@ -124,4 +126,20 @@ class Spark3_3Adapter extends BaseSpark3Adapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def 
createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark33ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala new file mode 100644 index 00000000000..0bd8cca3599 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop._ +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import java.net.URI + +class Spark33ParquetReader(enableVectorizedReader: Boolean, + datetimeRebaseModeInRead: String, + int96RebaseModeInRead: String, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringStartWith: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + 
returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.3.4 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + + + lazy val footerFileMetaData = + ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + try { + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. 
+ iter.close() + throw e + } + } + } +} + +object Spark33ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + // Using string value of this conf to preserve compatibility across spark versions. See [HUDI-5868] + hadoopConf.setBoolean( + "spark.sql.legacy.parquet.nanosAsLong", + sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean + ) + + val parquetOptions = new ParquetOptions(options, sqlConf) + new Spark33ParquetReader(enableVectorizedReader = vectorized, + datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead, + int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = sqlConf.parquetVectorizedReaderEnabled, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index 0ae5ef3dbf3..1e2807df55b 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark34HoodieFileScanRDD import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow @@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback -import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark34LegacyHoodieParquetFileFormat, Spark34ParquetReader, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions @@ -37,6 +38,7 @@ import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarBatchRow import org.apache.spark.sql._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -124,4 +126,20 @@ class Spark3_4Adapter extends BaseSpark3Adapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + Spark34ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala new file mode 100644 index 00000000000..73db889a044 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop._ +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +class Spark34ParquetReader(enableVectorizedReader: Boolean, + datetimeRebaseModeInRead: String, + int96RebaseModeInRead: String, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringPredicate: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.4.2 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = file.toPath + val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + + + lazy val footerFileMetaData = + ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + // Try to push down filters when filter push-down is enabled. 
+ val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. 
+ iter.close() + throw e + } + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { + readerWithRowIndexes.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } + } +} + +object Spark34ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + // Using string value of this conf to preserve compatibility across spark versions. See [HUDI-5868] + hadoopConf.setBoolean( + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, + sqlConf.getConfString( + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean + ) + hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled) + + val returningBatch = sqlConf.parquetVectorizedReaderEnabled && + options.getOrElse(FileFormat.OPTION_RETURNING_BATCH, + throw new IllegalArgumentException( + "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. 
" + + "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")) + .equals("true") + + val parquetOptions = new ParquetOptions(options, sqlConf) + new Spark34ParquetReader( + enableVectorizedReader = vectorized, + datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead, + int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = returningBatch, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala index d18291a1809..a3e0c19621b 100644 --- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark35HoodieFileScanRDD import org.apache.spark.sql._ import org.apache.spark.sql.avro._ @@ -31,9 +32,10 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark35LegacyHoodieParquetFileFormat} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark35ParquetReader, Spark35LegacyHoodieParquetFileFormat, SparkParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.hudi.analysis.TableValuedFunctions +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_5ExtendedSqlParser} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.sql.vectorized.ColumnarBatchRow @@ -124,4 +126,20 @@ class Spark3_5Adapter extends BaseSpark3Adapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + override def createParquetFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + 
Spark35ParquetReader.build(vectorized, sqlConf, options, hadoopConf) + } } diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala new file mode 100644 index 00000000000..f088efd07e1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType + +class Spark35ParquetReader(enableVectorizedReader: Boolean, + datetimeRebaseModeInRead: String, + int96RebaseModeInRead: String, + enableParquetFilterPushDown: Boolean, + pushDownDate: Boolean, + pushDownTimestamp: Boolean, + pushDownDecimal: Boolean, + pushDownInFilterThreshold: Int, + pushDownStringPredicate: Boolean, + isCaseSensitive: Boolean, + timestampConversion: Boolean, + enableOffHeapColumnVector: Boolean, + capacity: Int, + returningBatch: Boolean, + enableRecordFilter: Boolean, + timeZoneId: Option[String]) extends SparkParquetReaderBase( + enableVectorizedReader = enableVectorizedReader, + enableParquetFilterPushDown = enableParquetFilterPushDown, + pushDownDate = pushDownDate, + pushDownTimestamp = pushDownTimestamp, + pushDownDecimal = pushDownDecimal, + pushDownInFilterThreshold = pushDownInFilterThreshold, + isCaseSensitive = isCaseSensitive, + timestampConversion = timestampConversion, + enableOffHeapColumnVector = enableOffHeapColumnVector, + capacity = capacity, + returningBatch = returningBatch, + enableRecordFilter = 
enableRecordFilter, + timeZoneId = timeZoneId) { + + /** + * Read an individual parquet file + * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.5.1 adapted here + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. + * @param sharedConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + protected def doRead(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] = { + assert(file.partitionValues.numFields == partitionSchema.size) + + val filePath = file.toPath + val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + + val fileFooter = if (enableVectorizedReader) { + // When there are vectorized reads, we can avoid reading the footer twice by reading + // all row groups in advance and filter row groups according to filters that require + // push down (no need to read the footer metadata again). + ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) + } else { + ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) + } + + val footerFileMetaData = fileFooter.getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(_)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(sharedConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } else { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { + readerWithRowIndexes.initialize(split, hadoopAttemptContext) + + val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. 
+ iter.close() + throw e + } + } + } +} + +object Spark35ParquetReader extends SparkParquetReaderBuilder { + /** + * Get parquet file reader + * + * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc + * @param sqlConf the [[SQLConf]] used for the read + * @param options passed as a param to the file format + * @param hadoopConf some configs will be set for the hadoopConf + * @return parquet file reader + */ + def build(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkParquetReader = { + //set hadoopconf + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone) + hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis) + hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString) + hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp) + // Using string value of this conf to preserve compatibility across spark versions. See [HUDI-5868] + hadoopConf.setBoolean( + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, + sqlConf.getConfString( + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean + ) + hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled) + + val returningBatch = sqlConf.parquetVectorizedReaderEnabled && + options.getOrElse(FileFormat.OPTION_RETURNING_BATCH, + throw new IllegalArgumentException( + "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " + + "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")) + .equals("true") + + val parquetOptions = new ParquetOptions(options, sqlConf) + new Spark35ParquetReader( + enableVectorizedReader = vectorized, + datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead, + int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead, + enableParquetFilterPushDown = sqlConf.parquetFilterPushDown, + pushDownDate = sqlConf.parquetFilterPushDownDate, + pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp, + pushDownDecimal = sqlConf.parquetFilterPushDownDecimal, + pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold, + pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate, + isCaseSensitive = sqlConf.caseSensitiveAnalysis, + timestampConversion = sqlConf.isParquetINT96TimestampConversion, + enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, + capacity = sqlConf.parquetVectorizedReaderBatchSize, + returningBatch = returningBatch, + enableRecordFilter = sqlConf.parquetRecordFilterEnabled, + timeZoneId = Some(sqlConf.sessionLocalTimeZone)) + } +}
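The patch above only wires the per-Spark-version readers into the adapters; it does not include a call site. As a rough illustration only (not part of this commit), the sketch below shows how a caller might obtain a reader through the adapter and read a single parquet file. The SparkAdapterSupport mixin providing `sparkAdapter`, the caller-supplied PartitionedFile, and the `FileFormat.OPTION_RETURNING_BATCH -> "false"` entry are assumptions: PartitionedFile construction differs across Spark versions (which is part of what these readers hide), and the Spark 3.4/3.5 builders in this patch require the returning-batch option whenever the vectorized reader is enabled in SQLConf.

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkAdapterSupport                        // assumed mixin exposing `sparkAdapter`
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

object SingleFileReadSketch extends SparkAdapterSupport {

  // Reads one parquet file and returns its rows with partition values appended.
  // `file` must be built by the caller for the running Spark version.
  def readOneFile(spark: SparkSession,
                  file: PartitionedFile,
                  requiredSchema: StructType,
                  partitionSchema: StructType,
                  filters: Seq[Filter],
                  hadoopConf: Configuration): Iterator[InternalRow] = {
    val sqlConf = spark.sessionState.conf

    // The adapter resolves to the Spark24/30/31/.../35ParquetReader implementation at runtime.
    // vectorized = false keeps the output as individual InternalRows; on the vectorized path
    // the iterator may actually yield ColumnarBatch instances, as the scaladoc notes.
    val reader = sparkAdapter.createParquetFileReader(
      vectorized = false,
      sqlConf = sqlConf,
      options = Map(FileFormat.OPTION_RETURNING_BATCH -> "false"), // expected by the Spark 3.4/3.5 builders
      hadoopConf = hadoopConf)

    // Filters are pushed down at row-group granularity only; the surrounding plan
    // still has to apply them to the returned rows.
    reader.read(file, requiredSchema, partitionSchema, filters, hadoopConf)
  }
}

The main design point is that each Spark profile carries its own adaptation of ParquetFileFormat#buildReaderWithPartitionValues behind the common SparkParquetReader interface, so callers never touch version-specific details such as ParquetInputSplit vs. FileSplit, or the rebase-spec arguments added to VectorizedParquetRecordReader in newer Spark releases.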