This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 95aebcbf100 [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files
95aebcbf100 is described below

commit 95aebcbf100de1dbedd32626ce67bd01014c973e
Author: Ala Luszczak <a...@databricks.com>
AuthorDate: Tue Aug 16 21:03:42 2022 +0800

    [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files

### What changes were proposed in this pull request?

This change adds a `row_index` column to the `_metadata` struct. This column allows us to uniquely identify rows read from a given file with an index number. The n-th row in a given file will be assigned row index `n-1` in every scan of the file, irrespective of file splitting and any data skipping in use.

The new column requires file-format-specific support. This change introduces Parquet support; other formats can follow later.

### Why are the changes needed?

Row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple uniquely identifies a row in a table. This information can be used to mark rows, e.g. to build an index.

### Does this PR introduce _any_ user-facing change?

Yes. With this change users will be able to access the `_metadata.row_index` metadata column when reading Parquet data. The schema of the `_metadata` column remains unchanged for file formats without row index support.

### How was this patch tested?

* Added `FileMetadataStructRowIndexSuite.scala` to make sure the feature works correctly in different scenarios (supported/unsupported file format, batch/record reads, on/off-heap memory, ...).
* Added `ParquetRowIndexSuite.scala` to make sure the row indexes are generated correctly for Parquet files in conjunction with any combination of data-skipping features.
* Extended `FileMetadataStructSuite.scala` to account for the new column in the `_metadata` struct.

Closes #37228 from ala/row-idx-v4.
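As a rough illustration of the index-assignment rule above — the n-th row always receives index `n-1`, and page skipping merely yields a subset of those indexes — here is a minimal Python sketch mirroring the logic of the new `RowIndexGenerator` in this commit. The helper names below are illustrative only, not Spark or Parquet APIs:

```python
# Illustrative sketch only -- mirrors the row-index assignment performed by the
# new RowIndexGenerator in this commit; these helper names are not Spark APIs.

def file_row_indexes(start_offset, row_count, surviving=None):
    """Compute file-level row indexes for one Parquet row group.

    start_offset: row index of the row group's first row within the file
                  (cf. PageReadStore.getRowIndexOffset in the actual reader).
    row_count:    number of rows in the row group.
    surviving:    optional 0-based positions within the group that remain after
                  page skipping (cf. PageReadStore.getRowIndexes); when absent,
                  every row of the group is read.
    """
    if surviving is not None:
        # Page skipping: shift the group-local positions by the group's offset.
        return [start_offset + i for i in surviving]
    # No skipping: the group contributes a contiguous run of indexes.
    return list(range(start_offset, start_offset + row_count))

# Two row groups of 4 rows each: a full scan yields indexes 0..7 regardless of
# how the file is split, because each group carries its absolute offset.
full_scan = file_row_indexes(0, 4) + file_row_indexes(4, 4)  # [0, 1, ..., 7]

# With page skipping in the second group only a subset of rows survives, but
# the surviving rows keep their original file-level indexes.
skipped_scan = file_row_indexes(0, 4) + file_row_indexes(4, 4, surviving=[1, 3])
# -> [0, 1, 2, 3, 5, 7]
```

This is why a (fileName, rowIndex) tuple stays a stable row identifier: the index depends only on the row's position in the file, not on how the scan happened to partition or prune it.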
Lead-authored-by: Ala Luszczak <a...@databricks.com> Co-authored-by: IonutBoicu <ionut.bo...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- ...ParquetNestedSchemaPruningBenchmark-results.txt | 60 ++-- .../datasources/parquet/ParquetColumnVector.java | 5 + .../parquet/VectorizedParquetRecordReader.java | 14 + .../spark/sql/execution/DataSourceScanExec.scala | 20 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../sql/execution/datasources/FileFormat.scala | 46 ++- .../sql/execution/datasources/FileScanRDD.scala | 9 +- .../execution/datasources/FileSourceStrategy.scala | 62 +++- .../execution/datasources/LogicalRelation.scala | 4 +- .../sql/execution/datasources/RowIndexUtil.scala | 40 +++ .../datasources/parquet/ParquetFileFormat.scala | 6 +- .../datasources/parquet/ParquetRowIndexUtil.scala | 120 ++++++++ .../v2/parquet/ParquetPartitionReaderFactory.scala | 6 +- .../datasources/v2/parquet/ParquetScan.scala | 7 +- .../execution/streaming/StreamingRelation.scala | 3 +- .../benchmark/MetadataStructBenchmark.scala | 85 ++++++ .../FileMetadataStructRowIndexSuite.scala | 235 ++++++++++++++++ .../datasources/FileMetadataStructSuite.scala | 66 ++--- .../datasources/parquet/ParquetRowIndexSuite.scala | 313 +++++++++++++++++++++ 19 files changed, 1000 insertions(+), 103 deletions(-) diff --git a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt index ec149dce313..ca035973065 100644 --- a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt @@ -2,52 +2,52 @@ Nested Schema Pruning Benchmark For Parquet ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 
5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Selection: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 104 114 14 9.6 104.0 1.0X -Nested column 111 121 20 9.0 110.5 0.9X -Nested column in array 321 328 8 3.1 320.5 0.3X +Top-level column 91 131 30 10.9 91.4 1.0X +Nested column 73 109 32 13.6 73.5 1.2X +Nested column in array 248 266 28 4.0 248.1 0.4X -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Limiting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 373 384 8 2.7 373.4 1.0X -Nested column 386 400 10 2.6 385.8 1.0X -Nested column in array 831 859 22 1.2 831.2 0.4X +Top-level column 268 386 154 3.7 268.4 1.0X +Nested column 271 305 22 3.7 271.3 1.0X +Nested column in array 648 737 49 1.5 648.0 0.4X -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Repartitioning: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 353 366 9 2.8 353.1 1.0X -Nested column 366 373 6 2.7 366.3 1.0X -Nested column in array 772 793 14 1.3 771.8 0.5X +Top-level column 227 259 24 4.4 226.6 1.0X +Nested column 271 429 182 3.7 271.2 0.8X +Nested column in array 644 751 114 1.6 644.1 0.4X -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure 
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Repartitioning by exprs: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 333 339 5 3.0 333.4 1.0X -Nested column 370 388 12 2.7 369.7 0.9X -Nested column in array 784 830 21 1.3 784.5 0.4X +Top-level column 263 306 29 3.8 262.8 1.0X +Nested column 323 385 48 3.1 322.5 0.8X +Nested column in array 637 789 195 1.6 637.1 0.4X -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Sample: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 123 149 30 8.1 123.1 1.0X -Nested column 133 157 37 7.5 132.8 0.9X -Nested column in array 382 420 16 2.6 381.6 0.3X +Top-level column 84 129 43 12.0 83.5 1.0X +Nested column 83 99 13 12.1 82.7 1.0X +Nested column in array 312 366 47 3.2 312.2 0.3X -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic +Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz Sorting: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Top-level column 567 579 10 1.8 566.6 1.0X -Nested column 637 652 15 1.6 637.1 0.9X -Nested column in array 1171 1212 28 0.9 1171.4 0.5X +Top-level column 342 420 54 2.9 342.2 1.0X +Nested column 423 441 17 2.4 422.6 0.8X +Nested 
column in array 821 870 28 1.2 821.1 0.4X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java index 47774e0a397..5272151acf2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java @@ -75,6 +75,11 @@ final class ParquetColumnVector { this.isPrimitive = column.isPrimitive(); if (missingColumns.contains(column)) { + if (ParquetRowIndexUtil.isRowIndexColumn(column)) { + // The values of row index column are going to be generated by the reader instead. + return; + } + if (defaultValue == null) { vector.setAllNull(); return; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 63fdec4056f..97f739c5bf2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -132,6 +132,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean returnColumnarBatch; + /** + * Populates the row index column if needed. 
+ */ + private ParquetRowIndexUtil.RowIndexGenerator rowIndexGenerator = null; + /** * The memory mode of the columnarBatch */ @@ -275,6 +280,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i); } } + + rowIndexGenerator = ParquetRowIndexUtil.createGeneratorIfNeeded(sparkSchema); } private void initBatch() { @@ -324,6 +331,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa } cv.assemble(); } + // If needed, compute row indexes within a file. + if (rowIndexGenerator != null) { + rowIndexGenerator.populateRowIndex(columnVectors, num); + } rowsReturned += num; columnarBatch.setNumRows(num); @@ -395,6 +406,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa throw new IOException("expecting more rows but reached last block. Read " + rowsReturned + " out of " + totalRowCount); } + if (rowIndexGenerator != null) { + rowIndexGenerator.initFromPageReadStore(pages); + } for (ParquetColumnVector cv : columnVectors) { initColumnReader(pages, cv); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 5950136e79a..fdb49bd7674 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.vectorized.ConstantColumnVector +import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, 
OnHeapColumnVector} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -214,8 +214,17 @@ trait FileSourceScanLike extends DataSourceScanExec { requiredSchema = requiredSchema, partitionSchema = relation.partitionSchema, relation.sparkSession.sessionState.conf).map { vectorTypes => - // for column-based file format, append metadata column's vector type classes if any - vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName) + vectorTypes ++ + // for column-based file format, append metadata column's vector type classes if any + metadataColumns.map { metadataCol => + if (FileFormat.isConstantMetadataAttr(metadataCol.name)) { + classOf[ConstantColumnVector].getName + } else if (relation.sparkSession.sessionState.conf.offHeapColumnVectorEnabled) { + classOf[OffHeapColumnVector].getName + } else { + classOf[OnHeapColumnVector].getName + } + } } lazy val driverMetrics = Map( @@ -690,7 +699,10 @@ case class FileSourceScanExec( if (shouldProcess(filePath)) { val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) + relation.sparkSession, relation.options, filePath) && + // SPARK-39634: Allow file splitting in combination with row index generation once + // the fix for PARQUET-2161 is available. 
+ !RowIndexUtil.isNeededForSchema(requiredSchema) PartitionedFileUtil.splitFiles( sparkSession = relation.sparkSession, file = file, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 8f8846b89f3..d50fd88f65c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -111,7 +111,7 @@ case class DataSource( } } - private def providingInstance() = providingClass.getConstructor().newInstance() + private[sql] def providingInstance(): Any = providingClass.getConstructor().newInstance() private def newHadoopConfiguration(): Configuration = sparkSession.sessionState.newHadoopConfWithOptions(options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index f9b37fb5d9f..f7f917d8947 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType} @@ -182,18 +183,32 @@ object FileFormat { val FILE_MODIFICATION_TIME = "file_modification_time" + val ROW_INDEX = "row_index" + + // A name for a temporary column that holds row indexes computed by the 
file format reader + // until they can be placed in the _metadata struct. + val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX" + val METADATA_NAME = "_metadata" - // supported metadata struct fields for hadoop fs relation - val METADATA_STRUCT: StructType = new StructType() - .add(StructField(FILE_PATH, StringType)) - .add(StructField(FILE_NAME, StringType)) - .add(StructField(FILE_SIZE, LongType)) - .add(StructField(FILE_MODIFICATION_TIME, TimestampType)) + /** Schema of metadata struct that can be produced by every file format. */ + val BASE_METADATA_STRUCT: StructType = new StructType() + .add(StructField(FileFormat.FILE_PATH, StringType)) + .add(StructField(FileFormat.FILE_NAME, StringType)) + .add(StructField(FileFormat.FILE_SIZE, LongType)) + .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType)) - // create a file metadata struct col - def createFileMetadataCol: AttributeReference = - FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT) + /** + * Create a file metadata struct column containing fields supported by the given file format. + */ + def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = { + val struct = if (fileFormat.isInstanceOf[ParquetFileFormat]) { + BASE_METADATA_STRUCT.add(StructField(FileFormat.ROW_INDEX, LongType)) + } else { + BASE_METADATA_STRUCT + } + FileSourceMetadataAttribute(FileFormat.METADATA_NAME, struct) + } // create an internal row given required metadata fields and file information def createMetadataInternalRow( @@ -220,10 +235,23 @@ object FileFormat { // the modificationTime from the file is in millisecond, // while internally, the TimestampType `file_modification_time` is stored in microsecond row.update(i, fileModificationTime * 1000L) + case ROW_INDEX => + // Do nothing. Only the metadata fields that have identical values for each row of the + // file are set by this function, while fields that have different values (such as row + // index) are set separately. 
} } row } + + /** + * Returns true if the given metadata column always contains identical values for all rows + * originating from the same data file. + */ + def isConstantMetadataAttr(name: String): Boolean = name match { + case FILE_PATH | FILE_NAME | FILE_SIZE | FILE_MODIFICATION_TIME => true + case ROW_INDEX => false + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 4c3f5629e78..827d41dd096 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.FileFormat._ import org.apache.spark.sql.execution.vectorized.ConstantColumnVector import org.apache.spark.sql.types.{LongType, StringType, StructType} -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator @@ -133,8 +133,9 @@ class FileScanRDD( } /** - * For each partitioned file, metadata columns for each record in the file are exactly same. - * Only update metadata row when `currentFile` is changed. + * The value of some of the metadata columns remains exactly the same for each record of + * a partitioned file. Only need to update their values in the metadata row when `currentFile` + * is changed. 
*/ private def updateMetadataRow(): Unit = if (metadataColumns.nonEmpty && currentFile != null) { @@ -145,7 +146,7 @@ class FileScanRDD( /** * Create an array of constant column vectors containing all required metadata columns */ - private def createMetadataColumnVector(c: ColumnarBatch): Array[ConstantColumnVector] = { + private def createMetadataColumnVector(c: ColumnarBatch): Array[ColumnVector] = { val path = new Path(currentFile.filePath) metadataColumns.map(_.name).map { case FILE_PATH => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 4995a0d6cd4..22ad7960cdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -26,7 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME -import org.apache.spark.sql.types.{DoubleType, FloatType, StructType} +import org.apache.spark.sql.types.{DoubleType, FloatType, LongType, StructType} import org.apache.spark.util.collection.BitSet /** @@ -206,13 +208,6 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects val requiredAttributes = AttributeSet(requiredExpressions) - val readDataColumns = - dataColumns - .filter(requiredAttributes.contains) - 
.filterNot(partitionColumns.contains) - val outputSchema = readDataColumns.toStructType - logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}") - val metadataStructOpt = l.output.collectFirst { case FileSourceMetadataAttribute(attr) => attr } @@ -223,14 +218,45 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { }.toSeq }.getOrElse(Seq.empty) - // outputAttributes should also include the metadata columns at the very end - val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns + val fileConstantMetadataColumns: Seq[Attribute] = + metadataColumns.filter(_.name != FileFormat.ROW_INDEX) + + val readDataColumns = dataColumns + .filter(requiredAttributes.contains) + .filterNot(partitionColumns.contains) + + val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] = + metadataColumns.map(_.name).flatMap { + case FileFormat.ROW_INDEX => + if ((readDataColumns ++ partitionColumns).map(_.name.toLowerCase(Locale.ROOT)) + .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) { + throw new AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + + " is a reserved column name that cannot be read in combination with " + + s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.") + } + Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)()) + case _ => None + } + + val outputDataSchema = (readDataColumns ++ fileFormatReaderGeneratedMetadataColumns) + .toStructType + + // The output rows will be produced during file scan operation in three steps: + // (1) File format reader populates a `Row` with `readDataColumns` and + // `fileFormatReaderGeneratedMetadataColumns` + // (2) Then, a row containing `partitionColumns` is joined at the end. + // (3) Finally, a row containing `fileConstantMetadataColumns` is also joined at the end. 
+ // By placing `fileFormatReaderGeneratedMetadataColumns` before `partitionColumns` and + // `fileConstantMetadataColumns` in the `outputAttributes` we make these row operations + // simpler and more efficient. + val outputAttributes = readDataColumns ++ fileFormatReaderGeneratedMetadataColumns ++ + partitionColumns ++ fileConstantMetadataColumns val scan = FileSourceScanExec( fsRelation, outputAttributes, - outputSchema, + outputDataSchema, partitionKeyFilters.toSeq, bucketSet, None, @@ -239,10 +265,20 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { // extra Project node: wrap flat metadata columns to a metadata struct val withMetadataProjections = metadataStructOpt.map { metadataStruct => + val structColumns = metadataColumns.map { col => col.name match { + case FileFormat.FILE_PATH | FileFormat.FILE_NAME | FileFormat.FILE_SIZE | + FileFormat.FILE_MODIFICATION_TIME => + col + case FileFormat.ROW_INDEX => + fileFormatReaderGeneratedMetadataColumns + .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) + .get.withName(FileFormat.ROW_INDEX) + } + } val metadataAlias = - Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId) + Alias(CreateStruct(structColumns), METADATA_NAME)(exprId = metadataStruct.exprId) execution.ProjectExec( - scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan) + readDataColumns ++ partitionColumns :+ metadataAlias, scan) }.getOrElse(scan) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 291b98fb37c..43699c1b6b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -69,7 +69,7 @@ case class 
LogicalRelation( } override lazy val metadataOutput: Seq[AttributeReference] = relation match { - case _: HadoopFsRelation => + case relation: HadoopFsRelation => val resolve = conf.resolver val outputNames = outputSet.map(_.name) def isOutputColumn(col: AttributeReference): Boolean = { @@ -78,7 +78,7 @@ case class LogicalRelation( // filter out the metadata struct column if it has the name conflicting with output columns. // if the file has a column "_metadata", // then the data column should be returned not the metadata struct column - Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn) + Seq(FileFormat.createFileMetadataCol(relation.fileFormat)).filterNot(isOutputColumn) case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala new file mode 100644 index 00000000000..1512b6da1e8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.types.{LongType, StructField, StructType} + + +object RowIndexUtil { + def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = { + sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) => + field.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + } match { + case Some((field: StructField, idx: Int)) => + if (field.dataType != LongType) { + throw new RuntimeException(s"${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of " + + "LongType") + } + idx + case _ => -1 + } + } + + def isNeededForSchema(sparkSchema: StructType): Boolean = { + findRowIndexColumnIndexInSchema(sparkSchema) >= 0 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 513379d23d6..c20063333c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -366,9 +366,11 @@ class ParquetFileFormat } else { new ParquetRecordReader[InternalRow](readSupport) } - val iter = new RecordReaderIterator[InternalRow](reader) + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) try { - reader.initialize(split, hadoopAttemptContext) + readerWithRowIndexes.initialize(split, hadoopAttemptContext) val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala new file mode 100644 index 00000000000..fb1f6a417f4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.parquet.column.page.PageReadStore +import org.apache.parquet.hadoop.ParquetRecordReader + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{FileFormat, RowIndexUtil} +import org.apache.spark.sql.execution.datasources.RowIndexUtil.findRowIndexColumnIndexInSchema +import org.apache.spark.sql.execution.vectorized.WritableColumnVector +import org.apache.spark.sql.types.StructType + + +object ParquetRowIndexUtil { + /** + * Generate row indexes for vectorized readers. 
+ */ + class RowIndexGenerator(rowIndexColumnIdx: Int) { + var rowIndexIterator: Iterator[Long] = _ + + /** For Parquet only: initialize the generator using provided PageReadStore. */ + def initFromPageReadStore(pages: PageReadStore): Unit = { + if (!pages.getRowIndexOffset.isPresent) { + throw new IOException("PageReadStore returned no row index offset.") + } + val startingRowIdx: Long = pages.getRowIndexOffset.get() + if (pages.getRowIndexes.isPresent) { + // The presence of `getRowIndexes` indicates that page skipping is effective and only + // a subset of rows in the row group is going to be read. Note that there is a name + // collision here: these row indexes (unlike ones this class is generating) are counted + // starting from 0 in each of the row groups. + rowIndexIterator = pages.getRowIndexes.get.asScala.map(idx => idx + startingRowIdx) + } else { + val numRowsInRowGroup = pages.getRowCount + rowIndexIterator = (startingRowIdx until startingRowIdx + numRowsInRowGroup).iterator + } + } + + def populateRowIndex(columnVectors: Array[ParquetColumnVector], numRows: Int): Unit = { + populateRowIndex(columnVectors(rowIndexColumnIdx).getValueVector, numRows) + } + + def populateRowIndex(columnVector: WritableColumnVector, numRows: Int): Unit = { + for (i <- 0 until numRows) { + columnVector.putLong(i, rowIndexIterator.next()) + } + } + } + + def createGeneratorIfNeeded(sparkSchema: StructType): RowIndexGenerator = { + val columnIdx = findRowIndexColumnIndexInSchema(sparkSchema) + if (columnIdx >= 0) new RowIndexGenerator(columnIdx) + else null + } + + /** + * A wrapper for `ParquetRecordReader` that sets row index column to the correct value in + * the returned InternalRow. Used in combination with non-vectorized (parquet-mr) Parquet reader. 
+ */ + private class RecordReaderWithRowIndexes( + parent: ParquetRecordReader[InternalRow], + rowIndexColumnIdx: Int) + extends RecordReader[Void, InternalRow] { + + override def initialize( + inputSplit: InputSplit, + taskAttemptContext: TaskAttemptContext): Unit = { + parent.initialize(inputSplit, taskAttemptContext) + } + + override def nextKeyValue(): Boolean = parent.nextKeyValue() + + override def getCurrentKey: Void = parent.getCurrentKey + + override def getCurrentValue: InternalRow = { + val row = parent.getCurrentValue + row.setLong(rowIndexColumnIdx, parent.getCurrentRowIndex) + row + } + + override def getProgress: Float = parent.getProgress + + override def close(): Unit = parent.close() + } + + def addRowIndexToRecordReaderIfNeeded( + reader: ParquetRecordReader[InternalRow], + sparkSchema: StructType): RecordReader[Void, InternalRow] = { + val rowIndexColumnIdx = RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema) + if (rowIndexColumnIdx >= 0) { + new RecordReaderWithRowIndexes(reader, rowIndexColumnIdx) + } else { + reader + } + } + + def isRowIndexColumn(column: ParquetColumn): Boolean = { + column.path.length == 1 && column.path.last == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 0f6e5201df8..121ebe1cfa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -295,10 +295,12 @@ case class ParquetPartitionReaderFactory( } else { new ParquetRecordReader[InternalRow](readSupport) } - val iter = new RecordReaderIterator(reader) + val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded( + reader, readDataSchema) + val iter = new RecordReaderIterator(readerWithRowIndexes) // SPARK-23457 Register a task completion listener before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - reader + readerWithRowIndexes } private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala index 0457e8be715..ff0b38880fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.PartitionReaderFactory -import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex, RowIndexUtil} import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.internal.SQLConf @@ -50,7 +50,10 @@ case class ParquetScan( override def isSplitable(path: Path): Boolean = { // If aggregate is pushed down, only the file footer will be read once, // so file should not be split across multiple tasks. - pushedAggregate.isEmpty + pushedAggregate.isEmpty && + // SPARK-39634: Allow file splitting in combination with row index generation once + // the fix for PARQUET-2161 is available. 
+ !RowIndexUtil.isNeededForSchema(readSchema) } override def readSchema(): StructType = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 7b177f3b67d..af90b692a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -70,7 +70,8 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output: // filter out the metadata struct column if it has the name conflicting with output columns. // if the file has a column "_metadata", // then the data column should be returned not the metadata struct column - Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn) + Seq(FileFormat.createFileMetadataCol( + dataSource.providingInstance().asInstanceOf[FileFormat])).filterNot(isOutputColumn) case _ => Nil } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala new file mode 100644 index 00000000000..38fff24abe5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + + +object MetadataStructBenchmark extends SqlBasedBenchmark { + import spark.implicits._ + + private val NUM_ROWS = 5000000 + private val NUM_ITERS = 32 + + private def withTempData(format: String = "parquet")(f: DataFrame => Unit): Unit = { + val dir = Utils.createTempDir() + dir.delete() + try { + spark.range(0, NUM_ROWS, 1, 1).toDF("id") + .withColumn("num1", $"id" + 10) + .withColumn("num2", $"id" / 10) + .withColumn("str", lit("a sample string ") + $"id".cast("string")) + .write.format(format).save(dir.getAbsolutePath) + val df = spark.read.format(format).load(dir.getAbsolutePath) + f(df) + } finally { + Utils.deleteRecursively(dir) + } + } + + private def addCase(benchmark: Benchmark, df: DataFrame, metadataCol: Option[String]): Unit = { + benchmark.addCase(metadataCol.getOrElse("no metadata columns")) { _ => + df.select("*", metadataCol.toSeq: _*).noop() + } + } + + private def metadataBenchmark(name: String, format: String): Unit = { + withTempData(format) { df => + val metadataCols = df.select(FileFormat.METADATA_NAME).schema + .fields.head.dataType.asInstanceOf[StructType].fieldNames + + val benchmark = new Benchmark(name, NUM_ROWS, NUM_ITERS, output = output) + + 
addCase(benchmark, df, None) + for (metadataCol <- metadataCols) { + addCase(benchmark, df, Some(s"${FileFormat.METADATA_NAME}.$metadataCol")) + } + addCase(benchmark, df, Some(FileFormat.METADATA_NAME)) + + benchmark.run() + } + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Metadata Struct Benchmark") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + metadataBenchmark("Vectorized Parquet", "parquet") + } + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + metadataBenchmark("Parquet-mr", "parquet") + } + metadataBenchmark("JSON", "json") + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala new file mode 100644 index 00000000000..af2d56159bf --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest} +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +class FileMetadataStructRowIndexSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + val EXPECTED_ROW_ID_COL = "expected_row_idx" + val EXPECTED_EXTRA_COL = "expected_extra_col" + val EXPECTED_PARTITION_COL = "expected_pb_col" + val NUM_ROWS = 100 + + def withReadDataFrame( + format: String, + partitionCol: String = null, + extraCol: String = "ec", + extraSchemaFields: Seq[StructField] = Seq.empty) + (f: DataFrame => Unit): Unit = { + withTempPath { path => + val baseDf = spark.range(0, NUM_ROWS, 1, 1).toDF("id") + .withColumn(extraCol, $"id" + lit(1000 * 1000)) + .withColumn(EXPECTED_EXTRA_COL, col(extraCol)) + val writeSchema: StructType = if (partitionCol != null) { + val writeDf = baseDf + .withColumn(partitionCol, ($"id" / 10).cast("int") + lit(1000)) + .withColumn(EXPECTED_PARTITION_COL, col(partitionCol)) + .withColumn(EXPECTED_ROW_ID_COL, $"id" % 10) + writeDf.write.format(format).partitionBy(partitionCol).save(path.getAbsolutePath) + writeDf.schema + } else { + val writeDf = baseDf + .withColumn(EXPECTED_ROW_ID_COL, $"id") + writeDf.write.format(format).save(path.getAbsolutePath) + writeDf.schema + } + val readSchema: StructType = new StructType(writeSchema.fields ++ extraSchemaFields) + val readDf = spark.read.format(format).schema(readSchema).load(path.getAbsolutePath) + f(readDf) + } + } + + private val allMetadataCols = Seq( + FileFormat.FILE_PATH, + FileFormat.FILE_SIZE, + FileFormat.FILE_MODIFICATION_TIME, + FileFormat.ROW_INDEX + ) + + /** Identifies the names of all the metadata columns present in the schema.
*/ + private def collectMetadataCols(struct: StructType): Seq[String] = { + struct.fields.flatMap { field => field.dataType match { + case s: StructType => collectMetadataCols(s) + case _ if allMetadataCols.contains(field.name) => Some(field.name) + case _ => None + }} + } + + for (useVectorizedReader <- Seq(false, true)) + for (useOffHeapMemory <- Seq(useVectorizedReader, false).distinct) + for (partitioned <- Seq(false, true)) { + val label = Seq( + { if (useVectorizedReader) "vectorized" else "parquet-mr"}, + { if (useOffHeapMemory) "off-heap" else "" }, + { if (partitioned) "partitioned" else "" } + ).filter(_.nonEmpty).mkString(", ") + test(s"parquet ($label) - read _metadata.row_index") { + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString, + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> useOffHeapMemory.toString) { + withReadDataFrame("parquet", partitionCol = "pb") { df => + val res = df.select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}") + .where(s"$EXPECTED_ROW_ID_COL != ${FileFormat.ROW_INDEX}") + assert(res.count() == 0) + } + } + } + } + + test("supported file format - read _metadata struct") { + withReadDataFrame("parquet") { df => + val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME) + + // `_metadata.row_index` column is present when selecting `_metadata` as a whole. + val metadataCols = collectMetadataCols(withMetadataStruct.schema) + assert(metadataCols.contains(FileFormat.ROW_INDEX)) + } + } + + test("unsupported file format - read _metadata struct") { + withReadDataFrame("orc") { df => + val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME) + + // Metadata struct can be read without an error. + withMetadataStruct.collect() + + // Schema does not contain row index column, but contains all the remaining metadata columns. 
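The recursive traversal in `collectMetadataCols` above — walk every field, descend into nested structs, and keep only the known metadata column names — can be sketched as follows (illustrative Python with a simplified schema model; a stand-in for Spark's `StructType`, not the Spark API):

```python
def collect_metadata_cols(schema, known_metadata_cols):
    """Recursively gather known metadata column names from a nested schema.

    schema is modeled as a list of (name, dtype) pairs, where a nested
    struct is itself such a list of pairs.
    """
    found = []
    for name, dtype in schema:
        if isinstance(dtype, list):
            # Nested struct: descend into its fields.
            found.extend(collect_metadata_cols(dtype, known_metadata_cols))
        elif name in known_metadata_cols:
            found.append(name)
    return found
```

Given a schema like `[("id", "long"), ("_metadata", [("file_path", "string"), ("row_index", "long")])]`, the traversal surfaces `file_path` and `row_index` from inside the nested `_metadata` struct, which is how the tests below detect whether `row_index` made it into the schema.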
+ val metadataCols = collectMetadataCols(withMetadataStruct.schema) + assert(!metadataCols.contains(FileFormat.ROW_INDEX)) + assert(allMetadataCols.intersect(metadataCols).size == allMetadataCols.size - 1) + } + } + + test("unsupported file format - read _metadata.row_index") { + withReadDataFrame("orc") { df => + val ex = intercept[AnalysisException] { + df.select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}") + } + assert(ex.getMessage.contains("No such struct field row_index")) + } + } + + for (useVectorizedReader <- Seq(true, false)) { + val label = if (useVectorizedReader) "vectorized" else "parquet-mr" + test(s"parquet ($label) - use mixed case for column name") { + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) { + withReadDataFrame("parquet") { df => + val mixedCaseRowIndex = "RoW_InDeX" + assert(mixedCaseRowIndex.toLowerCase() == FileFormat.ROW_INDEX) + + assert(df.select("*", s"${FileFormat.METADATA_NAME}.$mixedCaseRowIndex") + .where(s"$EXPECTED_ROW_ID_COL != $mixedCaseRowIndex") + .count == 0) + } + } + } + } + + test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present in a table") { + // File format supporting row index generation populates the column with row indexes. + withReadDataFrame("parquet", extraSchemaFields = + Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType))) { df => + assert(df + .where(col(EXPECTED_ROW_ID_COL) === col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) + .count == NUM_ROWS) + } + + // File format not supporting row index generation populates missing column with nulls. 
+ withReadDataFrame("json", extraSchemaFields = + Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType))) { df => + assert(df + .where(col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME).isNull) + .count == NUM_ROWS) + } + } + + test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a table") { + withReadDataFrame("parquet", extraCol = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df => + // Values of FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with + // generated row indexes, rather than read from the file. + // TODO(SPARK-40059): Allow users to include columns named + // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas. + assert(df + .where(col(EXPECTED_ROW_ID_COL) === col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) + .count == NUM_ROWS) + + // Column cannot be read in combination with _metadata.row_index. + intercept[AnalysisException](df.select("*", FileFormat.METADATA_NAME).collect()) + intercept[AnalysisException](df + .select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect()) + } + } + + test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition col") { + withReadDataFrame("parquet", partitionCol = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df => + // Column values are set for each partition, rather than populated with generated row indexes. + assert(df + .where(col(EXPECTED_PARTITION_COL) === col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) + .count == NUM_ROWS) + + // Column cannot be read in combination with _metadata.row_index. 
+ intercept[AnalysisException](df.select("*", FileFormat.METADATA_NAME).collect()) + intercept[AnalysisException](df + .select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect()) + } + } + + test(s"cannot make ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} a partition column") { + withTempPath { srcPath => + spark.range(0, 10, 1, 1).toDF("id").write.parquet(srcPath.getAbsolutePath) + + withTempPath { dstPath => + intercept[AnalysisException] { + spark.read.parquet(srcPath.getAbsolutePath) + .select("*", FileFormat.METADATA_NAME) + .write + .partitionBy(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}") + .save(dstPath.getAbsolutePath) + } + } + } + } + + test(s"read user created ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column") { + withReadDataFrame("parquet", partitionCol = "pb") { df => + withTempPath { dir => + // The `df` has 10 input files with 10 rows each. Therefore the `_metadata.row_index` values + // will be { 10 x 0, 10 x 1, ..., 10 x 9 }. We store all these values in a single file. + df.select("id", s"${FileFormat.METADATA_NAME}") + .coalesce(1) + .write.parquet(dir.getAbsolutePath) + + assert(spark + .read.parquet(dir.getAbsolutePath) + .count == NUM_ROWS) + + // The _metadata.row_index is returning data from the file, not generated metadata. 
+ assert(spark + .read.parquet(dir.getAbsolutePath) + .select(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}") + .distinct.count == NUM_ROWS / 10) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 6afea42ee83..2c56adbab94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -59,6 +59,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time" + private val METADATA_ROW_INDEX = "_metadata.row_index" + + private val FILE_FORMAT = "fileFormat" + + private def getMetadataRow(f: Map[String, Any]): Row = f(FILE_FORMAT) match { + case "parquet" => + Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE), + f(METADATA_FILE_MODIFICATION_TIME), f(METADATA_ROW_INDEX)) + case _ => + Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE), + f(METADATA_FILE_MODIFICATION_TIME)) + } + + private def getMetadataForFile(f: File): Map[String, Any] = { + Map( + METADATA_FILE_PATH -> f.toURI.toString, + METADATA_FILE_NAME -> f.getName, + METADATA_FILE_SIZE -> f.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()), + METADATA_ROW_INDEX -> 0, + FILE_FORMAT -> f.getName.split("\\.").last + ) + } + /** * This test wrapper will test for both row-based and column-based file formats: * (json and parquet) with nested schema: @@ -101,21 +125,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { val realF1 = new File(dir, "data/f1").listFiles() .filter(_.getName.endsWith(s".$testFileFormat")).head - // 3. 
create f0 and f1 metadata data - val f0Metadata = Map( - METADATA_FILE_PATH -> realF0.toURI.toString, - METADATA_FILE_NAME -> realF0.getName, - METADATA_FILE_SIZE -> realF0.length(), - METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF0.lastModified()) - ) - val f1Metadata = Map( - METADATA_FILE_PATH -> realF1.toURI.toString, - METADATA_FILE_NAME -> realF1.getName, - METADATA_FILE_SIZE -> realF1.length(), - METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF1.lastModified()) - ) - - f(df, f0Metadata, f1Metadata) + f(df, getMetadataForFile(realF0), getMetadataForFile(realF1)) } } } @@ -232,10 +242,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { checkAnswer( df.select("_metadata"), Seq( - Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), - f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), - Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), - f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) + Row(getMetadataRow(f0)), + Row(getMetadataRow(f1)) ) ) } @@ -348,11 +356,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { df.select("name", "age", "_METADATA", "_metadata"), Seq( Row("jack", 24, Row(12345L, "uom"), - Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), - f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), + getMetadataRow(f0)), Row("lily", 31, Row(54321L, "ucb"), - Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), - f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) + getMetadataRow(f1)) ) ) } else { @@ -492,12 +498,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { checkAnswer( newDF.select("*"), Seq( - Row("jack", 24, Row(12345L, "uom"), - Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), - f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), - Row("lily", 31, Row(54321L, "ucb"), - Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), - f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) 
+ Row("jack", 24, Row(12345L, "uom"), getMetadataRow(f0)), + Row("lily", 31, Row(54321L, "ucb"), getMetadataRow(f1)) ) ) @@ -505,10 +507,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { checkAnswer( newDF.select("_metadata"), Seq( - Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), - f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), - Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), - f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) + Row(getMetadataRow(f0)), + Row(getMetadataRow(f1)) ) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala new file mode 100644 index 00000000000..c36ab49b5e3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.ParquetProperties._ +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} +import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2 +import org.apache.spark.sql.functions.{col, max, min} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StringType} + +class ParquetRowIndexSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + private def readRowGroupRowCounts(path: String): Seq[Long] = { + ParquetFileReader.readFooter(spark.sessionState.newHadoopConf(), new Path(path)) + .getBlocks.asScala.toSeq.map(_.getRowCount) + } + + private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = { + assert(dir.isDirectory) + dir.listFiles() + .filter { f => f.isFile && f.getName.endsWith("parquet") } + .map { f => readRowGroupRowCounts(f.getAbsolutePath) } + } + + /** + * Do the files contain exactly one row group? + */ + private def assertOneRowGroup(dir: File): Unit = { + readRowGroupRowCounts(dir).foreach { rcs => + assert(rcs.length == 1, "expected one row group per file") + } + } + + /** + * Do the files have a good layout to test row group skipping (both range metadata filter, and + * by using min/max). 
+ */ + private def assertTinyRowGroups(dir: File): Unit = { + readRowGroupRowCounts(dir).foreach { rcs => + assert(rcs.length > 1, "expected multiple row groups per file") + assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK) + assert(rcs.reverse.tail.distinct == Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK), + "expected row groups with minimal row count") + } + } + + /** + * Do the files have a good layout to test a combination of page skipping and row group skipping? + */ + private def assertIntermediateRowGroups(dir: File): Unit = { + readRowGroupRowCounts(dir).foreach { rcs => + assert(rcs.length >= 3, "expected at least 3 row groups per file") + rcs.reverse.tail.foreach { rc => + assert(rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, + "expected row groups larger than minimal row count") + } + } + } + + case class RowIndexTestConf( + numRows: Long = 10000L, + useMultipleFiles: Boolean = false, + useVectorizedReader: Boolean = true, + useSmallPages: Boolean = false, + useSmallRowGroups: Boolean = false, + useSmallSplits: Boolean = false, + useFilter: Boolean = false, + useDataSourceV2: Boolean = false) { + + val NUM_MULTIPLE_FILES = 4 + // The test doesn't work correctly if the number of records per file is uneven. + assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0)) + + def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES } else { 1 } + + def rowGroupSize: Long = if (useSmallRowGroups) { + if (useSmallPages) { + // Each file will contain multiple row groups. All of them (except for the last one) + // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so that individual + // pages within the row group can be skipped. + 2048L + } else { + // Each file will contain multiple row groups. All of them (except for the last one) + // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records. + 64L + } + } else { + // Each file will contain a single row group. 
+ DEFAULT_BLOCK_SIZE + } + + def pageSize: Long = if (useSmallPages) { + // Each page (except for the last one for each column) will contain exactly + // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records. + 64L + } else { + DEFAULT_PAGE_SIZE + } + + def writeFormat: String = "parquet" + def readFormat: String = if (useDataSourceV2) { + classOf[ParquetDataSourceV2].getCanonicalName + } else { + "parquet" + } + + assert(useSmallRowGroups || !useSmallSplits) + def filesMaxPartitionBytes: Long = if (useSmallSplits) { + 256L + } else { + SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get + } + + def desc: String = { + { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr reader") } ++ + { if (useMultipleFiles) Seq("many files") else Seq.empty[String] } ++ + { if (useFilter) Seq("filtered") else Seq.empty[String] } ++ + { if (useSmallPages) Seq("small pages") else Seq.empty[String] } ++ + { if (useSmallRowGroups) Seq("small row groups") else Seq.empty[String] } ++ + { if (useSmallSplits) Seq("small splits") else Seq.empty[String] } ++ + { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] } + }.mkString(", ") + + def sqlConfs: Seq[(String, String)] = Seq( + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString, + SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString + ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else Seq.empty } + } + + for (useVectorizedReader <- Seq(true, false)) + for (useDataSourceV2 <- Seq(true, false)) + for (useSmallRowGroups <- Seq(true, false)) + for (useSmallPages <- Seq(true, false)) + for (useFilter <- Seq(true, false)) + for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) { + val conf = RowIndexTestConf(useVectorizedReader = useVectorizedReader, + useDataSourceV2 = useDataSourceV2, useSmallRowGroups = useSmallRowGroups, + useSmallPages = useSmallPages, useFilter = useFilter, + useSmallSplits = useSmallSplits) + 
testRowIndexGeneration("row index generation", conf) + } + + private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = { + test (s"$label - ${conf.desc}") { + withSQLConf(conf.sqlConfs: _*) { + withTempPath { path => + val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + val numRecordsPerFile = conf.numRows / conf.numFiles + val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight, skipCentileLast) = + (0.2, 0.4, 0.6, 0.8) + val expectedRowIdxCol = "expected_rowIdx_col" + val df = spark.range(0, conf.numRows, 1, conf.numFiles).toDF("id") + .withColumn("dummy_col", ($"id" / 55).cast("int")) + .withColumn(expectedRowIdxCol, ($"id" % numRecordsPerFile).cast("int")) + + // With row index in schema. + val schemaWithRowIdx = df.schema.add(rowIndexColName, LongType, nullable = true) + + df.write + .format(conf.writeFormat) + .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize) + .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize) + .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize) + .save(path.getAbsolutePath) + val dfRead = spark.read + .format(conf.readFormat) + .schema(schemaWithRowIdx) + .load(path.getAbsolutePath) + + // Verify that the produced files are laid out as expected. 
+ if (conf.useSmallRowGroups) { + if (conf.useSmallPages) { + assertIntermediateRowGroups(path) + } else { + assertTinyRowGroups(path) + } + } else { + assertOneRowGroup(path) + } + + val dfToAssert = if (conf.useFilter) { + // Add a filter such that we skip 60% of the records: + // [0%, 20%], [40%, 60%], [80%, 100%] + dfRead.filter(( + $"id" >= (skipCentileFirst * conf.numRows).toInt && + $"id" < (skipCentileMidLeft * conf.numRows).toInt) || ( + $"id" >= (skipCentileMidRight * conf.numRows).toInt && + $"id" < (skipCentileLast * conf.numRows).toInt)) + } else { + dfRead + } + + var numPartitions: Long = 0 + var numOutputRows: Long = 0 + dfToAssert.collect() + dfToAssert.queryExecution.executedPlan.foreach { + case b: BatchScanExec => + numPartitions += b.inputRDD.partitions.length + numOutputRows += b.metrics("numOutputRows").value + case f: FileSourceScanExec => + numPartitions += f.inputRDD.partitions.length + numOutputRows += f.metrics("numOutputRows").value + case _ => + } + assert(numPartitions > 0) + assert(numOutputRows > 0) + + if (conf.useSmallSplits) { + // SPARK-39634: Until the fix for PARQUET-2161 is available, + // it is not possible to split Parquet files into multiple partitions while generating + // row indexes. + // assert(numPartitions >= 2 * conf.numFiles) + } + + // Assert that every rowIdx value matches the value in `expectedRowIdxCol`. + assert(dfToAssert.filter(s"$rowIndexColName != $expectedRowIdxCol") + .count() == 0) + + if (conf.useFilter) { + if (conf.useSmallRowGroups) { + assert(numOutputRows < conf.numRows) + } + + val minMaxRowIndexes = dfToAssert.select( + max(col(rowIndexColName)), + min(col(rowIndexColName))).collect() + val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles == 1) { + // When there is a single file, we still have row group skipping, + // but that should not affect the produced rowIdx.
+ (conf.numRows * skipCentileLast - 1, conf.numRows * skipCentileFirst) + } else { + // For simplicity, the chosen filter skips whole files. + // Thus all unskipped files will have the same max and min rowIdx values. + (numRecordsPerFile - 1, 0) + } + assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx) + assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx) + if (!conf.useMultipleFiles) { + val skippedValues = List.range(0, (skipCentileFirst * conf.numRows).toInt) ++ + List.range((skipCentileMidLeft * conf.numRows).toInt, + (skipCentileMidRight * conf.numRows).toInt) ++ + List.range((skipCentileLast * conf.numRows).toInt, conf.numRows) + // rowIdx column should not have any of the `skippedValues`. + assert(dfToAssert + .filter(col(rowIndexColName).isin(skippedValues: _*)).count() == 0) + } + } else { + assert(numOutputRows == conf.numRows) + // When there is no filter, the rowIdx values should be in range + // [0-`numRecordsPerFile`]. + val expectedRowIdxValues = List.range(0, numRecordsPerFile) + assert(dfToAssert.filter(col(rowIndexColName).isin(expectedRowIdxValues: _*)) + .count() == conf.numRows) + } + } + } + } + + for (useDataSourceV2 <- Seq(true, false)) { + val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2) + + test(s"invalid row index column type - ${conf.desc}") { + withSQLConf(conf.sqlConfs: _*) { + withTempPath { path => + val df = spark.range(0, 10, 1, 1).toDF("id") + val schemaWithRowIdx = df.schema + .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, StringType) + + df.write + .format(conf.writeFormat) + .save(path.getAbsolutePath) + + val dfRead = spark.read + .format(conf.readFormat) + .schema(schemaWithRowIdx) + .load(path.getAbsolutePath) + + val exception = intercept[Exception](dfRead.collect()) + assert(exception.getMessage.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) + } + } + } + } +}
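For the single-file filtered case in `ParquetRowIndexSuite` above, the expected bounds on the row index follow directly from the kept centile ranges; a quick sanity check of that arithmetic (illustrative Python, not part of the patch):

```python
def expected_min_max_row_index(num_rows):
    """Expected min/max row index for the single-file filtered case.

    The test filter keeps ids in [20%, 40%) and [60%, 80%) of num_rows,
    so the smallest surviving row index is 0.2 * num_rows and the
    largest is 0.8 * num_rows - 1, matching the test's
    (skipCentileLast, skipCentileFirst) expectation.
    """
    skip_centile_first, skip_centile_last = 0.2, 0.8
    return (int(skip_centile_first * num_rows),
            int(skip_centile_last * num_rows) - 1)
```

With the default 10000 rows this gives (2000, 7999): row group and page skipping may drop whole ranges of rows, but the indexes of the surviving rows are unchanged, which is exactly the invariant the suite asserts.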