yihua commented on code in PR #11413: URL: https://github.com/apache/hudi/pull/11413#discussion_r1631711150
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala: ########## @@ -116,45 +143,154 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea skeletonRequiredSchema: Schema, dataFileIterator: ClosableIterator[InternalRow], dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { - doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], - dataFileIterator.asInstanceOf[ClosableIterator[Any]]) + doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema, + dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema) } - protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = { - new ClosableIterator[Any] { - val combinedRow = new JoinedRow() + private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], + skeletonRequiredSchema: Schema, + dataFileIterator: ClosableIterator[Any], + dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { + if (supportsPositionField()) { + assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + val rowIndexColumn = new java.util.HashSet[String]() + rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME) + //always remove the row index column from the skeleton because the data file will also have the same column + val skeletonProjection = projectRecord(skeletonRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn)) - override def hasNext: Boolean = { - //If the iterators are out of sync it is probably due to filter pushdown - checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, - "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") - dataFileIterator.hasNext && skeletonFileIterator.hasNext + //If we need to do position based merging with log files we will leave the row index column at the end + val dataProjection = if (getHasLogFiles && getUseRecordPosition) { + getIdentityProjection + } else { + projectRecord(dataRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn)) } - override def next(): Any = { - (skeletonFileIterator.next(), dataFileIterator.next()) match { - case (s: ColumnarBatch, d: ColumnarBatch) => - val numCols = s.numCols() + d.numCols() - val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols) - for (i <- 0 until numCols) { - if (i < s.numCols()) { - vecs(i) = s.column(i) + //Always use internal row for positional merge because Review Comment: We can still iterate through rows within the `ColumnarBatch` in the vectorized processing. We can leave that as a follow-up. ########## hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java: ########## @@ -122,6 +123,15 @@ public void setNeedsBootstrapMerge(boolean needsBootstrapMerge) { this.needsBootstrapMerge = needsBootstrapMerge; } + // Getter and Setter for useRecordPosition + public boolean getUseRecordPosition() { + return useRecordPosition; + } Review Comment: Rename the getter and setter to sth like `shouldMergeUseRecordPosition` and `setMergeUseRecordPosition` so it indicates this is used for controlling the merging behavior. ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala: ########## @@ -116,45 +143,154 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea skeletonRequiredSchema: Schema, dataFileIterator: ClosableIterator[InternalRow], dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { - doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], - dataFileIterator.asInstanceOf[ClosableIterator[Any]]) + doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema, + dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema) } - protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = { - new ClosableIterator[Any] { - val combinedRow = new JoinedRow() + private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], + skeletonRequiredSchema: Schema, + dataFileIterator: ClosableIterator[Any], + dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { + if (supportsPositionField()) { + assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + val rowIndexColumn = new java.util.HashSet[String]() + rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME) + //always remove the row index column from the skeleton because the data file will also have the same column + val skeletonProjection = projectRecord(skeletonRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn)) - override def hasNext: Boolean = { - //If the iterators are out of sync it is probably due to filter pushdown - checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, - "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") - dataFileIterator.hasNext && skeletonFileIterator.hasNext + //If we need to do position based merging with log files we will leave the row index column at the end + val dataProjection = if (getHasLogFiles && getUseRecordPosition) { + getIdentityProjection + } else { + projectRecord(dataRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn)) } - override def next(): Any = { - (skeletonFileIterator.next(), dataFileIterator.next()) match { - case (s: ColumnarBatch, d: ColumnarBatch) => - val numCols = s.numCols() + d.numCols() - val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols) - for (i <- 0 until numCols) { - if (i < s.numCols()) { - vecs(i) = s.column(i) + //Always use internal row for positional merge because + //we need to iterate row by row when merging + new CachingIterator[InternalRow] { + val combinedRow = new JoinedRow() + + //position column will always be at the end of the row + private def getPos(row: InternalRow): Long = { + row.getLong(row.numFields-1) + } + + private def getNextSkeleton: (InternalRow, Long) = { + val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow] + (nextSkeletonRow, getPos(nextSkeletonRow)) + } + + private def getNextData: (InternalRow, Long) = { + val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow] + (nextDataRow, getPos(nextDataRow)) + } + + override def close(): Unit = { + skeletonFileIterator.close() + dataFileIterator.close() + } + + override protected def doHasNext(): Boolean = { + if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) { + false + } else { + var nextSkeleton = getNextSkeleton + var nextData = getNextData + while (nextSkeleton._2 != nextData._2) { + if (nextSkeleton._2 > nextData._2) { + if (!dataFileIterator.hasNext) { + return false + } else { + nextData = getNextData + } } else { - vecs(i) = d.column(i - s.numCols()) + if (!skeletonFileIterator.hasNext) { + return false + } else { + nextSkeleton = getNextSkeleton + } } } - assert(s.numRows() == d.numRows()) - sparkAdapter.makeColumnarBatch(vecs, s.numRows()) - case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch") - case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch") - case (s: InternalRow, d: InternalRow) => combinedRow(s, d) + nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1)) + true + } } } + } else { + new ClosableIterator[Any] { + val combinedRow = new JoinedRow() - override def close(): Unit = { - skeletonFileIterator.close() - dataFileIterator.close() - } - }.asInstanceOf[ClosableIterator[InternalRow]] + override def hasNext: Boolean = { + //If the iterators are out of sync it is probably due to filter pushdown + checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, + "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") + dataFileIterator.hasNext && skeletonFileIterator.hasNext + } + + override def next(): Any = { + (skeletonFileIterator.next(), dataFileIterator.next()) match { Review Comment: I assume this is the original logic of key-based merging which is unchanged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org