This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 78eedf86ca9d26a1fd3a9a04e9dedadb6de5c670
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 00:18:56 2025 +0800

    WIP
---
 .../apache/fluss/record/LogRecordReadContext.java  | 56 +++++++++-------------
 1 file changed, 23 insertions(+), 33 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 50ec7ee66..571e75221 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -31,18 +31,20 @@ import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.ArrowUtils;
+import org.apache.fluss.utils.MapUtils;
 import org.apache.fluss.utils.Projection;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.IntStream;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** A simple implementation for {@link LogRecordBatch.ReadContext}. */
+@ThreadSafe
 public class LogRecordReadContext implements LogRecordBatch.ReadContext, 
AutoCloseable {
 
     // the log format of the table
@@ -50,9 +52,7 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     // the schema of the date read form server or remote. (which is projected 
in the server side)
     private final RowType dataRowType;
     // the static schemaId of the table, should support dynamic schema 
evolution in the future
-    private final int expectedSchemaId;
-    // the Arrow vector schema root of the table, should be null if not ARROW 
log format
-    @Nullable private final VectorSchemaRoot vectorSchemaRoot;
+    private final int targetSchemaId;
     // the Arrow memory buffer allocator for the table, should be null if not 
ARROW log format
     @Nullable private final BufferAllocator bufferAllocator;
     // the final selected fields of the read data
@@ -60,7 +60,8 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     // whether the projection is push downed to the server side and the 
returned data is pruned.
     private final boolean projectionPushDowned;
     private final SchemaGetter schemaGetter;
-    private final Map<Integer, VectorSchemaRoot> vectorSchemaRootMap;
+    private final ConcurrentHashMap<Integer, VectorSchemaRoot> 
vectorSchemaRootMap =
+            MapUtils.newConcurrentHashMap();
 
     public static LogRecordReadContext createReadContext(
             TableInfo tableInfo,
@@ -115,14 +116,11 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
             SchemaGetter schemaGetter) {
         // TODO: use a more reasonable memory limit
         BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
-        VectorSchemaRoot vectorRoot =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(dataRowType), 
allocator);
         FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, 
selectedFields);
         return new LogRecordReadContext(
                 LogFormat.ARROW,
                 dataRowType,
                 schemaId,
-                vectorRoot,
                 allocator,
                 fieldGetters,
                 projectionPushDowned,
@@ -130,10 +128,12 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     }
 
     /**
-     * Creates a LogRecordReadContext for INDEXED log format.
+     * Creates a LogRecordReadContext for ARROW log format, that underlying 
Arrow resources are not
+     * reused.
      *
      * @param rowType the schema of the table
      * @param schemaId the schemaId of the table
+     * @param schemaGetter the schema getter of to get schema by schemaId
      */
     @VisibleForTesting
     public static LogRecordReadContext createArrowReadContext(
@@ -142,6 +142,13 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         return createArrowReadContext(rowType, schemaId, selectedFields, 
false, schemaGetter);
     }
 
+    /**
+     * Creates a LogRecordReadContext for INDEXED log format.
+     *
+     * @param rowType the schema of the table
+     * @param schemaId the schemaId of the table
+     * @param schemaGetter the schema getter of to get schema by schemaId
+     */
     @VisibleForTesting
     public static LogRecordReadContext createIndexedReadContext(
             RowType rowType, int schemaId, SchemaGetter schemaGetter) {
@@ -155,40 +162,31 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
      * @param rowType the schema of the read data
      * @param schemaId the schemaId of the table
      * @param selectedFields the final selected fields of the read data
+     * @param schemaGetter the schema getter of to get schema by schemaId
      */
     public static LogRecordReadContext createIndexedReadContext(
             RowType rowType, int schemaId, int[] selectedFields, SchemaGetter 
schemaGetter) {
         FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, 
selectedFields);
         // for INDEXED log format, the projection is NEVER push downed to the 
server side
         return new LogRecordReadContext(
-                LogFormat.INDEXED,
-                rowType,
-                schemaId,
-                null,
-                null,
-                fieldGetters,
-                false,
-                schemaGetter);
+                LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, 
false, schemaGetter);
     }
 
     private LogRecordReadContext(
             LogFormat logFormat,
             RowType targetDataRowType,
-            int expectedSchemaId,
-            VectorSchemaRoot vectorSchemaRoot,
+            int targetSchemaId,
             BufferAllocator bufferAllocator,
             FieldGetter[] selectedFieldGetters,
             boolean projectionPushDowned,
             SchemaGetter schemaGetter) {
         this.logFormat = logFormat;
         this.dataRowType = targetDataRowType;
-        this.expectedSchemaId = expectedSchemaId;
-        this.vectorSchemaRoot = vectorSchemaRoot;
+        this.targetSchemaId = targetSchemaId;
         this.bufferAllocator = bufferAllocator;
         this.selectedFieldGetters = selectedFieldGetters;
         this.projectionPushDowned = projectionPushDowned;
         this.schemaGetter = schemaGetter;
-        this.vectorSchemaRootMap = new HashMap<>();
     }
 
     @Override
@@ -223,11 +221,6 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
                     "Only Arrow log format provides vector schema root.");
         }
 
-        if (schemaId == this.expectedSchemaId) {
-            checkNotNull(vectorSchemaRoot, "The vector schema root is not 
available.");
-            return vectorSchemaRoot;
-        }
-
         RowType rowType = getRowType(schemaId);
         return vectorSchemaRootMap.computeIfAbsent(
                 schemaId,
@@ -252,14 +245,11 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
             return null;
         }
         Schema originSchema = schemaGetter.getSchema(schemaId);
-        Schema expectedSchema = schemaGetter.getSchema(expectedSchemaId);
+        Schema expectedSchema = schemaGetter.getSchema(targetSchemaId);
         return ProjectedRow.from(originSchema, expectedSchema);
     }
 
     public void close() {
-        if (vectorSchemaRoot != null) {
-            vectorSchemaRoot.close();
-        }
         vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close);
         if (bufferAllocator != null) {
             bufferAllocator.close();
@@ -267,7 +257,7 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     }
 
     private boolean isSameRowType(int schemaId) {
-        return expectedSchemaId == schemaId || isProjectionPushDowned();
+        return targetSchemaId == schemaId || isProjectionPushDowned();
     }
 
     private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, 
int[] selectedFields) {

Reply via email to