yihua commented on code in PR #11413:
URL: https://github.com/apache/hudi/pull/11413#discussion_r1631704549


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -116,45 +143,154 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      skeletonRequiredSchema: Schema,
                                      dataFileIterator: ClosableIterator[InternalRow],
                                      dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
   }
 
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
+  private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                               skeletonRequiredSchema: Schema,
+                               dataFileIterator: ClosableIterator[Any],
+                               dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
+    if (supportsPositionField()) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val rowIndexColumn = new java.util.HashSet[String]()
+      rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      //always remove the row index column from the skeleton because the data file will also have the same column
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn))
 
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      //If we need to do position based merging with log files we will leave the row index column at the end
+      val dataProjection = if (getHasLogFiles && getUseRecordPosition) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn))
       }
 
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
+      //Always use internal row for positional merge because
+      //we need to iterate row by row when merging
+      new CachingIterator[InternalRow] {
+        val combinedRow = new JoinedRow()
+
+        //position column will always be at the end of the row
+        private def getPos(row: InternalRow): Long = {
+          row.getLong(row.numFields-1)
+        }
+
+        private def getNextSkeleton: (InternalRow, Long) = {
+          val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow]
+          (nextSkeletonRow, getPos(nextSkeletonRow))
+        }
+
+        private def getNextData: (InternalRow, Long) = {
+          val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow]
+          (nextDataRow, getPos(nextDataRow))
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+
+        override protected def doHasNext(): Boolean = {
+          if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
+            false
+          } else {
+            var nextSkeleton = getNextSkeleton
+            var nextData = getNextData
+            while (nextSkeleton._2 != nextData._2) {
+              if (nextSkeleton._2 > nextData._2) {
+                if (!dataFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextData = getNextData
+                }
               } else {
-                vecs(i) = d.column(i - s.numCols())
+                if (!skeletonFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextSkeleton = getNextSkeleton
+                }
               }
             }
-            assert(s.numRows() == d.numRows())
-            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1))
+            true
+          }
         }
       }
+    } else {
+      new ClosableIterator[Any] {
+        val combinedRow = new JoinedRow()
 
-      override def close(): Unit = {
-        skeletonFileIterator.close()
-        dataFileIterator.close()
-      }
-    }.asInstanceOf[ClosableIterator[InternalRow]]
+        override def hasNext: Boolean = {
+          //If the iterators are out of sync it is probably due to filter pushdown
+          checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+            "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+          dataFileIterator.hasNext && skeletonFileIterator.hasNext
+        }
+
+        override def next(): Any = {
+          (skeletonFileIterator.next(), dataFileIterator.next()) match {
+            case (s: ColumnarBatch, d: ColumnarBatch) =>
+              //This will not be used until [HUDI-7693] is implemented
+              val numCols = s.numCols() + d.numCols()
+              val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+              for (i <- 0 until numCols) {
+                if (i < s.numCols()) {
+                  vecs(i) = s.column(i)
+                } else {
+                  vecs(i) = d.column(i - s.numCols())
+                }
+              }
+              assert(s.numRows() == d.numRows())
+              sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+            case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+            case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+            case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+          }
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+      }.asInstanceOf[ClosableIterator[InternalRow]]
+    }
   }
 }
+
+object SparkFileFormatInternalRowReaderContext {
+  // From "namedExpressions.scala": Used to construct to record position field metadata.
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  def getAppliedRequiredSchema(requiredSchema: StructType, shouldAddRecordPosition: Boolean): StructType = {
+    if (shouldAddRecordPosition) {
+      val metadata = new MetadataBuilder()
+        .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+        .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .build()
+      val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false, metadata)
+      StructType(requiredSchema.fields.filterNot(isIndexTempColumn) :+ rowIndexField)
+    } else {
+      requiredSchema
+    }
+  }
+
+  /**
+   * Only valid if there is support for record positions and no log files

Review Comment:
   `there is support for record positions` -> does this refer to reading row index in the Spark parquet reader?
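
   For illustration of the question above (not part of the PR): "support for record positions" hinges on the Spark parquet reader being able to emit a row-index column. A minimal sketch, assuming the temporary row-index column appended by `getAppliedRequiredSchema` ends up as the trailing field of each row (the object name below is illustrative):

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow

   object RowIndexSketch {
     // Mirrors getPos in the merge code above: the row-index temp column is
     // assumed to be the last field of each row read from the parquet file.
     def recordPosition(row: InternalRow): Long = row.getLong(row.numFields - 1)
   }
   ```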



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -56,16 +60,27 @@ import scala.collection.mutable
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
                                               recordKeyColumn: String,
-                                              filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
+                                              filters: Seq[Filter],
+                                              requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {

Review Comment:
   nit: add docs on the parameter `requiredFilters`
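
   One possible shape for the requested docs, sketched from how `requiredFilters` is used in the excerpts below (both `bootstrapSafeFilters` and `allFilters` always include it); the wording is illustrative, not the author's:

   ```scala
   /**
    * @param parquetFileReader parquet reader used for the base files
    * @param recordKeyColumn   name of the record key column
    * @param filters           pushed-down filters; filters that are unsafe for
    *                          bootstrap reads are dropped via filterIsSafeForBootstrap
    * @param requiredFilters   filters that must always be applied by the reader,
    *                          even when the pushed-down filters are pruned
    */
   ```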



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -56,16 +60,27 @@ import scala.collection.mutable
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
                                               recordKeyColumn: String,
-                                              filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
+                                              filters: Seq[Filter],
+                                              requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+  private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
+  private lazy val allFilters = filters ++ requiredFilters
+
+  override def supportsPositionField: Boolean = {

Review Comment:
   nit: `supportsPositionField` -> `supportsParquetRowIndex`
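
   For reference, if the rename is adopted, the override shown in the next excerpt would presumably become (sketch only, keeping the same Spark version gate):

   ```scala
   override def supportsParquetRowIndex: Boolean = {
     HoodieSparkUtils.gteqSpark3_5
   }
   ```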



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -56,16 +60,27 @@ import scala.collection.mutable
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
                                               recordKeyColumn: String,
-                                              filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
+                                              filters: Seq[Filter],
+                                              requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+  private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
+  private lazy val allFilters = filters ++ requiredFilters
+
+  override def supportsPositionField: Boolean = {
+    HoodieSparkUtils.gteqSpark3_5
+  }
 
   override def getFileRecordIterator(filePath: StoragePath,
                                      start: Long,
                                      length: Long,
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                     storage: HoodieStorage): ClosableIterator[InternalRow] = {
+    val hasPositionField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)

Review Comment:
   ```suggestion
       val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.execution.datasources.parquet
+
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.sources.{And, IsNotNull, Or}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestSparkFileFormatInternalRowReaderContext extends SparkClientFunctionalTestHarness {

Review Comment:
   Note: `TestHoodieFileGroupReaderBasedParquetFileFormat` is removed in #10957 and renamed to this class in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
