This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 6295cf7d94fae0864c1b9cc1100dda4401fc6997
Author: Jark Wu <[email protected]>
AuthorDate: Sun Nov 30 21:20:15 2025 +0800

    WIP1
---
 .../apache/fluss/record/DefaultLogRecordBatch.java | 96 ++++++----------------
 1 file changed, 27 insertions(+), 69 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index b6ed40eeb..8bf27e10e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.exception.CorruptMessageException;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.row.arrow.ArrowReader;
 import org.apache.fluss.row.columnar.ColumnarRow;
@@ -34,13 +33,13 @@ import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.MurmurHashUtils;
 import org.apache.fluss.utils.crc.Crc32C;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
 
 import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
 import static 
org.apache.fluss.record.LogRecordBatchFormat.COMMIT_TIMESTAMP_OFFSET;
-import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
 import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
 import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
@@ -213,7 +212,6 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public CloseableIterator<LogRecord> records(ReadContext context) {
         if (getRecordCount() == 0) {
             return CloseableIterator.emptyIterator();
@@ -223,26 +221,18 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
         long timestamp = commitTimestamp();
         LogFormat logFormat = context.getLogFormat();
         RowType rowType = context.getRowType(schemaId);
-        Function<InternalRow, InternalRow> outputRowProcessor =
-                (row) -> {
-                    ProjectedRow projectedRow = 
context.getOutputProjectedRow(schemaId);
-                    if (projectedRow != null) {
-                        return projectedRow.replaceRow(row);
-                    } else {
-                        return row;
-                    }
-                };
 
         switch (logFormat) {
             case ARROW:
                 return columnRecordIterator(
                         rowType,
-                        outputRowProcessor,
+                        context.getOutputProjectedRow(schemaId),
                         context.getVectorSchemaRoot(schemaId),
                         context.getBufferAllocator(),
                         timestamp);
             case INDEXED:
-                return rowRecordIterator(rowType, outputRowProcessor, 
timestamp);
+                return rowRecordIterator(
+                        rowType, context.getOutputProjectedRow(schemaId), 
timestamp);
             default:
                 throw new IllegalArgumentException("Unsupported log format: " 
+ logFormat);
         }
@@ -270,9 +260,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
     }
 
     private CloseableIterator<LogRecord> rowRecordIterator(
-            RowType rowType,
-            Function<InternalRow, InternalRow> outputRowProcessor,
-            long timestamp) {
+            RowType rowType, @Nullable ProjectedRow outputProjection, long 
timestamp) {
         DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
         return new LogRecordIterator() {
             int position = DefaultLogRecordBatch.this.position + 
recordBatchHeaderSize(magic);
@@ -281,16 +269,20 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
             @Override
             protected LogRecord readNext(long baseOffset) {
                 IndexedLogRecord logRecord =
-                        ProjectedIndexedLogRecord.readFrom(
-                                segment,
-                                position,
-                                baseOffset + rowId,
-                                timestamp,
-                                fieldTypes,
-                                outputRowProcessor);
+                        IndexedLogRecord.readFrom(
+                                segment, position, baseOffset + rowId, 
timestamp, fieldTypes);
                 rowId++;
                 position += logRecord.getSizeInBytes();
-                return logRecord;
+                if (outputProjection == null) {
+                    return logRecord;
+                } else {
+                    // apply projection
+                    return new GenericRecord(
+                            logRecord.logOffset(),
+                            logRecord.timestamp(),
+                            logRecord.getChangeType(),
+                            outputProjection.replaceRow(logRecord.getRow()));
+                }
             }
 
             @Override
@@ -305,7 +297,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
 
     private CloseableIterator<LogRecord> columnRecordIterator(
             RowType rowType,
-            Function<InternalRow, InternalRow> outputRowProcessor,
+            @Nullable ProjectedRow outputProjection,
             VectorSchemaRoot root,
             BufferAllocator allocator,
             long timestamp) {
@@ -319,7 +311,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
                             segment, arrowOffset, arrowLength, root, 
allocator, rowType);
-            return new ArrowLogRecordIterator(reader, timestamp, 
outputRowProcessor) {
+            return new ArrowLogRecordIterator(reader, timestamp, 
outputProjection) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
                     return ChangeType.APPEND_ONLY;
@@ -337,7 +329,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
                             segment, arrowOffset, arrowLength, root, 
allocator, rowType);
-            return new ArrowLogRecordIterator(reader, timestamp, 
outputRowProcessor) {
+            return new ArrowLogRecordIterator(reader, timestamp, 
outputProjection) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
                     return changeTypeVector.getChangeType(rowId);
@@ -351,15 +343,13 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
         private final ArrowReader reader;
         private final long timestamp;
         private int rowId = 0;
-        private final Function<InternalRow, InternalRow> outputRowProcessor;
+        @Nullable private final ProjectedRow outputProjection;
 
         private ArrowLogRecordIterator(
-                ArrowReader reader,
-                long timestamp,
-                Function<InternalRow, InternalRow> outputRowProcessor) {
+                ArrowReader reader, long timestamp, @Nullable ProjectedRow 
outputProjection) {
             this.reader = reader;
             this.timestamp = timestamp;
-            this.outputRowProcessor = outputRowProcessor;
+            this.outputProjection = outputProjection;
         }
 
         protected abstract ChangeType getChangeType(int rowId);
@@ -377,7 +367,9 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
                             baseOffset + rowId,
                             timestamp,
                             getChangeType(rowId),
-                            outputRowProcessor.apply(originalRow));
+                            outputProjection == null
+                                    ? originalRow
+                                    : 
outputProjection.replaceRow(originalRow));
             rowId++;
             return record;
         }
@@ -447,38 +439,4 @@ public class DefaultLogRecordBatch implements LogRecordBatch {
             throw new UnsupportedOperationException();
         }
     }
-
-    private static class ProjectedIndexedLogRecord extends IndexedLogRecord {
-        private final Function<InternalRow, InternalRow> outputRowProcessor;
-
-        ProjectedIndexedLogRecord(
-                long logOffset,
-                long timestamp,
-                DataType[] fieldTypes,
-                Function<InternalRow, InternalRow> outputRowProcessor) {
-            super(logOffset, timestamp, fieldTypes);
-            this.outputRowProcessor = outputRowProcessor;
-        }
-
-        public static IndexedLogRecord readFrom(
-                MemorySegment segment,
-                int position,
-                long logOffset,
-                long logTimestamp,
-                DataType[] colTypes,
-                Function<InternalRow, InternalRow> outputRowProcessor) {
-            int sizeInBytes = segment.getInt(position);
-            ProjectedIndexedLogRecord logRecord =
-                    new ProjectedIndexedLogRecord(
-                            logOffset, logTimestamp, colTypes, 
outputRowProcessor);
-            logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH);
-            return logRecord;
-        }
-
-        @Override
-        public InternalRow getRow() {
-            InternalRow row = super.getRow();
-            return outputRowProcessor.apply(row);
-        }
-    }
 }

Reply via email to