This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 50b652e241f  [SPARK-43226] Define extractors for file-constant metadata
50b652e241f is described below

commit 50b652e241f7e31b99303359ec53e26a8989a4f0
Author: Ryan Johnson <ryan.john...@databricks.com>
AuthorDate: Tue Apr 25 16:11:21 2023 +0800

    [SPARK-43226] Define extractors for file-constant metadata

    ### What changes were proposed in this pull request?
    File-source constant metadata columns are often derived indirectly from file-level metadata
    values rather than exposing those values directly. Add support for metadata extractors, so
    that we can express such columns in a generic way.

    ### Why are the changes needed?
    Allows the existing (previously hard-wired) file-source metadata columns to be expressed in a
    generic way, and also allows expensive metadata values to be derived lazily, only if the query
    actually selects them.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    New unit test. Plus, existing file-source metadata unit tests pass.

    Closes #40885 from ryan-johnson-databricks/file-constant-metadata-extractors.

    Authored-by: Ryan Johnson <ryan.john...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  33 +++---
 .../sql/execution/datasources/FileFormat.scala     | 124 ++++++++++++++-------
 .../sql/execution/datasources/FileScanRDD.scala    |  70 +++---------
 .../datasources/PartitioningAwareFileIndex.scala   |  10 +-
 .../FileSourceCustomMetadataStructSuite.scala      |  35 +++++-
 5 files changed, 156 insertions(+), 116 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 0400a2b6abc..0d5091f4a97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -548,10 +548,9 @@ case class FileSourceScanExec( hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) val readRDD = if (bucketedScan) { - createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions, - relation) + createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions) } else { - createReadRDD(readFile, dynamicallySelectedPartitions, relation) + createReadRDD(readFile, dynamicallySelectedPartitions) } sendDriverMetrics() readRDD @@ -617,13 +616,11 @@ case class FileSourceScanExec( * @param bucketSpec the bucketing spec. * @param readFile a function to read each (part of a) file. * @param selectedPartitions Hive-style partition that are part of the read. - * @param fsRelation [[HadoopFsRelation]] associated with the read.
*/ private def createBucketedReadRDD( bucketSpec: BucketSpec, readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { + selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => @@ -660,9 +657,10 @@ case class FileSourceScanExec( } } - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, - new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), - fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options))) + new FileScanRDD(relation.sparkSession, readFile, filePartitions, + new StructType(requiredSchema.fields ++ relation.partitionSchema.fields), + fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors, + new FileSourceOptions(CaseInsensitiveMap(relation.options))) } /** @@ -671,20 +669,18 @@ case class FileSourceScanExec( * * @param readFile a function to read each (part of a) file. * @param selectedPartitions Hive-style partition that are part of the read. - * @param fsRelation [[HadoopFsRelation]] associated with the read. */ private def createReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes + selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = { + val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes val maxSplitBytes = - FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) + FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions) logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") // Filter files with bucket pruning if possible - val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled + val bucketingEnabled = relation.sparkSession.sessionState.conf.bucketingEnabled val shouldProcess: Path => Boolean = optionalBucketSet match { case Some(bucketSet) if bucketingEnabled => // Do not prune the file if bucket file name is invalid @@ -722,9 +718,10 @@ case class FileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - new FileScanRDD(fsRelation.sparkSession, readFile, partitions, - new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), - fileConstantMetadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options))) + new FileScanRDD(relation.sparkSession, readFile, partitions, + new StructType(requiredSchema.fields ++ relation.partitionSchema.fields), + fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors, + new FileSourceOptions(CaseInsensitiveMap(relation.options))) } // Filters unused DynamicPruningExpression expressions - one which has been replaced diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 72e3e1e2406..8ec1d3bb8c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -22,6 +22,7 @@ 
import org.apache.hadoop.fs._ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.mapreduce.Job +import org.apache.spark.paths.SparkPath import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -30,7 +31,6 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String /** @@ -190,19 +190,38 @@ trait FileFormat { /** * All fields the file format's _metadata struct defines. * - * Each field's metadata should define [[METADATA_COL_ATTR_KEY]], - * [[FILE_SOURCE_METADATA_COL_ATTR_KEY]], and either - * [[FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY]] or - * [[FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY]] as appropriate. + * Each metadata struct field is either "constant" or "generated" (respectively defined/matched by + * [[FileSourceConstantMetadataStructField]] or [[FileSourceGeneratedMetadataAttribute]]). * - * Constant attributes will be extracted automatically from - * [[PartitionedFile.extraConstantMetadataColumnValues]], while generated metadata columns always - * map to some hidden/internal column the underslying reader provides. + * Constant metadata columns are derived from the [[PartitionedFile]] instances a scan's + * [[FileIndex]] provides. Thus, a custom [[FileFormat]] that defines constant metadata columns + * will generally pair with a custom [[FileIndex]] that populates [[PartitionedFile]] with + * appropriate metadata values. By default, constant attribute values are obtained by a simple + * name-based lookup in [[PartitionedFile.extraConstantMetadataColumnValues]], but implementations + * can override [[fileConstantMetadataExtractors]] to define custom extractors that have access to + * the entire [[PartitionedFile]] when deriving the column's value. * - * NOTE: It is not possible to change the semantics of the base metadata fields by overriding this - * method. Technically, a file format could choose suppress them, but that is not recommended. + * Generated metadata columns map to a hidden/internal column the underlying reader provides, and + * so will often pair with a custom reader that can populate those columns. For example, + * [[ParquetFileFormat]] defines a "_metadata.row_index" column that relies on + * [[VectorizedParquetRecordReader]] to extract the actual row index values from the parquet scan. */ def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS + + /** + * The extractors to use when deriving file-constant metadata columns for this file format. + * + * Implementations that define custom constant metadata columns can override this method to + * associate a custom extractor with a given metadata column name, when a simple name-based lookup + * in [[PartitionedFile.extraConstantMetadataColumnValues]] is not expressive enough; extractors + * have access to the entire [[PartitionedFile]] and can perform arbitrary computations. + * + * NOTE: Extractors are lazy, invoked only if the query actually selects their column at runtime. + * + * See also [[FileFormat.getFileConstantMetadataColumnValue]].
+ */ + def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] = + FileFormat.BASE_METADATA_EXTRACTORS } object FileFormat { @@ -241,47 +260,74 @@ object FileFormat { FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false), FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false)) + /** + * All [[BASE_METADATA_FIELDS]] require custom extractors because they are derived directly from + * fields of the [[PartitionedFile]], and do not have entries in the file's metadata map. + */ + val BASE_METADATA_EXTRACTORS: Map[String, PartitionedFile => Any] = Map( + FILE_PATH -> { pf: PartitionedFile => pf.toPath.toString }, + FILE_NAME -> { pf: PartitionedFile => pf.toPath.getName }, + FILE_SIZE -> { pf: PartitionedFile => pf.fileSize }, + FILE_BLOCK_START -> { pf: PartitionedFile => pf.start }, + FILE_BLOCK_LENGTH -> { pf: PartitionedFile => pf.length }, + // The modificationTime from the file has millisecond granularity, but the TimestampType for + // `file_modification_time` has microsecond granularity. + FILE_MODIFICATION_TIME -> { pf: PartitionedFile => pf.modificationTime * 1000 } + ) + + /** + * Extracts the [[Literal]] value of a file-constant metadata column from a [[PartitionedFile]]. + * + * If an extractor is available, apply it. Otherwise, look up the column's name in the file's + * column value map and return the result (or null, if not found). + * + * Raw values (including null) are automatically converted to literals as a courtesy. + */ + def getFileConstantMetadataColumnValue( + name: String, + file: PartitionedFile, + metadataExtractors: Map[String, PartitionedFile => Any]): Literal = { + val extractor = metadataExtractors.get(name).getOrElse { + pf: PartitionedFile => pf.otherConstantMetadataColumnValues.get(name).orNull + } + Literal(extractor.apply(file)) + } + // create an internal row given required metadata fields and file information def createMetadataInternalRow( + partitionValues: InternalRow, fieldNames: Seq[String], - filePath: Path, + filePath: SparkPath, fileSize: Long, fileModificationTime: Long): InternalRow = { - // We are not aware of `FILE_BLOCK_START` and `FILE_BLOCK_LENGTH` before splitting files - assert(!fieldNames.contains(FILE_BLOCK_START) && !fieldNames.contains(FILE_BLOCK_LENGTH)) - updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, - filePath, fileSize, 0L, fileSize, fileModificationTime, Map.empty) + // When scanning files directly from the filesystem, we only support file-constant metadata + // fields whose values can be derived from a file status. In particular, we don't have accurate + // file split information yet, nor do we have a way to provide custom metadata column values.
+ val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE, FILE_MODIFICATION_TIME) + val extractors = FileFormat.BASE_METADATA_EXTRACTORS.filterKeys(validFieldNames.contains).toMap + assert(fieldNames.forall(validFieldNames.contains)) + val pf = PartitionedFile( + partitionValues = partitionValues, + filePath = filePath, + start = 0L, + length = fileSize, + locations = Array.empty, + modificationTime = fileModificationTime, + fileSize = fileSize, + otherConstantMetadataColumnValues = Map.empty) + updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors) } // update an internal row given required metadata fields and file information def updateMetadataInternalRow( row: InternalRow, fieldNames: Seq[String], - filePath: Path, - fileSize: Long, - fileBlockStart: Long, - fileBlockLength: Long, - fileModificationTime: Long, - otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = { + file: PartitionedFile, + metadataExtractors: Map[String, PartitionedFile => Any]): InternalRow = { fieldNames.zipWithIndex.foreach { case (name, i) => - name match { - // NOTE: The base metadata fields are hard-wired here and cannot be overridden. - case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString)) - case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName)) - case FILE_SIZE => row.update(i, fileSize) - case FILE_BLOCK_START => row.update(i, fileBlockStart) - case FILE_BLOCK_LENGTH => row.update(i, fileBlockLength) - case FILE_MODIFICATION_TIME => - // the modificationTime from the file is in millisecond, - // while internally, the TimestampType `file_modification_time` is stored in microsecond - row.update(i, fileModificationTime * 1000L) - case other => - // Other metadata columns use the file-provided value (if any). Automatically convert raw - // values (including nulls) to literals as a courtesy. 
- Literal(otherConstantMetadataColumnValues.get(other).orNull) match { - case Literal(null, _) => row.setNullAt(i) - case literal => row.update(i, literal.value) - } + getFileConstantMetadataColumnValue(name, file, metadataExtractors) match { + case Literal(null, _) => row.setNullAt(i) + case literal => row.update(i, literal.value) } } row diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 706228e656b..0cca51cf4e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -31,13 +31,13 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.FileFormat._ import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator /** @@ -81,6 +81,7 @@ class FileScanRDD( @transient val filePartitions: Seq[FilePartition], val readSchema: StructType, val metadataColumns: Seq[AttributeReference] = Seq.empty, + metadataExtractors: Map[String, PartitionedFile => Any] = Map.empty, options: FileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(Map.empty))) extends RDD[InternalRow](sparkSession.sparkContext, Nil) { @@ -149,65 +150,28 @@ class FileScanRDD( */ private def updateMetadataRow(): Unit = if (metadataColumns.nonEmpty && currentFile != null) { - updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name), - currentFile.toPath, currentFile.fileSize, currentFile.start, currentFile.length, - currentFile.modificationTime, currentFile.otherConstantMetadataColumnValues) + updateMetadataInternalRow( + metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors) } /** * Create an array of constant column vectors containing all required metadata columns */ private def createMetadataColumnVector(c: ColumnarBatch): Array[ColumnVector] = { - val path = currentFile.toPath - lazy val tmpRow = new GenericInternalRow(1) // for populating custom metadata fields - metadataColumns.map(a => (a.name, a.dataType)).map { - case (FILE_PATH, dataType) => - require(dataType == StringType) - val columnVector = new ConstantColumnVector(c.numRows(), StringType) - columnVector.setUtf8String(UTF8String.fromString(path.toString)) - columnVector - case (FILE_NAME, dataType) => - require(dataType == StringType) - val columnVector = new ConstantColumnVector(c.numRows(), StringType) - columnVector.setUtf8String(UTF8String.fromString(path.getName)) - columnVector - case (FILE_SIZE, dataType) => - require(dataType == LongType) - val columnVector = new ConstantColumnVector(c.numRows(), LongType) - columnVector.setLong(currentFile.fileSize) - columnVector - case (FILE_BLOCK_START, dataType) => - require(dataType == LongType) - val columnVector = new 
ConstantColumnVector(c.numRows(), LongType) - columnVector.setLong(currentFile.start) - columnVector - case (FILE_BLOCK_LENGTH, dataType) => - require(dataType == LongType) - val columnVector = new ConstantColumnVector(c.numRows(), LongType) - columnVector.setLong(currentFile.length) - columnVector - case (FILE_MODIFICATION_TIME, dataType) => - require(dataType == TimestampType) - val columnVector = new ConstantColumnVector(c.numRows(), LongType) - // the modificationTime from the file is in millisecond, - // while internally, the TimestampType is stored in microsecond - columnVector.setLong(currentFile.modificationTime * 1000L) - columnVector - case (other, dataType: DataType) => - // Other metadata columns use the file-provided value, if one exists. Automatically - // convert raw values (including nulls) to literals as a courtesy, then populate the - // column by passing the resulting value through the `tmpRow` we allocated above. - Literal(currentFile.otherConstantMetadataColumnValues.get(other).orNull) match { - case Literal(null, _) => - tmpRow.setNullAt(0) - case literal => - require(dataType == literal.dataType) - tmpRow.update(0, literal.value) - } + val tmpRow = new GenericInternalRow(1) + metadataColumns.map { attr => + // Populate each metadata column by passing the resulting value through `tmpRow`. + getFileConstantMetadataColumnValue(attr.name, currentFile, metadataExtractors) match { + case Literal(null, _) => + tmpRow.setNullAt(0) + case literal => + require(PhysicalDataType(attr.dataType) == PhysicalDataType(literal.dataType)) + tmpRow.update(0, literal.value) + } - val columnVector = new ConstantColumnVector(c.numRows(), dataType) - ColumnVectorUtils.populate(columnVector, tmpRow, 0) - columnVector + val columnVector = new ConstantColumnVector(c.numRows(), attr.dataType) + ColumnVectorUtils.populate(columnVector, tmpRow, 0) + columnVector }.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 2aa5d30946c..b25162aad9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -102,19 +102,19 @@ abstract class PartitioningAwareFileIndex( }) } - def matchFileMetadataPredicate(f: FileStatus): Boolean = { + def matchFileMetadataPredicate(partitionValues: InternalRow, f: FileStatus): Boolean = { // use option.forall, so if there is no filter no metadata struct, return true boundedFilterMetadataStructOpt.forall { boundedFilter => val row = - createMetadataInternalRow(requiredMetadataColumnNames.toSeq, - f.getPath, f.getLen, f.getModificationTime) + createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, + SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime) boundedFilter.eval(row) } } val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { PartitionDirectory(InternalRow.empty, allFiles().toArray - .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil + .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(InternalRow.empty, f))) :: Nil } else { if (recursiveFileLookup) { throw new IllegalArgumentException( @@ -126,7 +126,7 @@ abstract class PartitioningAwareFileIndex( case Some(existingDir) => // Directory has children files in it, return them existingDir.filter(f => 
matchPathPattern(f) && isNonEmptyFile(f) && - matchFileMetadataPredicate(f)) + matchFileMetadataPredicate(values, f)) case None => // Directory does not exist, or has no children files diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala index 210535ba62b..05872d41131 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala @@ -134,6 +134,39 @@ class FileSourceCustomMetadataStructSuite extends QueryTest with SharedSparkSess } } + test("[SPARK-43226] extra constant metadata fields with extractors") { + withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) => + val format = new TestFileFormat(extraConstantMetadataFields) { + val extractPartitionNumber = { pf: PartitionedFile => + pf.toPath.toString.split("/").collectFirst { + case "f0" => 9990 + case "f1" => 9991 + }.get + } + val extractPartitionName = { pf: PartitionedFile => + pf.toPath.toString.split("/").collectFirst { + case "f0" => "f0f" + case "f1" => "f1f" + }.get + } + override def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] = { + super.fileConstantMetadataExtractors ++ Map( + "foo" -> extractPartitionNumber, "bar" -> extractPartitionName) + } + } + val files = Seq(FileStatusWithMetadata(f0), FileStatusWithMetadata(f1)) + val df = createDF(format, files) + + checkAnswer( + df.select("fileNum", "x", "_metadata.row_index", "_metadata.foo", "_metadata.bar"), + Seq( + Row(0, 101L, 0L, 9990, "f0f"), + Row(0, 102L, 1L, 9990, "f0f"), + Row(1, 111L, 0L, 9991, "f1f"), + Row(1, 112L, 1L, 9991, "f1f"))) + } + } + test("filters and projections on extra constant metadata fields") { withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) => val format = new TestFileFormat(extraConstantMetadataFields) @@ -302,7 +335,7 @@ class FileSourceCustomMetadataStructSuite extends QueryTest with SharedSparkSess } } - test("cannot override base metadata fields") { + test("generated columns and extractors take precedence over metadata map values") { withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) => import FileFormat.{FILE_NAME, FILE_SIZE} import ParquetFileFormat.ROW_INDEX --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
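For readers wondering how the new hook is meant to be used outside the test suite above, here is a minimal sketch of a file format that registers its own constant-metadata extractor. It is not part of the commit: the `ParquetFileFormat` subclass, the `ingest_batch_id` column, the `batch=<n>` path layout, and the exact catalyst import path for `FileSourceConstantMetadataStructField` are illustrative assumptions; only `metadataSchemaFields`, `fileConstantMetadataExtractors`, and `PartitionedFile.toPath` come from the patch itself.

```scala
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructField}

// Hypothetical format: exposes a constant _metadata.ingest_batch_id column whose value is
// derived lazily from the file path (e.g. ".../batch=42/part-00000.parquet") rather than
// looked up in PartitionedFile.otherConstantMetadataColumnValues.
class BatchIdParquetFileFormat extends ParquetFileFormat {

  // Declare the extra constant column alongside the base and parquet metadata fields.
  override def metadataSchemaFields: Seq[StructField] =
    super.metadataSchemaFields :+
      FileSourceConstantMetadataStructField("ingest_batch_id", LongType, nullable = true)

  // The extractor receives the whole PartitionedFile and is invoked only if the query
  // actually selects _metadata.ingest_batch_id.
  override def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] =
    super.fileConstantMetadataExtractors ++ Map(
      "ingest_batch_id" -> { pf: PartitionedFile =>
        "batch=(\\d+)".r
          .findFirstMatchIn(pf.toPath.toString)
          .map(m => java.lang.Long.valueOf(m.group(1)))
          .orNull // a null result becomes Literal(null) and the row slot is set to null
      })
}
```

A query that selects the column (for example `df.select("_metadata.ingest_batch_id")`) would trigger the extractor at scan time for each file via `getFileConstantMetadataColumnValue`, while queries that never touch `_metadata` skip it entirely, which is the laziness called out in the commit message.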