This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 6295cf7d94fae0864c1b9cc1100dda4401fc6997 Author: Jark Wu <[email protected]> AuthorDate: Sun Nov 30 21:20:15 2025 +0800 WIP1 --- .../apache/fluss/record/DefaultLogRecordBatch.java | 96 ++++++---------------- 1 file changed, 27 insertions(+), 69 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index b6ed40eeb..8bf27e10e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -21,7 +21,6 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.exception.CorruptMessageException; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.metadata.LogFormat; -import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.ProjectedRow; import org.apache.fluss.row.arrow.ArrowReader; import org.apache.fluss.row.columnar.ColumnarRow; @@ -34,13 +33,13 @@ import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.MurmurHashUtils; import org.apache.fluss.utils.crc.Crc32C; +import javax.annotation.Nullable; + import java.nio.ByteBuffer; import java.util.NoSuchElementException; -import java.util.function.Function; import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET; import static org.apache.fluss.record.LogRecordBatchFormat.COMMIT_TIMESTAMP_OFFSET; -import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; @@ -213,7 +212,6 @@ public class DefaultLogRecordBatch implements LogRecordBatch { } @Override - @SuppressWarnings("unchecked") public CloseableIterator<LogRecord> records(ReadContext context) { if (getRecordCount() == 0) { return CloseableIterator.emptyIterator(); @@ -223,26 +221,18 @@ public class DefaultLogRecordBatch implements LogRecordBatch { long timestamp = commitTimestamp(); LogFormat logFormat = context.getLogFormat(); RowType rowType = context.getRowType(schemaId); - Function<InternalRow, InternalRow> outputRowProcessor = - (row) -> { - ProjectedRow projectedRow = context.getOutputProjectedRow(schemaId); - if (projectedRow != null) { - return projectedRow.replaceRow(row); - } else { - return row; - } - }; switch (logFormat) { case ARROW: return columnRecordIterator( rowType, - outputRowProcessor, + context.getOutputProjectedRow(schemaId), context.getVectorSchemaRoot(schemaId), context.getBufferAllocator(), timestamp); case INDEXED: - return rowRecordIterator(rowType, outputRowProcessor, timestamp); + return rowRecordIterator( + rowType, context.getOutputProjectedRow(schemaId), timestamp); default: throw new IllegalArgumentException("Unsupported log format: " + logFormat); } @@ -270,9 +260,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { } private CloseableIterator<LogRecord> rowRecordIterator( - RowType rowType, - Function<InternalRow, InternalRow> outputRowProcessor, - long timestamp) { + RowType rowType, @Nullable ProjectedRow outputProjection, long timestamp) { DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); return new LogRecordIterator() { int position = DefaultLogRecordBatch.this.position + recordBatchHeaderSize(magic); @@ -281,16 +269,20 @@ public class DefaultLogRecordBatch implements LogRecordBatch { @Override protected LogRecord readNext(long baseOffset) { IndexedLogRecord logRecord = - ProjectedIndexedLogRecord.readFrom( - segment, - position, - baseOffset + rowId, - timestamp, - fieldTypes, - outputRowProcessor); + IndexedLogRecord.readFrom( + segment, position, baseOffset + rowId, timestamp, fieldTypes); rowId++; position += logRecord.getSizeInBytes(); - return logRecord; + if (outputProjection == null) { + return logRecord; + } else { + // apply projection + return new GenericRecord( + logRecord.logOffset(), + logRecord.timestamp(), + logRecord.getChangeType(), + outputProjection.replaceRow(logRecord.getRow())); + } } @Override @@ -305,7 +297,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { private CloseableIterator<LogRecord> columnRecordIterator( RowType rowType, - Function<InternalRow, InternalRow> outputRowProcessor, + @Nullable ProjectedRow outputProjection, VectorSchemaRoot root, BufferAllocator allocator, long timestamp) { @@ -319,7 +311,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); - return new ArrowLogRecordIterator(reader, timestamp, outputRowProcessor) { + return new ArrowLogRecordIterator(reader, timestamp, outputProjection) { @Override protected ChangeType getChangeType(int rowId) { return ChangeType.APPEND_ONLY; @@ -337,7 +329,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); - return new ArrowLogRecordIterator(reader, timestamp, outputRowProcessor) { + return new ArrowLogRecordIterator(reader, timestamp, outputProjection) { @Override protected ChangeType getChangeType(int rowId) { return changeTypeVector.getChangeType(rowId); @@ -351,15 +343,13 @@ public class DefaultLogRecordBatch implements LogRecordBatch { private final ArrowReader reader; private final long timestamp; private int rowId = 0; - private final Function<InternalRow, InternalRow> outputRowProcessor; + @Nullable private final ProjectedRow outputProjection; private ArrowLogRecordIterator( - ArrowReader reader, - long timestamp, - Function<InternalRow, InternalRow> outputRowProcessor) { + ArrowReader reader, long timestamp, @Nullable ProjectedRow outputProjection) { this.reader = reader; this.timestamp = timestamp; - this.outputRowProcessor = outputRowProcessor; + this.outputProjection = outputProjection; } protected abstract ChangeType getChangeType(int rowId); @@ -377,7 +367,9 @@ public class DefaultLogRecordBatch implements LogRecordBatch { baseOffset + rowId, timestamp, getChangeType(rowId), - outputRowProcessor.apply(originalRow)); + outputProjection == null + ? originalRow + : outputProjection.replaceRow(originalRow)); rowId++; return record; } @@ -447,38 +439,4 @@ public class DefaultLogRecordBatch implements LogRecordBatch { throw new UnsupportedOperationException(); } } - - private static class ProjectedIndexedLogRecord extends IndexedLogRecord { - private final Function<InternalRow, InternalRow> outputRowProcessor; - - ProjectedIndexedLogRecord( - long logOffset, - long timestamp, - DataType[] fieldTypes, - Function<InternalRow, InternalRow> outputRowProcessor) { - super(logOffset, timestamp, fieldTypes); - this.outputRowProcessor = outputRowProcessor; - } - - public static IndexedLogRecord readFrom( - MemorySegment segment, - int position, - long logOffset, - long logTimestamp, - DataType[] colTypes, - Function<InternalRow, InternalRow> outputRowProcessor) { - int sizeInBytes = segment.getInt(position); - ProjectedIndexedLogRecord logRecord = - new ProjectedIndexedLogRecord( - logOffset, logTimestamp, colTypes, outputRowProcessor); - logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH); - return logRecord; - } - - @Override - public InternalRow getRow() { - InternalRow row = super.getRow(); - return outputRowProcessor.apply(row); - } - } }
