jonvex commented on code in PR #11413:
URL: https://github.com/apache/hudi/pull/11413#discussion_r1631715307


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -116,45 +143,154 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      skeletonRequiredSchema: Schema,
                                      dataFileIterator: ClosableIterator[InternalRow],
                                      dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
   }
 
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
+  private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                               skeletonRequiredSchema: Schema,
+                               dataFileIterator: ClosableIterator[Any],
+                               dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
+    if (supportsPositionField()) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val rowIndexColumn = new java.util.HashSet[String]()
+      rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      //always remove the row index column from the skeleton because the data file will also have the same column
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn))
 
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      //If we need to do position based merging with log files we will leave the row index column at the end
+      val dataProjection = if (getHasLogFiles && getUseRecordPosition) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn))
       }
 
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
+      //Always use internal row for positional merge because
Review Comment:
   I think the filtering is actually done by batch as well, so we wouldn't need to iterate through the rows themselves.
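   Something along these lines (an untested sketch; `combineBatches` is a hypothetical helper, not part of this PR) is what I have in mind: once filter pushdown has been applied per batch, the skeleton and data batches stay aligned, so we can stitch them together column-wise, the way the removed `next()` already did, instead of walking individual rows:

   ```scala
   import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

   // Sketch: combine a skeleton batch and a data batch column-wise.
   // Assumes filter pushdown produced both batches from the same rows,
   // so their row counts already match.
   def combineBatches(skeleton: ColumnarBatch, data: ColumnarBatch): ColumnarBatch = {
     require(skeleton.numRows() == data.numRows(),
       "skeleton and data batches must stay in sync after filter pushdown")
     val vecs = new Array[ColumnVector](skeleton.numCols() + data.numCols())
     for (i <- 0 until skeleton.numCols()) vecs(i) = skeleton.column(i)
     for (i <- 0 until data.numCols()) vecs(skeleton.numCols() + i) = data.column(i)
     val combined = new ColumnarBatch(vecs)
     combined.setNumRows(skeleton.numRows())
     combined
   }
   ```

   The only per-batch check we'd need is the row-count assertion; everything else is just re-wrapping the existing column vectors, which is cheap.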



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
