[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-21 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r773672644



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##
@@ -171,6 +171,29 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }
 
+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"

Review comment:
   Good point. I think we should, as `InputFileName` is really fragile and can't be used in a join, for example.
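
   For illustration, a minimal sketch of the difference (assuming the `_metadata` struct added by this PR; the paths and the `path` column of `badFiles` are hypothetical): `_metadata.file_path` is materialized as an ordinary column, so it still resolves after shuffles and joins, unlike `input_file_name()`.

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.col

   val spark = SparkSession.builder().master("local[*]").getOrCreate()

   // expose the hidden metadata as a regular column before joining
   val events = spark.read.parquet("/data/events")
     .select(col("*"), col("_metadata.file_path").as("source_file"))
   val badFiles = spark.read.parquet("/data/quarantine") // one row per bad file, with a `path` column

   // the materialized file path keeps working across the join
   events.join(badFiles, events("source_file") === badFiles("path")).show()
   ```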







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-21 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r773672049



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +115,101 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // a metadata internal row, will only be updated when the current file is changed
+  val metadataRow: InternalRow = new GenericInternalRow(metadataColumns.length)
+
+  // an unsafe projection to convert a joined internal row to an unsafe row
+  private lazy val projection = {
+    val joinedExpressions =
+      readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+    UnsafeProjection.create(joinedExpressions)
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata row when `currentFile` is changed.
+   */
+  private def updateMetadataRow(): Unit = {
+    if (metadataColumns.nonEmpty && currentFile != null) {
+      val path = new Path(currentFile.filePath)
+      metadataColumns.zipWithIndex.foreach { case (attr, i) =>
+        attr.name match {
+          case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
+          case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
+          case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
+          case FILE_MODIFICATION_TIME =>
+            // the modificationTime from the file is in millisecond,
+            // while internally, the TimestampType is stored in microsecond
+            metadataRow.update(i, currentFile.modificationTime * 1000L)
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a writable column vector containing all required metadata columns
+   */
+  private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+    val path = new Path(currentFile.filePath)
+    val filePathBytes = path.toString.getBytes
+    val fileNameBytes = path.getName.getBytes
+    var rowId = 0
+    metadataColumns.map(_.name).map {

Review comment:
   per-batch should be fine to have some small overhead.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-21 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r773030036



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -45,7 +45,7 @@ import org.apache.spark.util.NextIterator
  * @param filePath URI of the file to read
  * @param start the beginning offset (in bytes) of the block.
  * @param length number of bytes to read.
- * @param modificationTime The modification time of the input file, in milliseconds.
+ * @param modificationTime The modification time of the input file, in microseconds.

Review comment:
   nit: I think we can still put `milliseconds` here, as it matches 
`file.getModificationTime`. We can ` * 1000` in FileScanRDD when we set the 
value to the internal row.
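
   A small sketch of the conversion being suggested (values are illustrative): keep the millisecond value from `file.getModificationTime` on `PartitionedFile`, and multiply by 1000 only when writing the Catalyst value, since `TimestampType` is stored internally as microseconds since the epoch.

   ```scala
   // modification time as reported by the file system, in milliseconds
   val modificationTimeMs: Long = 1640000000000L
   // value to store in the InternalRow for a TimestampType column (microseconds)
   val timestampMicros: Long = modificationTimeMs * 1000L
   ```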







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-21 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772971151



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##
@@ -171,6 +171,29 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }
 
+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"
+
+  val FILE_SIZE = "file_size"
+
+  val FILE_MODIFICATION_TIME = "file_modification_time"
+
+  val METADATA_NAME = "_metadata"
+
+  // supported metadata struct fields for hadoop fs relation
+  val METADATA_STRUCT: StructType = new StructType()
+    .add(StructField(FILE_PATH, StringType))
+    .add(StructField(FILE_NAME, StringType))
+    .add(StructField(FILE_SIZE, LongType))
+    .add(StructField(FILE_MODIFICATION_TIME, LongType))

Review comment:
   I think this one is an easy decision. Timestamp type is much better as people can do `WHERE _metadata.modificationTime < TIMESTAMP'2020-12-12 12:12:12'` or other datetime operations. And `df.show` can also display the value in a more user-readable format.
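
   A minimal sketch of that usability argument (the path is hypothetical; assumes `file_modification_time` is exposed as `TimestampType`):

   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   val df = spark.read.parquet("/data/events")

   // datetime comparison works directly, and show() prints a readable timestamp
   df.select(df("_metadata.file_name"), df("_metadata.file_modification_time"))
     .where(df("_metadata.file_modification_time") < java.sql.Timestamp.valueOf("2020-12-12 12:12:12"))
     .show()
   ```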







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-21 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772943950



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##
@@ -171,6 +171,29 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }
 
+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"
+
+  val FILE_SIZE = "file_size"
+
+  val FILE_MODIFICATION_TIME = "file_modification_time"
+
+  val METADATA_NAME = "_metadata"
+
+  // supported metadata struct fields for hadoop fs relation
+  val METADATA_STRUCT: StructType = new StructType()
+    .add(StructField(FILE_PATH, StringType))
+    .add(StructField(FILE_NAME, StringType))
+    .add(StructField(FILE_SIZE, LongType))
+    .add(StructField(FILE_MODIFICATION_TIME, LongType))

Review comment:
   should this be TimestampType?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772888602



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,108 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // metadata columns unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsUnsafeRow: UnsafeRow = _
+  // metadata columns internal row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsInternalRow: InternalRow = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val metadataUnsafeRowJoiner =

Review comment:
   nit: `private lazy val`







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772195408



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -57,11 +66,15 @@ case class PartitionedFile(
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
+    @transient val filePartitions: Seq[FilePartition],
+    val requiredSchema: StructType = StructType(Seq.empty),

Review comment:
   `readDataSchema` sounds good. Can we always pass this parameter?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772881079



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,108 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // metadata columns unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsUnsafeRow: UnsafeRow = _
+  // metadata columns internal row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsInternalRow: InternalRow = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val metadataUnsafeRowJoiner =
+    GenerateUnsafeRowJoiner.create(requiredSchema, metadataColumns.toStructType)
+  // metadata columns unsafe row converter
+  lazy private val unsafeRowConverter = {
+    val metadataColumnsDataTypes = metadataColumns.map(_.dataType).toArray
+    val converter = UnsafeProjection.create(metadataColumnsDataTypes)
+    (row: InternalRow) => converter(row)
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataColumns(): Unit = {
+    if (metadataColumns.nonEmpty) {
+      if (currentFile == null) {
+        metadataColumnsUnsafeRow = null
+        metadataColumnsInternalRow = null
+      } else {
+        // construct an internal row
+        val path = new Path(currentFile.filePath)
+        metadataColumnsInternalRow = InternalRow.fromSeq(
+          metadataColumns.map(_.name).map {
+            case FILE_PATH => UTF8String.fromString(path.toString)
+            case FILE_NAME => UTF8String.fromString(path.getName)
+            case FILE_SIZE => currentFile.fileSize
+            case FILE_MODIFICATION_TIME => currentFile.modificationTime
+          }
+        )
+        // convert the internal row to an unsafe row
+        metadataColumnsUnsafeRow = unsafeRowConverter(metadataColumnsInternalRow)
+      }
+    }
+  }
+
+  /**
+   * Create a writable column vector containing all required metadata columns
+   */
+  private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+    val path = new Path(currentFile.filePath)
+    val filePathBytes = path.toString.getBytes
+    val fileNameBytes = path.getName.getBytes
+    var rowId = 0
+    metadataColumns.map(_.name).map {
+      case FILE_PATH =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, filePathBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_NAME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, fileNameBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_SIZE =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+        columnVector
+      case FILE_MODIFICATION_TIME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.modificationTime)
+        columnVector
+    }.toArray
+  }
+
+  /**
+   * Add metadata columns at the end of nextElement if needed.
+   * For different row implementations, use different methods to update and append.
+   */
+  private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
+    if (metadataColumns.nonEmpty) {
+      nextElement match {
+        case c: ColumnarBatch =>
+          new ColumnarBatch(
+            Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),

Review comment:
   ah sorry, I was confused by java code vs scala code. nvm, the code here 
is correct.





[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772862245



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,108 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // metadata columns unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsUnsafeRow: UnsafeRow = _
+  // metadata columns internal row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsInternalRow: InternalRow = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val metadataUnsafeRowJoiner =
+    GenerateUnsafeRowJoiner.create(requiredSchema, metadataColumns.toStructType)
+  // metadata columns unsafe row converter
+  lazy private val unsafeRowConverter = {
+    val metadataColumnsDataTypes = metadataColumns.map(_.dataType).toArray
+    val converter = UnsafeProjection.create(metadataColumnsDataTypes)
+    (row: InternalRow) => converter(row)
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataColumns(): Unit = {
+    if (metadataColumns.nonEmpty) {
+      if (currentFile == null) {
+        metadataColumnsUnsafeRow = null
+        metadataColumnsInternalRow = null
+      } else {
+        // construct an internal row
+        val path = new Path(currentFile.filePath)
+        metadataColumnsInternalRow = InternalRow.fromSeq(
+          metadataColumns.map(_.name).map {
+            case FILE_PATH => UTF8String.fromString(path.toString)
+            case FILE_NAME => UTF8String.fromString(path.getName)
+            case FILE_SIZE => currentFile.fileSize
+            case FILE_MODIFICATION_TIME => currentFile.modificationTime
+          }
+        )
+        // convert the internal row to an unsafe row
+        metadataColumnsUnsafeRow = unsafeRowConverter(metadataColumnsInternalRow)
+      }
+    }
+  }
+
+  /**
+   * Create a writable column vector containing all required metadata columns
+   */
+  private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+    val path = new Path(currentFile.filePath)
+    val filePathBytes = path.toString.getBytes
+    val fileNameBytes = path.getName.getBytes
+    var rowId = 0
+    metadataColumns.map(_.name).map {
+      case FILE_PATH =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, filePathBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_NAME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, fileNameBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_SIZE =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+        columnVector
+      case FILE_MODIFICATION_TIME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.modificationTime)
+        columnVector
+    }.toArray
+  }
+
+  /**
+   * Add metadata columns at the end of nextElement if needed.
+   * For different row implementations, use different methods to update and append.
+   */
+  private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
+    if (metadataColumns.nonEmpty) {
+      nextElement match {
+        case c: ColumnarBatch =>
+          new ColumnarBatch(
+            Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),

Review comment:
   hmm, is `Array(c.column(0), c.column(1), ... c.column(numCols - 1))` the 
same as `c.column`?
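
   For context, a minimal sketch of the distinction: `ColumnarBatch` is Java code whose `column(ordinal: Int)` is a method, not an array field, so from Scala there is no `c.column` value to concatenate directly; `Array.tabulate` builds that array by calling the accessor once per ordinal.

   ```scala
   import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

   // equivalent to Array(c.column(0), c.column(1), ..., c.column(c.numCols() - 1))
   def existingColumns(c: ColumnarBatch): Array[ColumnVector] =
     Array.tabulate(c.numCols())(c.column)
   ```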





[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772861339



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
##
@@ -164,7 +164,12 @@ object SchemaPruning extends Rule[LogicalPlan] {
       outputRelation: LogicalRelation,
       prunedBaseRelation: HadoopFsRelation) = {
     val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema)
-    outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput)
+    // also add the metadata output if any
+    // TODO: should be able to prune the metadata schema

Review comment:
   makes sense. Let's update this rule or add a new rule 
`MetadataSchemaPruning` later.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772861339



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
##
@@ -164,7 +164,12 @@ object SchemaPruning extends Rule[LogicalPlan] {
       outputRelation: LogicalRelation,
       prunedBaseRelation: HadoopFsRelation) = {
     val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema)
-    outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput)
+    // also add the metadata output if any
+    // TODO: should be able to prune the metadata schema

Review comment:
   makes sense. Let's add a new rule `MetadataSchemaPruning` later.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772214752



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
+
+  val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom")))
+
+  val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb")))
+
+  val schema: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("info", new StructType()
+      .add(StructField("id", LongType))
+      .add(StructField("university", StringType))))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("_METADATA", new StructType()
+      .add(StructField("id", LongType))
+      .add(StructField("university", StringType))))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+
+  private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+  /**
+   * This test wrapper will test for both row-based and column-based file formats:
+   * (json and parquet) with nested schema:
+   * 1. create df0 and df1 and save them as testFileFormat under /data/f0 and /data/f1
+   * 2. read the path /data, return the df for further testing
+   * 3. create actual metadata maps for both files under /data/f0 and /data/f1 for further testing
+   *
+   * The final df will have data:
+   * jack | 24 | {12345, uom}
+   * lily | 31 | {54321, ucb}
+   *
+   * The schema of the df will be the `fileSchema` provided to this method
+   *
+   * This test wrapper will provide a `df` and actual metadata map `f0`, `f1`
+   */
+  private def metadataColumnsTest(
+      testName: String, fileSchema: StructType)
+      (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+    Seq("json", "parquet").foreach { testFileFormat =>
+      test(s"metadata struct ($testFileFormat): " + testName) {
+        withTempDir { dir =>
+          import scala.collection.JavaConverters._
+
+          // 1. create df0 and df1 and save under /data/f0 and /data/f1
+          val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+          val f0 = new File(dir, "data/f0").getCanonicalPath
+          df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+          val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+          val f1 = new File(dir, "data/f1").getCanonicalPath
+          df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+          // 2. read both f0 and f1
+          val df = spark.read.format(testFileFormat).schema(fileSchema)
+            .load(new File(dir, "data").getCanonicalPath + "/*")
+
+          val realF0 = new File(dir, "data/f0").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          val realF1 = new File(dir, "data/f1").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          // 3. create f0 and f1 metadata data
+          val f0Metadata = Map(
+            METADATA_FILE_PATH -> realF0.toURI.toString,
+            METADATA_FILE_NAME -> realF0.getName,
+            METADATA_FILE_SIZE -> realF0.length(),
+            METADATA_FILE_MODIFICATION_TIME -> realF0.lastModified()
+          )
+          val f1Metadata = Map(
+            METADATA_FILE_PATH -> realF1.toURI.toString,
+            METADATA_FILE_NAME -> realF1.getName,
+            METADATA_FILE_SIZE -> realF1.length(),

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772209636



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,108 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // metadata columns unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsUnsafeRow: UnsafeRow = _
+  // metadata columns internal row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsInternalRow: InternalRow = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val metadataUnsafeRowJoiner =
+    GenerateUnsafeRowJoiner.create(requiredSchema, metadataColumns.toStructType)
+  // metadata columns unsafe row converter
+  lazy private val unsafeRowConverter = {
+    val metadataColumnsDataTypes = metadataColumns.map(_.dataType).toArray
+    val converter = UnsafeProjection.create(metadataColumnsDataTypes)
+    (row: InternalRow) => converter(row)
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataColumns(): Unit = {
+    if (metadataColumns.nonEmpty) {
+      if (currentFile == null) {
+        metadataColumnsUnsafeRow = null
+        metadataColumnsInternalRow = null
+      } else {
+        // construct an internal row
+        val path = new Path(currentFile.filePath)

Review comment:
   ```suggestion
   val path = new Path(currentFile.filePath).toString
   ```
   







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772208355



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,108 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // metadata columns unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsUnsafeRow: UnsafeRow = _
+  // metadata columns internal row, will only be updated when the current file is changed
+  @volatile private var metadataColumnsInternalRow: InternalRow = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val metadataUnsafeRowJoiner =
+    GenerateUnsafeRowJoiner.create(requiredSchema, metadataColumns.toStructType)
+  // metadata columns unsafe row converter
+  lazy private val unsafeRowConverter = {
+    val metadataColumnsDataTypes = metadataColumns.map(_.dataType).toArray
+    val converter = UnsafeProjection.create(metadataColumnsDataTypes)
+    (row: InternalRow) => converter(row)
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataColumns(): Unit = {
+    if (metadataColumns.nonEmpty) {
+      if (currentFile == null) {
+        metadataColumnsUnsafeRow = null
+        metadataColumnsInternalRow = null
+      } else {
+        // construct an internal row
+        val path = new Path(currentFile.filePath)
+        metadataColumnsInternalRow = InternalRow.fromSeq(
+          metadataColumns.map(_.name).map {
+            case FILE_PATH => UTF8String.fromString(path.toString)
+            case FILE_NAME => UTF8String.fromString(path.getName)
+            case FILE_SIZE => currentFile.fileSize
+            case FILE_MODIFICATION_TIME => currentFile.modificationTime
+          }
+        )
+        // convert the internal row to an unsafe row
+        metadataColumnsUnsafeRow = unsafeRowConverter(metadataColumnsInternalRow)
+      }
+    }
+  }
+
+  /**
+   * Create a writable column vector containing all required metadata columns
+   */
+  private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+    val path = new Path(currentFile.filePath)
+    val filePathBytes = path.toString.getBytes
+    val fileNameBytes = path.getName.getBytes
+    var rowId = 0
+    metadataColumns.map(_.name).map {
+      case FILE_PATH =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, filePathBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_NAME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+        rowId = 0
+        // use a tight-loop for better performance
+        while (rowId < c.numRows()) {
+          columnVector.putByteArray(rowId, fileNameBytes)
+          rowId += 1
+        }
+        columnVector
+      case FILE_SIZE =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+        columnVector
+      case FILE_MODIFICATION_TIME =>
+        val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+        columnVector.putLongs(0, c.numRows(), currentFile.modificationTime)
+        columnVector
+    }.toArray
+  }
+
+  /**
+   * Add metadata columns at the end of nextElement if needed.
+   * For different row implementations, use different methods to update and append.
+   */
+  private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
+    if (metadataColumns.nonEmpty) {
+      nextElement match {
+        case c: ColumnarBatch =>
+          new ColumnarBatch(
+            Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),

Review comment:
   ```suggestion
   c.column ++ createMetadataColumnVector(c),
   ```





[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772205588



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
##
@@ -164,7 +164,12 @@ object SchemaPruning extends Rule[LogicalPlan] {
       outputRelation: LogicalRelation,
       prunedBaseRelation: HadoopFsRelation) = {
     val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema)
-    outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput)
+    // also add the metadata output if any
+    // TODO: should be able to prune the metadata schema

Review comment:
   hmm, seems like column pruning is automatically supported if we remove 
the code change here?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772199075



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##
@@ -225,8 +238,18 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
           dataFilters,
           table.map(_.identifier))
 
+      // extra Project node: wrap flat metadata columns to a metadata struct
+      val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
+        val metadataAlias =
+          Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
+        execution.ProjectExec(
+          scan.output.filterNot(metadataColumns.contains) :+ metadataAlias, scan)

Review comment:
   nit: `scan.output.dropRight(metadataColumns.length)`
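
   A tiny sketch of why the nit is equivalent here (assuming, as constructed above, that the metadata columns are always appended at the very end of the scan output; the names are illustrative):

   ```scala
   val scanOutput = Seq("name", "age", "file_path", "file_name")
   val metadataColumns = Seq("file_path", "file_name")

   val byFilter = scanOutput.filterNot(metadataColumns.contains)
   val byDrop = scanOutput.dropRight(metadataColumns.length)
   assert(byFilter == byDrop) // both yield Seq("name", "age")
   ```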







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772195408



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -57,11 +66,15 @@ case class PartitionedFile(
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
+    @transient val filePartitions: Seq[FilePartition],
+    val requiredSchema: StructType = StructType(Seq.empty),

Review comment:
   `readSchema` sounds good. Can we always pass this parameter?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-12-20 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772193740



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -355,7 +362,15 @@ case class FileSourceScanExec(
   @transient
   private lazy val pushedDownFilters = {
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
-    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+    // TODO: should be able to push filters containing metadata columns down to skip files
+    dataFilters
+      .filterNot(
+        _.references.exists {
+          case MetadataAttribute(_) => true
+          case _ => false
+        }
+      )
+      .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))

Review comment:
   ```suggestion
   dataFilters.filterNot(_.references.exists {
     case MetadataAttribute(_) => true
     case _ => false
   }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
   ```







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755188315



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataColumnsSuite.scala
##
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.file.Files
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+
+class FileMetadataColumnsSuite extends QueryTest with SharedSparkSession {
+
+  val data0: String =
+    """
+      |jack,24,12345,uom
+      |""".stripMargin
+
+  val data1: String =
+    """
+      |lily,31,,ucb
+      |""".stripMargin
+
+  val schema: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("id", LongType))
+    .add(StructField("university", StringType))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("_metadata.file_size", LongType))
+    .add(StructField("_metadata.FILE_NAME", StringType))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+
+  private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
+
+  /**
+   * Create a CSV file named `fileName` with `data` under `dir` directory.
+   */
+  private def createCSVFile(data: String, dir: File, fileName: String): String = {
+    val dataFile = new File(dir, s"/$fileName")
+    dataFile.getParentFile.mkdirs()
+    val bytes = data.getBytes()
+    Files.write(dataFile.toPath, bytes)
+    dataFile.getCanonicalPath
+  }
+
+  /**
+   * This test wrapper will test for both row-based and column-based file formats (csv and parquet)
+   * 1. read data0 and data1 and write them as testFileFormat: f0 and f1
+   * 2. read both f0 and f1, return the df to the downstream for further testing
+   * 3. construct actual metadata map for both f0 and f1 to the downstream for further testing
+   *
+   * The final df will have data:
+   * jack | 24 | 12345 | uom
+   * lily | 31 | null | ucb
+   *
+   * The schema of the df will be the `fileSchema` provided to this method
+   *
+   * This test wrapper will provide a `df` and actual metadata map `f0`, `f1`
+   */
+  private def metadataColumnsTest(
+      testName: String, fileSchema: StructType)
+      (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+    Seq("csv", "parquet").foreach { testFileFormat =>
+      test(s"metadata columns ($testFileFormat): " + testName) {
+        withTempDir { dir =>
+          // read data0 as CSV and write f0 as testFileFormat
+          val df0 = spark.read.schema(fileSchema).csv(
+            createCSVFile(data0, dir, "temp/0")
+          )
+          val f0Path = new File(dir, "data/f0").getCanonicalPath
+          df0.coalesce(1).write.format(testFileFormat).save(f0Path)
+
+          // read data1 as CSV and write f1 as testFileFormat
+          val df1 = spark.read.schema(fileSchema).csv(
+            createCSVFile(data1, dir, "temp/1")
+          )
+          val f1Path = new File(dir, "data/f1").getCanonicalPath
+          df1.coalesce(1).write.format(testFileFormat).save(f1Path)
+
+          // read both f0 and f1
+          val df = spark.read.format(testFileFormat).schema(fileSchema)
+            .load(new File(dir, "data").getCanonicalPath + "/*")
+
+          val realF0 = new File(dir, "data/f0").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          val realF1 = new File(dir, "data/f1").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          // construct f0 and f1 metadata data
+          val f0Metadata = Map(
+

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755185991



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##
@@ -212,7 +212,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       val outputSchema = readDataColumns.toStructType
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
 
-      val outputAttributes = readDataColumns ++ partitionColumns
+      // outputAttributes should also include referenced metadata columns at the very end
+      val outputAttributes = readDataColumns ++ partitionColumns ++
+        plan.references.collect { case MetadataAttribute(attr) => attr }

Review comment:
   shall we use `requiredAttributes` instead of `plan.references`?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755183339



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+    metadataStruct.exists {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }
+  }
+
+  // metadata struct unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+    if (hasMetadataAttribute)
+      GenerateUnsafeRowJoiner.create(requiredSchema, Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): WritableColumnVector = {
+    if (offHeapColumnVectorEnabled) {
+      new OffHeapColumnVector(numRows, dataType)
+    } else {
+      new OnHeapColumnVector(numRows, dataType)
+    }
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+    if (hasMetadataAttribute) {
+      val meta = metadataStruct.get
+      if (currentFile == null) {
+        metadataStructUnsafeRow = new UnsafeRow(1)
+        metadataStructGenericRow = new GenericRow(1)
+      } else {
+        // make an generic row
+        assert(meta.dataType.isInstanceOf[StructType])
+        metadataStructGenericRow = Row.fromSeq(

Review comment:
   And does `metadataStructGenericRow` need to be a class member? Seems can 
be a local variable.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755181619



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+    metadataStruct.exists {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }
+  }
+
+  // metadata struct unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+    if (hasMetadataAttribute)
+      GenerateUnsafeRowJoiner.create(requiredSchema, Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): WritableColumnVector = {
+    if (offHeapColumnVectorEnabled) {
+      new OffHeapColumnVector(numRows, dataType)
+    } else {
+      new OnHeapColumnVector(numRows, dataType)
+    }
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+    if (hasMetadataAttribute) {
+      val meta = metadataStruct.get
+      if (currentFile == null) {
+        metadataStructUnsafeRow = new UnsafeRow(1)
+        metadataStructGenericRow = new GenericRow(1)
+      } else {
+        // make an generic row
+        assert(meta.dataType.isInstanceOf[StructType])
+        metadataStructGenericRow = Row.fromSeq(

Review comment:
   Why do we create Row here and then use `CatalystTypeConverters` later? 
Let's just use `InternalRow.fromSeq` here.
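
   A minimal sketch of the suggestion: the mapped values are already in Catalyst's internal representation (`UTF8String`, `Long`), so they can go straight into an `InternalRow` without building an external `Row` first (the literal values here are illustrative).

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.unsafe.types.UTF8String

   val metadataInternalRow = InternalRow.fromSeq(Seq(
     UTF8String.fromString("/data/f0/part-00000.parquet"), // file path
     UTF8String.fromString("part-00000.parquet"),          // file name
     1024L,                                                // file size in bytes
     1640000000000L))                                      // modification time
   ```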







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755180169



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+    metadataStruct.exists {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }
+  }
+
+  // metadata struct unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+    if (hasMetadataAttribute)
+      GenerateUnsafeRowJoiner.create(requiredSchema, Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): WritableColumnVector = {
+    if (offHeapColumnVectorEnabled) {
+      new OffHeapColumnVector(numRows, dataType)
+    } else {
+      new OnHeapColumnVector(numRows, dataType)
+    }
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+    if (hasMetadataAttribute) {
+      val meta = metadataStruct.get
+      if (currentFile == null) {
+        metadataStructUnsafeRow = new UnsafeRow(1)
+        metadataStructGenericRow = new GenericRow(1)
+      } else {
+        // make an generic row
+        assert(meta.dataType.isInstanceOf[StructType])
+        metadataStructGenericRow = Row.fromSeq(
+          meta.dataType.asInstanceOf[StructType].names.map {
+            case FILE_PATH => UTF8String.fromString(new File(currentFile.filePath).toString)
+            case FILE_NAME => UTF8String.fromString(
+              currentFile.filePath.split("/").last)
+            case FILE_SIZE => currentFile.fileSize
+            case FILE_MODIFICATION_TIME => currentFile.modificationTime
+            case _ => None // be exhaustive, won't happen
+          }
+        )
+
+        // convert the generic row to an unsafe row
+        val unsafeRowConverter = {
+          val converter = UnsafeProjection.create(
+            Array(METADATA_STRUCT))

Review comment:
   seems safer to use `Array(meta.dataType.asInstanceOf[StructType])`







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755178439



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
     context.killTaskIfInterrupted()
     (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///////////////////////////
+  // FILE METADATA METHODS //
+  ///////////////////////////
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+    metadataStruct.exists {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }
+  }
+
+  // metadata struct unsafe row, will only be updated when the current file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+    if (hasMetadataAttribute)
+      GenerateUnsafeRowJoiner.create(requiredSchema, Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): WritableColumnVector = {
+    if (offHeapColumnVectorEnabled) {
+      new OffHeapColumnVector(numRows, dataType)
+    } else {
+      new OnHeapColumnVector(numRows, dataType)
+    }
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+    if (hasMetadataAttribute) {
+      val meta = metadataStruct.get
+      if (currentFile == null) {
+        metadataStructUnsafeRow = new UnsafeRow(1)
+        metadataStructGenericRow = new GenericRow(1)

Review comment:
   Seems like we can just null out these 2 rows. If `currentFile` is null, we won't output any more records, so these 2 rows are useless.
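
A minimal sketch of that simplification (field names as in the quoted snippet):

```scala
if (currentFile == null) {
  // no more records will be produced for this partition, so the cached
  // metadata rows are never read again; drop them instead of allocating stubs
  metadataStructUnsafeRow = null
  metadataStructGenericRow = null
}
```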







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755176293



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+metadataStruct.exists {
+  case MetadataAttribute(_) => true
+  case _ => false
+}
+  }
+
+  // metadata struct unsafe row, will only be updated when the current 
file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is 
changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+if (hasMetadataAttribute)
+  GenerateUnsafeRowJoiner.create(requiredSchema, 
Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+if (hasMetadataAttribute) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)

Review comment:
   path separator is not always `/`. Let's use `java.io.File.separator`
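
One portable option (a sketch only, not necessarily what the final patch should do) is to let Hadoop's `Path` extract the name instead of splitting the string manually:

```scala
import org.apache.hadoop.fs.Path

// Path.getName returns the last path component regardless of the platform
// and works for scheme-qualified paths (file://, hdfs://, s3a://, ...)
val fileName = new Path(currentFile.filePath).getName
```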







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755175121



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {
+metadataStruct.exists {
+  case MetadataAttribute(_) => true
+  case _ => false
+}
+  }
+
+  // metadata struct unsafe row, will only be updated when the current 
file is changed
+  @volatile private var metadataStructUnsafeRow: UnsafeRow = _
+  // metadata generic row, will only be updated when the current file is 
changed
+  @volatile private var metadataStructGenericRow: Row = _
+  // an unsafe joiner to join an unsafe row with the metadata unsafe row
+  lazy private val unsafeRowJoiner =
+if (hasMetadataAttribute)
+  GenerateUnsafeRowJoiner.create(requiredSchema, 
Seq(metadataStruct.get).toStructType)
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit = {
+if (hasMetadataAttribute) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)

Review comment:
   Why can't we use `filePath` directly? `new File` looks buggy as it won't 
work for `s3://` or other file systems.
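
A sketch of what using the path string directly would look like (no `java.io.File` round-trip, so scheme-qualified paths such as `s3://...` are preserved):

```scala
import org.apache.spark.unsafe.types.UTF8String

// currentFile.filePath already carries the fully qualified path string,
// scheme included, so it can be converted as-is
val filePathValue: UTF8String = UTF8String.fromString(currentFile.filePath)
```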







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755170678



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,135 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // whether a metadata column exists and it is a `MetadataAttribute`
+  private lazy val hasMetadataAttribute: Boolean = {

Review comment:
   do we need this `lazy val`? I think this is simply 
`metadataStructCol.isDefined`, as the caller side should always pass in the 
metadata col.
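
i.e. roughly (a sketch, using the `metadataStructCol` name suggested elsewhere in this review):

```scala
// Option.isDefined is already a cheap constant-time check, so no lazy val is needed
private def hasMetadataAttribute: Boolean = metadataStructCol.isDefined
```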







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755169884



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -57,11 +66,15 @@ case class PartitionedFile(
 class FileScanRDD(
 @transient private val sparkSession: SparkSession,
 readFunction: (PartitionedFile) => Iterator[InternalRow],
-@transient val filePartitions: Seq[FilePartition])
+@transient val filePartitions: Seq[FilePartition],
+val requiredSchema: StructType = StructType(Seq.empty),
+val metadataStruct: Option[AttributeReference] = None)

Review comment:
   nit: `metadataStructCol`







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755169306



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -57,11 +66,15 @@ case class PartitionedFile(
 class FileScanRDD(
 @transient private val sparkSession: SparkSession,
 readFunction: (PartitionedFile) => Iterator[InternalRow],
-@transient val filePartitions: Seq[FilePartition])
+@transient val filePartitions: Seq[FilePartition],
+val requiredSchema: StructType = StructType(Seq.empty),

Review comment:
   nit: `dataSchema` is probably a better name in this context. We need to 
join the data rows with metadata col w.r.t. this schema.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755167684



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##
@@ -171,6 +171,28 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }
 
+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"
+
+  val FILE_SIZE = "file_size"
+
+  val FILE_MODIFICATION_TIME = "file_modification_time"
+
+  val METADATA_NAME = "_metadata"
+
+  val METADATA_STRUCT: DataType = new StructType()
+.add(StructField(FILE_PATH, StringType))
+.add(StructField(FILE_NAME, StringType))
+.add(StructField(FILE_SIZE, LongType))
+.add(StructField(FILE_MODIFICATION_TIME, LongType))
+
+  // supported metadata columns for hadoop fs relation
+  def FILE_METADATA_COLUMNS: AttributeReference = 
MetadataAttribute(METADATA_NAME, METADATA_STRUCT)

Review comment:
   since it's a `def` now, how about `def createFileMetadataCol`
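
A sketch of the renamed factory (same body as the quoted definition, just under the suggested name):

```scala
// a def rather than a val, so every call mints a fresh AttributeReference
// (and therefore a fresh ExprId)
def createFileMetadataCol: AttributeReference =
  MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
```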







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755165986



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -355,7 +377,14 @@ case class FileSourceScanExec(
   @transient
   private lazy val pushedDownFilters = {
 val supportNestedPredicatePushdown = 
DataSourceUtils.supportNestedPredicatePushdown(relation)
-dataFilters.flatMap(DataSourceStrategy.translateFilter(_, 
supportNestedPredicatePushdown))
+dataFilters
+  .filterNot(
+_.references.exists {
+  case MetadataAttribute(_) => true

Review comment:
   can we leave a TODO comment here?
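
e.g. something along these lines over the quoted snippet (the wording of the TODO is only a suggestion):

```scala
dataFilters
  // TODO: don't just drop these filters; push predicates on metadata columns
  // down to the scan so whole files can be skipped during listing
  .filterNot(_.references.exists {
    case MetadataAttribute(_) => true
    case _ => false
  })
```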







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755164613



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -212,7 +225,16 @@ case class FileSourceScanExec(
 relation.fileFormat.vectorTypes(
   requiredSchema = requiredSchema,
   partitionSchema = relation.partitionSchema,
-  relation.sparkSession.sessionState.conf)
+  relation.sparkSession.sessionState.conf).map { vectorTypes =>
+val metadataVectorClz =
+  if 
(relation.sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
+classOf[OffHeapColumnVector].getName
+  } else {
+classOf[OnHeapColumnVector].getName

Review comment:
   since we will change to use a constant vector soon, how about we always use `OnHeapColumnVector` for now, to simplify the code?
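
i.e. roughly (a sketch of the simplification until the constant-vector follow-up lands):

```scala
// metadata values are per-file constants, so the on-heap vector is cheap enough
// for now regardless of the off-heap setting
val metadataVectorClz = classOf[OnHeapColumnVector].getName
```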







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755163320



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
+  lazy val outputMetadataStruct: Option[AttributeReference] =
+output.collectFirst { case MetadataAttribute(attr) => attr }
+
   // Note that some vals referring the file-based relation are lazy 
intentionally
   // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-relation.fileFormat.supportBatch(relation.sparkSession, schema)
+// schema without the file metadata column
+val fileSchema = if (outputMetadataStruct.isEmpty) schema else {
+  StructType.fromAttributes(

Review comment:
   nit: `output.filter(_.exprId != metadataStructCol.get.exprId).toStruct`







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755163320



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
+  lazy val outputMetadataStruct: Option[AttributeReference] =
+output.collectFirst { case MetadataAttribute(attr) => attr }
+
   // Note that some vals referring the file-based relation are lazy 
intentionally
   // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-relation.fileFormat.supportBatch(relation.sparkSession, schema)
+// schema without the file metadata column
+val fileSchema = if (outputMetadataStruct.isEmpty) schema else {
+  StructType.fromAttributes(

Review comment:
   nit: `output.filterNot(_.exprId == 
metadataStructCol.get.exprId).toStruct`







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755163320



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
+  lazy val outputMetadataStruct: Option[AttributeReference] =
+output.collectFirst { case MetadataAttribute(attr) => attr }
+
   // Note that some vals referring the file-based relation are lazy 
intentionally
   // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-relation.fileFormat.supportBatch(relation.sparkSession, schema)
+// schema without the file metadata column
+val fileSchema = if (outputMetadataStruct.isEmpty) schema else {
+  StructType.fromAttributes(

Review comment:
   nit: `StructType.fromAttributes(output.filterNot(_.exprId == 
metadataStructCol.get.exprId))`
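
Putting the nit together, the computation could look roughly like this (a sketch; `metadataStructCol` is the renamed `outputMetadataStruct`):

```scala
// schema of the data and partition columns only, with the metadata struct column removed
val fileSchema = if (metadataStructCol.isEmpty) {
  schema
} else {
  StructType.fromAttributes(output.filterNot(_.exprId == metadataStructCol.get.exprId))
}
```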







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-23 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r755161598



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
+  lazy val outputMetadataStruct: Option[AttributeReference] =

Review comment:
   nit: it's probably better to use a noun here, how about 
`metadataStructCol`?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752977298



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataColumnsSuite.scala
##
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.file.Files
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+class FileMetadataColumnsSuite extends QueryTest with SharedSparkSession {
+
+  val data0: String =
+"""
+  |jack,24,12345,uom
+  |""".stripMargin
+
+  val data1: String =
+"""
+  |lily,31,,ucb
+  |""".stripMargin
+
+  val schema: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("id", LongType))
+.add(StructField("university", StringType))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("_metadata.file_size", LongType))
+.add(StructField("_metadata.FILE_NAME", StringType))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+
+  private val METADATA_FILE_MODIFICATION_TIME = 
"_metadata.file_modification_time"
+
+  /**
+   * Create a CSV file named `fileName` with `data` under `dir` directory.
+   */
+  private def createCSVFile(data: String, dir: File, fileName: String): String 
= {
+val dataFile = new File(dir, s"/$fileName")
+dataFile.getParentFile.mkdirs()
+val bytes = data.getBytes()
+Files.write(dataFile.toPath, bytes)
+dataFile.getCanonicalPath
+  }
+
+  /**
+   * This test wrapper will test for both row-based and column-based file 
formats (csv and parquet)
+   * 1. read data0 and data1 and write them as testFileFormat: f0 and f1
+   * 2. read both f0 and f1, return the df to the downstream for further 
testing
+   * 3. construct actual metadata map for both f0 and f1 to the downstream for 
further testing
+   *
+   * The final df will have data:
+   * jack | 24 | 12345 | uom
+   * lily | 31 | null | ucb
+   *
+   * The schema of the df will be the `fileSchema` provided to this method
+   *
+   * This test wrapper will provide a `df` and actual metadata map `f0`, `f1`
+   */
+  private def metadataColumnsTest(
+  testName: String, fileSchema: StructType)
+(f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+Seq("csv", "parquet").foreach { testFileFormat =>
+  test(s"metadata columns ($testFileFormat): " + testName) {
+withTempDir { dir =>
+  // read data0 as CSV and write f0 as testFileFormat
+  val df0 = spark.read.schema(fileSchema).csv(
+createCSVFile(data0, dir, "temp/0")
+  )
+  val f0Path = new File(dir, "data/f0").getCanonicalPath
+  df0.coalesce(1).write.format(testFileFormat).save(f0Path)
+
+  // read data1 as CSV and write f1 as testFileFormat
+  val df1 = spark.read.schema(fileSchema).csv(
+createCSVFile(data1, dir, "temp/1")
+  )
+  val f1Path = new File(dir, "data/f1").getCanonicalPath
+  df1.coalesce(1).write.format(testFileFormat).save(f1Path)
+
+  // read both f0 and f1
+  val df = spark.read.format(testFileFormat).schema(fileSchema)
+.load(new File(dir, "data").getCanonicalPath + "/*")
+
+  val realF0 = new File(dir, "data/f0").listFiles()
+.filter(_.getName.endsWith(s".$testFileFormat")).head
+
+  val realF1 = new File(dir, "data/f1").listFiles()
+.filter(_.getName.endsWith(s".$testFileFormat")).head
+
+  // construct f0 and f1 metadata data
+  val f0Metadata = Map(
+

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752975918



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataColumnsSuite.scala
##
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.file.Files
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+class FileMetadataColumnsSuite extends QueryTest with SharedSparkSession {
+
+  val data0: String =
+"""
+  |jack,24,12345,uom
+  |""".stripMargin
+
+  val data1: String =
+"""
+  |lily,31,,ucb
+  |""".stripMargin
+
+  val schema: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("id", LongType))
+.add(StructField("university", StringType))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("_metadata.file_size", LongType))
+.add(StructField("_metadata.FILE_NAME", StringType))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+
+  private val METADATA_FILE_MODIFICATION_TIME = 
"_metadata.file_modification_time"
+
+  /**
+   * Create a CSV file named `fileName` with `data` under `dir` directory.
+   */
+  private def createCSVFile(data: String, dir: File, fileName: String): String 
= {
+val dataFile = new File(dir, s"/$fileName")
+dataFile.getParentFile.mkdirs()
+val bytes = data.getBytes()
+Files.write(dataFile.toPath, bytes)
+dataFile.getCanonicalPath
+  }
+
+  /**
+   * This test wrapper will test for both row-based and column-based file 
formats (csv and parquet)
+   * 1. read data0 and data1 and write them as testFileFormat: f0 and f1
+   * 2. read both f0 and f1, return the df to the downstream for further 
testing
+   * 3. construct actual metadata map for both f0 and f1 to the downstream for 
further testing
+   *
+   * The final df will have data:
+   * jack | 24 | 12345 | uom
+   * lily | 31 | null | ucb
+   *
+   * The schema of the df will be the `fileSchema` provided to this method
+   *
+   * This test wrapper will provide a `df` and actual metadata map `f0`, `f1`
+   */
+  private def metadataColumnsTest(
+  testName: String, fileSchema: StructType)
+(f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+Seq("csv", "parquet").foreach { testFileFormat =>
+  test(s"metadata columns ($testFileFormat): " + testName) {
+withTempDir { dir =>
+  // read data0 as CSV and write f0 as testFileFormat
+  val df0 = spark.read.schema(fileSchema).csv(
+createCSVFile(data0, dir, "temp/0")
+  )
+  val f0Path = new File(dir, "data/f0").getCanonicalPath
+  df0.coalesce(1).write.format(testFileFormat).save(f0Path)
+
+  // read data1 as CSV and write f1 as testFileFormat
+  val df1 = spark.read.schema(fileSchema).csv(
+createCSVFile(data1, dir, "temp/1")
+  )
+  val f1Path = new File(dir, "data/f1").getCanonicalPath
+  df1.coalesce(1).write.format(testFileFormat).save(f1Path)
+
+  // read both f0 and f1
+  val df = spark.read.format(testFileFormat).schema(fileSchema)
+.load(new File(dir, "data").getCanonicalPath + "/*")
+
+  val realF0 = new File(dir, "data/f0").listFiles()
+.filter(_.getName.endsWith(s".$testFileFormat")).head
+
+  val realF1 = new File(dir, "data/f1").listFiles()
+.filter(_.getName.endsWith(s".$testFileFormat")).head
+
+  // construct f0 and f1 metadata data
+  val f0Metadata = Map(
+

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752973640



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataColumnsSuite.scala
##
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.file.Files
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+class FileMetadataColumnsSuite extends QueryTest with SharedSparkSession {
+
+  val data0: String =
+"""
+  |jack,24,12345,uom
+  |""".stripMargin
+
+  val data1: String =
+"""
+  |lily,31,,ucb
+  |""".stripMargin
+
+  val schema: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("id", LongType))
+.add(StructField("university", StringType))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+.add(StructField("name", StringType))
+.add(StructField("age", IntegerType))
+.add(StructField("_metadata.file_size", LongType))
+.add(StructField("_metadata.FILE_NAME", StringType))

Review comment:
   This looks a bit confusing. IIRC we only have one metadata column, `_metadata`, and `StructField("_metadata.file_size", LongType)` declares a column whose name is literally "_metadata.file_size"; it does not read the inner field `file_size` from the struct-type column `_metadata`.
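
For context, a small illustration of the distinction (hypothetical `df`):

```scala
import org.apache.spark.sql.functions.col

// a top-level column whose name literally contains a dot must be quoted with backticks
df.select(col("`_metadata.file_size`"))
// without backticks, the dot means field access: inner field `file_size`
// of a struct-type column `_metadata`
df.select(col("_metadata.file_size"))
```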







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752967539



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,123 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // use to join with an unsafe row, will only be updated when the current 
file is changed
+  @volatile var metadataStructUnsafeRow: UnsafeRow = _
+  // use to append to an internal row, will only be updated when the 
current file is changed
+  @volatile var metadataStructGenericRow: Row = _
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)
+case FILE_SIZE => currentFile.fileSize
+case FILE_MODIFICATION_TIME => currentFile.modificationTime
+case _ => None // be exhaustive, won't happen
+  }
+)
+
+// convert the generic row to an unsafe row
+val unsafeRowConverter = {
+  val converter = UnsafeProjection.create(
+Array(FILE_METADATA_COLUMNS.dataType))
+  (row: Row) => {
+converter(CatalystTypeConverters.convertToCatalyst(row)
+  .asInstanceOf[InternalRow])
+  }
+}
+metadataStructUnsafeRow =
+  unsafeRowConverter(Row.fromSeq(Seq(metadataStructGenericRow)))
+  }
+}
+
+  /**
+   * Create a writable column vector containing all required metadata 
fields
+   */
+  private def createMetadataStructColumnVector(
+  c: ColumnarBatch, meta: AttributeReference): WritableColumnVector = {
+val columnVector = createColumnVector(c.numRows(), 
FILE_METADATA_COLUMNS.dataType)

Review comment:
   We can create a new type of column vector to hold constants, which will be more efficient than this approach. We can do this in a follow-up.







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752964031



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,123 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // use to join with an unsafe row, will only be updated when the current 
file is changed
+  @volatile var metadataStructUnsafeRow: UnsafeRow = _
+  // use to append to an internal row, will only be updated when the 
current file is changed
+  @volatile var metadataStructGenericRow: Row = _
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)
+case FILE_SIZE => currentFile.fileSize
+case FILE_MODIFICATION_TIME => currentFile.modificationTime
+case _ => None // be exhaustive, won't happen
+  }
+)
+
+// convert the generic row to an unsafe row
+val unsafeRowConverter = {
+  val converter = UnsafeProjection.create(
+Array(FILE_METADATA_COLUMNS.dataType))
+  (row: Row) => {
+converter(CatalystTypeConverters.convertToCatalyst(row)
+  .asInstanceOf[InternalRow])
+  }
+}
+metadataStructUnsafeRow =
+  unsafeRowConverter(Row.fromSeq(Seq(metadataStructGenericRow)))
+  }
+}
+
+  /**
+   * Create a writable column vector containing all required metadata 
fields
+   */
+  private def createMetadataStructColumnVector(
+  c: ColumnarBatch, meta: AttributeReference): WritableColumnVector = {
+val columnVector = createColumnVector(c.numRows(), 
FILE_METADATA_COLUMNS.dataType)
+val filePathBytes = new File(currentFile.filePath).toString.getBytes
+val fileNameBytes = currentFile.filePath.split("/").last.getBytes
+var rowId = 0
+
+assert(meta.dataType.isInstanceOf[StructType])
+meta.dataType.asInstanceOf[StructType].names.zipWithIndex.foreach { 
case (name, ind) =>
+  name match {
+case FILE_PATH =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, filePathBytes)
+rowId += 1
+  }
+case FILE_NAME =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, fileNameBytes)
+rowId += 1
+  }
+case FILE_SIZE =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.fileSize)
+case FILE_MODIFICATION_TIME =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.modificationTime)
+case _ => // be exhaustive, won't happen
+  }
+}
+columnVector
+  }
+
+  /**
+   * Add metadata struct at the end of nextElement if needed.
+   * For different row implementations, use different methods to update 
and append.
+   */
+  private def addMetadataStructIfNeeded(nextElement: Object): Object =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  nextElement match {
+case c: ColumnarBatch =>
+  val 

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752960655



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,123 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // use to join with an unsafe row, will only be updated when the current 
file is changed
+  @volatile var metadataStructUnsafeRow: UnsafeRow = _
+  // use to append to an internal row, will only be updated when the 
current file is changed
+  @volatile var metadataStructGenericRow: Row = _
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)
+case FILE_SIZE => currentFile.fileSize
+case FILE_MODIFICATION_TIME => currentFile.modificationTime
+case _ => None // be exhaustive, won't happen
+  }
+)
+
+// convert the generic row to an unsafe row
+val unsafeRowConverter = {
+  val converter = UnsafeProjection.create(
+Array(FILE_METADATA_COLUMNS.dataType))
+  (row: Row) => {
+converter(CatalystTypeConverters.convertToCatalyst(row)
+  .asInstanceOf[InternalRow])
+  }
+}
+metadataStructUnsafeRow =
+  unsafeRowConverter(Row.fromSeq(Seq(metadataStructGenericRow)))
+  }
+}
+
+  /**
+   * Create a writable column vector containing all required metadata 
fields
+   */
+  private def createMetadataStructColumnVector(
+  c: ColumnarBatch, meta: AttributeReference): WritableColumnVector = {
+val columnVector = createColumnVector(c.numRows(), 
FILE_METADATA_COLUMNS.dataType)
+val filePathBytes = new File(currentFile.filePath).toString.getBytes
+val fileNameBytes = currentFile.filePath.split("/").last.getBytes
+var rowId = 0
+
+assert(meta.dataType.isInstanceOf[StructType])
+meta.dataType.asInstanceOf[StructType].names.zipWithIndex.foreach { 
case (name, ind) =>
+  name match {
+case FILE_PATH =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, filePathBytes)
+rowId += 1
+  }
+case FILE_NAME =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, fileNameBytes)
+rowId += 1
+  }
+case FILE_SIZE =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.fileSize)
+case FILE_MODIFICATION_TIME =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.modificationTime)
+case _ => // be exhaustive, won't happen
+  }
+}
+columnVector
+  }
+
+  /**
+   * Add metadata struct at the end of nextElement if needed.
+   * For different row implementations, use different methods to update 
and append.
+   */
+  private def addMetadataStructIfNeeded(nextElement: Object): Object =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  nextElement match {
+case c: ColumnarBatch =>
+  val 

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752960194



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,123 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // use to join with an unsafe row, will only be updated when the current 
file is changed
+  @volatile var metadataStructUnsafeRow: UnsafeRow = _
+  // use to append to an internal row, will only be updated when the 
current file is changed
+  @volatile var metadataStructGenericRow: Row = _
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)
+case FILE_SIZE => currentFile.fileSize
+case FILE_MODIFICATION_TIME => currentFile.modificationTime
+case _ => None // be exhaustive, won't happen
+  }
+)
+
+// convert the generic row to an unsafe row
+val unsafeRowConverter = {
+  val converter = UnsafeProjection.create(
+Array(FILE_METADATA_COLUMNS.dataType))
+  (row: Row) => {
+converter(CatalystTypeConverters.convertToCatalyst(row)
+  .asInstanceOf[InternalRow])
+  }
+}
+metadataStructUnsafeRow =
+  unsafeRowConverter(Row.fromSeq(Seq(metadataStructGenericRow)))
+  }
+}
+
+  /**
+   * Create a writable column vector containing all required metadata 
fields
+   */
+  private def createMetadataStructColumnVector(
+  c: ColumnarBatch, meta: AttributeReference): WritableColumnVector = {
+val columnVector = createColumnVector(c.numRows(), 
FILE_METADATA_COLUMNS.dataType)
+val filePathBytes = new File(currentFile.filePath).toString.getBytes
+val fileNameBytes = currentFile.filePath.split("/").last.getBytes
+var rowId = 0
+
+assert(meta.dataType.isInstanceOf[StructType])
+meta.dataType.asInstanceOf[StructType].names.zipWithIndex.foreach { 
case (name, ind) =>
+  name match {
+case FILE_PATH =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, filePathBytes)
+rowId += 1
+  }
+case FILE_NAME =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, fileNameBytes)
+rowId += 1
+  }
+case FILE_SIZE =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.fileSize)
+case FILE_MODIFICATION_TIME =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.modificationTime)
+case _ => // be exhaustive, won't happen
+  }
+}
+columnVector
+  }
+
+  /**
+   * Add metadata struct at the end of nextElement if needed.
+   * For different row implementations, use different methods to update 
and append.
+   */
+  private def addMetadataStructIfNeeded(nextElement: Object): Object =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  nextElement match {
+case c: ColumnarBatch =>
+  val 

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752953419



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##
@@ -103,6 +116,123 @@ class FileScanRDD(
 context.killTaskIfInterrupted()
 (currentIterator != null && currentIterator.hasNext) || nextIterator()
   }
+
+  ///
+  // FILE METADATA METHODS //
+  ///
+
+  // use to join with an unsafe row, will only be updated when the current 
file is changed
+  @volatile var metadataStructUnsafeRow: UnsafeRow = _
+  // use to append to an internal row, will only be updated when the 
current file is changed
+  @volatile var metadataStructGenericRow: Row = _
+
+  // Create a off/on heap WritableColumnVector
+  private def createColumnVector(numRows: Int, dataType: DataType): 
WritableColumnVector = {
+if (offHeapColumnVectorEnabled) {
+  new OffHeapColumnVector(numRows, dataType)
+} else {
+  new OnHeapColumnVector(numRows, dataType)
+}
+  }
+
+  /**
+   * For each partitioned file, metadata columns for each record in the 
file are exactly same.
+   * Only update metadata columns when `currentFile` is changed.
+   */
+  private def updateMetadataStruct(): Unit =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  if (currentFile == null) {
+metadataStructUnsafeRow = new UnsafeRow(1)
+metadataStructGenericRow = new GenericRow(1)
+  } else {
+// make an generic row
+assert(meta.dataType.isInstanceOf[StructType])
+metadataStructGenericRow = Row.fromSeq(
+  meta.dataType.asInstanceOf[StructType].names.map {
+case FILE_PATH => UTF8String.fromString(new 
File(currentFile.filePath).toString)
+case FILE_NAME => UTF8String.fromString(
+  currentFile.filePath.split("/").last)
+case FILE_SIZE => currentFile.fileSize
+case FILE_MODIFICATION_TIME => currentFile.modificationTime
+case _ => None // be exhaustive, won't happen
+  }
+)
+
+// convert the generic row to an unsafe row
+val unsafeRowConverter = {
+  val converter = UnsafeProjection.create(
+Array(FILE_METADATA_COLUMNS.dataType))
+  (row: Row) => {
+converter(CatalystTypeConverters.convertToCatalyst(row)
+  .asInstanceOf[InternalRow])
+  }
+}
+metadataStructUnsafeRow =
+  unsafeRowConverter(Row.fromSeq(Seq(metadataStructGenericRow)))
+  }
+}
+
+  /**
+   * Create a writable column vector containing all required metadata 
fields
+   */
+  private def createMetadataStructColumnVector(
+  c: ColumnarBatch, meta: AttributeReference): WritableColumnVector = {
+val columnVector = createColumnVector(c.numRows(), 
FILE_METADATA_COLUMNS.dataType)
+val filePathBytes = new File(currentFile.filePath).toString.getBytes
+val fileNameBytes = currentFile.filePath.split("/").last.getBytes
+var rowId = 0
+
+assert(meta.dataType.isInstanceOf[StructType])
+meta.dataType.asInstanceOf[StructType].names.zipWithIndex.foreach { 
case (name, ind) =>
+  name match {
+case FILE_PATH =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, filePathBytes)
+rowId += 1
+  }
+case FILE_NAME =>
+  rowId = 0
+  // use a tight-loop for better performance
+  while (rowId < c.numRows()) {
+columnVector.getChild(ind).putByteArray(rowId, fileNameBytes)
+rowId += 1
+  }
+case FILE_SIZE =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.fileSize)
+case FILE_MODIFICATION_TIME =>
+  columnVector.getChild(ind).putLongs(0, c.numRows(), 
currentFile.modificationTime)
+case _ => // be exhaustive, won't happen
+  }
+}
+columnVector
+  }
+
+  /**
+   * Add metadata struct at the end of nextElement if needed.
+   * For different row implementations, use different methods to update 
and append.
+   */
+  private def addMetadataStructIfNeeded(nextElement: Object): Object =
+if (metadataStruct.exists(_.sameRef(FILE_METADATA_COLUMNS))) {
+  val meta = metadataStruct.get
+  nextElement match {
+case c: ColumnarBatch =>
+  val 

[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752946851



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
##
@@ -171,6 +171,27 @@ trait FileFormat {
   def supportFieldName(name: String): Boolean = true
 }
 
+object FileFormat {
+
+  val FILE_PATH = "file_path"
+
+  val FILE_NAME = "file_name"
+
+  val FILE_SIZE = "file_size"
+
+  val FILE_MODIFICATION_TIME = "file_modification_time"
+
+  val METADATA_NAME = "_metadata"
+
+  // supported metadata columns for hadoop fs relation
+  val FILE_METADATA_COLUMNS: AttributeReference = 
MetadataAttribute(METADATA_NAME,

Review comment:
   I think this should be a `def`, otherwise all the file source metadata 
columns share one `ExprId`. This can cause issues if you scan two file sources 
and join them.
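
A quick illustration of the difference; with a `def`, each call site gets its own attribute (a sketch, reusing the names from the quoted snippet):

```scala
// each MetadataAttribute(...) call creates a new AttributeReference with a fresh
// ExprId, so two scans that are later joined no longer share attribute ids
val leftCol  = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
val rightCol = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
assert(leftCol.exprId != rightCol.exprId)
```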







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752940105



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {

Review comment:
   shall we extend `ExposesMetadataColumns` here?







[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752943643



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -355,7 +377,14 @@ case class FileSourceScanExec(
   @transient
   private lazy val pushedDownFilters = {
 val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
-dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+dataFilters
+  .filterNot(
+_.references.exists {
+  case MetadataAttribute(_) => true

Review comment:
   How do we handle predicates against metadata columns? Do we simply leave the filters for Spark to evaluate after the scan?
   
   I think it's better to push them down to the file source reader. For example, we can skip reading an entire file if a predicate only references the file name (see the sketch below).
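
   A minimal sketch of that idea, using hypothetical stand-ins (`FileEntry`, `prune`) for `PartitionedFile` and the translated filters:

```scala
// Hypothetical stand-ins: `FileEntry` plays the role of PartitionedFile and the
// `String => Boolean` predicate the role of a translated file-name filter.
final case class FileEntry(path: String, size: Long, modificationTime: Long)

object MetadataPredicateSkipping {
  // A metadata-only predicate can be evaluated per file, before reading any bytes.
  def prune(files: Seq[FileEntry], fileNamePredicate: String => Boolean): Seq[FileEntry] =
    files.filter(f => fileNamePredicate(f.path.split('/').last))

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileEntry("/data/part-00000.parquet", 1024L, 1639958400000L),
      FileEntry("/data/_SUCCESS", 0L, 1639958400000L))
    // e.g. WHERE file_name LIKE 'part-%': the _SUCCESS file is never opened.
    println(prune(files, _.startsWith("part-")))
  }
}
```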




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752940728



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -194,10 +195,22 @@ case class FileSourceScanExec(
 disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
+  lazy val outputMetadataStruct: Option[AttributeReference] =
+output.collectFirst { case MetadataAttribute(attr) => attr }

Review comment:
   This is really a design question: do we prefer 4 flat columns or a single struct-type column? (See the sketch below.)
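
   For comparison, a small end-to-end sketch of how the two shapes would look from the query side, assuming the struct-column design this PR proposes (the flat-column variant is hypothetical and left commented out):

```scala
import org.apache.spark.sql.SparkSession

object MetadataColumnShapes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metadata-shapes").getOrCreate()
    import spark.implicits._

    // Write a tiny file-backed table so a file source scan is involved.
    Seq("a", "b").toDF("value").write.mode("overwrite").json("/tmp/metadata_demo")
    val df = spark.read.json("/tmp/metadata_demo")

    // Struct-typed column (the shape this PR proposes): fields are selected with dot notation.
    df.select($"value", $"_metadata.file_path", $"_metadata.file_size").show(false)

    // Flat-column alternative (hypothetical): each piece of metadata would be its
    // own hidden top-level column.
    // df.select($"value", $"_file_path", $"_file_size").show(false)

    spark.stop()
  }
}
```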




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #34575: [SPARK-37273][SQL] Support hidden file metadata columns in Spark SQL

2021-11-19 Thread GitBox


cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r752936812



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
##
@@ -438,3 +438,22 @@ object VirtualColumn {
   val groupingIdName: String = "spark_grouping_id"
   val groupingIdAttribute: UnresolvedAttribute = 
UnresolvedAttribute(groupingIdName)
 }
+
+/**
+ * The internal representation of the hidden metadata column:
+ * set `__metadata_col` to `true` in AttributeReference metadata
+ * - apply() will create a metadata attribute reference
+ * - unapply() will check if an attribute reference is the metadata attribute reference
+ */
+object MetadataAttribute {
+  def apply(name: String, dataType: DataType): AttributeReference =
+AttributeReference(name, dataType, true,

Review comment:
   shall we allow non-nullable metadata attributes? We should probably add one more parameter to `apply`: `nullable: Boolean`.
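
   As a rough illustration, an `apply`/`unapply` pair with the extra flag could look like the sketch below (not the PR's exact code; the key handling may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{DataType, MetadataBuilder}

// Sketch of an apply/unapply pair with an explicit nullability flag.
object MetadataAttributeSketch {
  val METADATA_COL_ATTR_KEY = "__metadata_col"

  def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
    AttributeReference(name, dataType, nullable,
      new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()

  def unapply(attr: AttributeReference): Option[AttributeReference] =
    if (attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)) Some(attr) else None
}
```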




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org