This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 78eedf86ca9d26a1fd3a9a04e9dedadb6de5c670 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 00:18:56 2025 +0800 WIP --- .../apache/fluss/record/LogRecordReadContext.java | 56 +++++++++------------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 50ec7ee66..571e75221 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -31,18 +31,20 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; +import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.Projection; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.IntStream; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** A simple implementation for {@link LogRecordBatch.ReadContext}. */ +@ThreadSafe public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoCloseable { // the log format of the table @@ -50,9 +52,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo // the schema of the date read form server or remote. 
(which is projected in the server side) private final RowType dataRowType; // the static schemaId of the table, should support dynamic schema evolution in the future - private final int expectedSchemaId; - // the Arrow vector schema root of the table, should be null if not ARROW log format - @Nullable private final VectorSchemaRoot vectorSchemaRoot; + private final int targetSchemaId; // the Arrow memory buffer allocator for the table, should be null if not ARROW log format @Nullable private final BufferAllocator bufferAllocator; // the final selected fields of the read data @@ -60,7 +60,8 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo // whether the projection is push downed to the server side and the returned data is pruned. private final boolean projectionPushDowned; private final SchemaGetter schemaGetter; - private final Map<Integer, VectorSchemaRoot> vectorSchemaRootMap; + private final ConcurrentHashMap<Integer, VectorSchemaRoot> vectorSchemaRootMap = + MapUtils.newConcurrentHashMap(); public static LogRecordReadContext createReadContext( TableInfo tableInfo, @@ -115,14 +116,11 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo SchemaGetter schemaGetter) { // TODO: use a more reasonable memory limit BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); - VectorSchemaRoot vectorRoot = - VectorSchemaRoot.create(ArrowUtils.toArrowSchema(dataRowType), allocator); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, dataRowType, schemaId, - vectorRoot, allocator, fieldGetters, projectionPushDowned, @@ -130,10 +128,12 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo } /** - * Creates a LogRecordReadContext for INDEXED log format. + * Creates a LogRecordReadContext for ARROW log format, whose underlying Arrow resources are not + * reused. 
* * @param rowType the schema of the table * @param schemaId the schemaId of the table + * @param schemaGetter the schema getter to get the schema by schemaId */ @VisibleForTesting public static LogRecordReadContext createArrowReadContext( @@ -142,6 +142,13 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo return createArrowReadContext(rowType, schemaId, selectedFields, false, schemaGetter); } + /** + * Creates a LogRecordReadContext for INDEXED log format. + * + * @param rowType the schema of the table + * @param schemaId the schemaId of the table + * @param schemaGetter the schema getter to get the schema by schemaId + */ @VisibleForTesting public static LogRecordReadContext createIndexedReadContext( RowType rowType, int schemaId, SchemaGetter schemaGetter) { @@ -155,40 +162,31 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo * @param rowType the schema of the read data * @param schemaId the schemaId of the table * @param selectedFields the final selected fields of the read data + * @param schemaGetter the schema getter to get the schema by schemaId */ public static LogRecordReadContext createIndexedReadContext( RowType rowType, int schemaId, int[] selectedFields, SchemaGetter schemaGetter) { FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields); // for INDEXED log format, the projection is NEVER push downed to the server side return new LogRecordReadContext( - LogFormat.INDEXED, - rowType, - schemaId, - null, - null, - fieldGetters, - false, - schemaGetter); + LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter); } private LogRecordReadContext( LogFormat logFormat, RowType targetDataRowType, - int expectedSchemaId, - VectorSchemaRoot vectorSchemaRoot, + int targetSchemaId, BufferAllocator bufferAllocator, FieldGetter[] selectedFieldGetters, boolean projectionPushDowned, SchemaGetter schemaGetter) { this.logFormat = logFormat; this.dataRowType = 
targetDataRowType; - this.expectedSchemaId = expectedSchemaId; - this.vectorSchemaRoot = vectorSchemaRoot; + this.targetSchemaId = targetSchemaId; this.bufferAllocator = bufferAllocator; this.selectedFieldGetters = selectedFieldGetters; this.projectionPushDowned = projectionPushDowned; this.schemaGetter = schemaGetter; - this.vectorSchemaRootMap = new HashMap<>(); } @Override @@ -223,11 +221,6 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo "Only Arrow log format provides vector schema root."); } - if (schemaId == this.expectedSchemaId) { - checkNotNull(vectorSchemaRoot, "The vector schema root is not available."); - return vectorSchemaRoot; - } - RowType rowType = getRowType(schemaId); return vectorSchemaRootMap.computeIfAbsent( schemaId, @@ -252,14 +245,11 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo return null; } Schema originSchema = schemaGetter.getSchema(schemaId); - Schema expectedSchema = schemaGetter.getSchema(expectedSchemaId); + Schema expectedSchema = schemaGetter.getSchema(targetSchemaId); return ProjectedRow.from(originSchema, expectedSchema); } public void close() { - if (vectorSchemaRoot != null) { - vectorSchemaRoot.close(); - } vectorSchemaRootMap.values().forEach(VectorSchemaRoot::close); if (bufferAllocator != null) { bufferAllocator.close(); @@ -267,7 +257,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo } private boolean isSameRowType(int schemaId) { - return expectedSchemaId == schemaId || isProjectionPushDowned(); + return targetSchemaId == schemaId || isProjectionPushDowned(); } private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] selectedFields) {
