yihua commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1630825026


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -101,46 +121,150 @@ class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
   }
 
   override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow],
-                                     dataFileIterator: ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+                                     skeletonRequiredSchema: Schema,
+                                     dataFileIterator: ClosableIterator[InternalRow],
+                                     dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
   }
 
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
-
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                                 skeletonRequiredSchema: Schema,
+                                 dataFileIterator: ClosableIterator[Any],
+                                 dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
+    if (getUseRecordPosition) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val javaSet = new java.util.HashSet[String]()
+      javaSet.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, javaSet))
+      //If we have log files, we will want to do position based merging with those as well,
+      //so leave the row index column at the end
+      val dataProjection = if (getHasLogFiles) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, javaSet))
       }
 
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
+      //Always use internal row for positional merge because
+      //we need to iterate row by row when merging
+      new CachingIterator[InternalRow] {
+        val combinedRow = new JoinedRow()
+
+        //position column will always be at the end of the row
+        private def getPos(row: InternalRow): Long = {
+          row.getLong(row.numFields-1)
+        }
+
+        private def getNextSkeleton: (InternalRow, Long) = {
+          val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow]
+          (nextSkeletonRow, getPos(nextSkeletonRow))
+        }
+
+        private def getNextData: (InternalRow, Long) = {
+          val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow]
+          (nextDataRow, getPos(nextDataRow))
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+
+        override protected def doHasNext(): Boolean = {
+          if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
+            false
+          } else {
+            var nextSkeleton = getNextSkeleton
+            var nextData = getNextData
+            while (nextSkeleton._2 != nextData._2) {
+              if (nextSkeleton._2 > nextData._2) {
+                if (!dataFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextData = getNextData
+                }
               } else {
-                vecs(i) = d.column(i - s.numCols())
+                if (!skeletonFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextSkeleton = getNextSkeleton
+                }
               }
             }
-            assert(s.numRows() == d.numRows())
-            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1))
+            true
+          }
         }
       }
+    } else {
+      new ClosableIterator[Any] {
+        val combinedRow = new JoinedRow()
 
-      override def close(): Unit = {
-        skeletonFileIterator.close()
-        dataFileIterator.close()
-      }
-    }.asInstanceOf[ClosableIterator[InternalRow]]
+        override def hasNext: Boolean = {
+          //If the iterators are out of sync it is probably due to filter pushdown
+          checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+            "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+          dataFileIterator.hasNext && skeletonFileIterator.hasNext
+        }
+
+        override def next(): Any = {
+          (skeletonFileIterator.next(), dataFileIterator.next()) match {
+            case (s: ColumnarBatch, d: ColumnarBatch) =>
+              //This will not be used until [HUDI-7693] is implemented
+              val numCols = s.numCols() + d.numCols()
+              val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+              for (i <- 0 until numCols) {
+                if (i < s.numCols()) {
+                  vecs(i) = s.column(i)
+                } else {
+                  vecs(i) = d.column(i - s.numCols())
+                }
+              }
+              assert(s.numRows() == d.numRows())
+              sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+            case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+            case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+            case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+          }
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+      }.asInstanceOf[ClosableIterator[InternalRow]]
+    }
   }
 }
+
+object SparkFileFormatInternalRowReaderContext {
+  // From "ParquetFileFormat.scala": The names of the fields for record position.
+  private val ROW_INDEX = "row_index"
+  private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
+  // From "namedExpressions.scala": Used to construct the record position field metadata.
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] = {
+    filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))
+  }
+
+  def isIndexTempColumn(field: StructField): Boolean = {
+    field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+  }
+
+  def getAppliedRequiredSchema(requiredSchema: StructType): StructType = {
+      val metadata = new MetadataBuilder()
+        .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+        .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .build()
+      val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false, metadata)

Review Comment:
   Where is the `TestFiltersInFileGroupReader` class?
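   
   To make sure I'm reading the position-based path correctly, here is a minimal, self-contained sketch of the alignment idea as I understand it (illustrative only, not the PR's code; `PosRow`, `alignByPosition`, and the sample values are made-up names): both sides are ordered by a trailing position, and a joined pair is produced only when the two positions match, otherwise whichever side is behind is skipped forward, like the loop in `doHasNext`.
   
   ```scala
   object PositionalMergeSketch {
     // A row reduced to its position plus a payload, standing in for InternalRow.
     final case class PosRow(pos: Long, value: String)
   
     // Pair up rows from two position-ordered iterators, skipping unmatched positions.
     def alignByPosition(skeleton: Iterator[PosRow], data: Iterator[PosRow]): Iterator[(PosRow, PosRow)] =
       new Iterator[(PosRow, PosRow)] {
         private var pending: Option[(PosRow, PosRow)] = None
   
         private def advance(): Unit = {
           pending = None
           if (skeleton.hasNext && data.hasNext) {
             var s = skeleton.next()
             var d = data.next()
             var exhausted = false
             // Skip forward on whichever side is behind until the positions line up.
             while (!exhausted && s.pos != d.pos) {
               if (s.pos > d.pos) {
                 if (data.hasNext) d = data.next() else exhausted = true
               } else {
                 if (skeleton.hasNext) s = skeleton.next() else exhausted = true
               }
             }
             if (s.pos == d.pos) pending = Some((s, d))
           }
         }
   
         override def hasNext: Boolean = {
           if (pending.isEmpty) advance()
           pending.isDefined
         }
   
         override def next(): (PosRow, PosRow) = {
           if (!hasNext) throw new NoSuchElementException("no more aligned rows")
           val out = pending.get
           pending = None
           out
         }
       }
   
     def main(args: Array[String]): Unit = {
       // Skeleton has positions 0, 2, 3 and data has 0, 1, 3, so only 0 and 3 are paired.
       val skeleton = Iterator(PosRow(0, "key0"), PosRow(2, "key2"), PosRow(3, "key3"))
       val data     = Iterator(PosRow(0, "val0"), PosRow(1, "val1"), PosRow(3, "val3"))
       alignByPosition(skeleton, data).foreach(println)
     }
   }
   ```
   
   If that reading is right, the trailing-position convention is also why the temporary row index column is left at the end of the data projection when log files are present, so the same positional merge can be reused for the log-file path.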



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
