This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 5d65a20b79fbefaae07955ffc296267dbde39b43
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 01:01:42 2025 +0800

    WIP2
---
 .../src/main/java/org/apache/fluss/metadata/Schema.java    | 14 ++++++--------
 .../java/org/apache/fluss/record/KvRecordReadContext.java  |  1 +
 .../java/org/apache/fluss/record/LogRecordReadContext.java |  1 +
 .../src/main/java/org/apache/fluss/row/ProjectedRow.java   |  6 +++---
 .../java/org/apache/fluss/row/encode/ValueDecoder.java     |  1 +
 .../java/org/apache/fluss/row/encode/ValueEncoder.java     |  1 +
 .../src/test/java/org/apache/fluss/record/LogTestBase.java |  1 -
 .../test/java/org/apache/fluss/row/ProjectedRowTest.java   |  1 +
 .../org/apache/fluss/flink/row/FlinkAsFlussRowTest.java    |  3 +--
 9 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index c065a3711..0ae0c8b97 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -155,6 +155,7 @@ public final class Schema implements Serializable {
         return keyIndexes;
     }
 
+    /** Returns the highest field ID in this schema. */
     public int getHighestFieldId() {
         return highestFieldId;
     }
@@ -250,24 +251,21 @@ public final class Schema implements Serializable {
 
         /** Adopts all columns from the given list. */
         public Builder fromColumns(List<Column> inputColumns) {
-            boolean nonMatchColumnId =
+            boolean nonSetColumnId =
                     inputColumns.stream()
                             .noneMatch(column -> column.columnId != 
Column.UNKNOWN_COLUMN_ID);
-            boolean allMatchColumnId =
+            boolean allSetColumnId =
                     inputColumns.stream()
                             .allMatch(column -> column.columnId != 
Column.UNKNOWN_COLUMN_ID);
             checkState(
-                    nonMatchColumnId || allMatchColumnId,
+                    nonSetColumnId || allSetColumnId,
                     "All columns must have columnId or none of them must have 
columnId.");
 
-            if (allMatchColumnId) {
+            if (allSetColumnId) {
                 columns.addAll(inputColumns);
                 highestFieldId =
                         new AtomicInteger(
-                                inputColumns.stream()
-                                        .mapToInt(Column::getColumnId)
-                                        .max()
-                                        .orElse(-1));
+                                
columns.stream().mapToInt(Column::getColumnId).max().orElse(-1));
             } else {
                 // if all columnId is not set, this maybe from old version 
schema. Just use its
                 // position as columnId.
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
index 5fffc3f4f..195312cef 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java
@@ -31,6 +31,7 @@ public class KvRecordReadContext implements 
KvRecordBatch.ReadContext {
 
     private final KvFormat kvFormat;
     private final SchemaGetter schemaGetter;
+    // TODO reuse
     private final Map<Integer, RowDecoder> rowDecoderCache;
 
     private KvRecordReadContext(KvFormat kvFormat, SchemaGetter schemaGetter) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 571e75221..5ec5c7c77 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -244,6 +244,7 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         if (isSameRowType(schemaId)) {
             return null;
         }
+        // TODO: should we cache the projection?
         Schema originSchema = schemaGetter.getSchema(schemaId);
         Schema expectedSchema = schemaGetter.getSchema(targetSchemaId);
         return ProjectedRow.from(originSchema, expectedSchema);
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java
index 6b4c516d1..d793f342a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java
@@ -36,11 +36,11 @@ import static 
org.apache.fluss.utils.SchemaUtil.getIndexMapping;
 public class ProjectedRow implements InternalRow {
     public static final int UNEXIST_MAPPING = -1;
 
-    protected final int[] indexMapping;
+    private final int[] indexMapping;
 
-    protected InternalRow row;
+    private InternalRow row;
 
-    protected ProjectedRow(int[] indexMapping) {
+    private ProjectedRow(int[] indexMapping) {
         this.indexMapping = indexMapping;
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
index e0252ae11..5f90ddaef 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
@@ -36,6 +36,7 @@ import static 
org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
  */
 public class ValueDecoder {
 
+    // TODO: reuse?
     private final Map<Short, RowDecoder> rowDecoders;
     private final SchemaGetter schemaGetter;
     private final KvFormat kvFormat;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
index 415fac0c2..74eb10094 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
@@ -42,6 +42,7 @@ public class ValueEncoder {
         return values;
     }
 
+    // TODO: ???
     public static byte[] encodeRow(
             short schemaId, KvFormat kvFormat, RowType currentRowType, 
InternalRow row)
             throws Exception {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java 
b/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java
index 108ab2acf..36aef5b9d 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/LogTestBase.java
@@ -72,7 +72,6 @@ public abstract class LogTestBase {
         assertLogRecordsListEquals(expected, actual, baseRowType);
     }
 
-    // todo: 在这个基础上可以加更多最简单的schema evolution测试,非常好
     protected void assertIndexedLogRecordBatchAndRowEquals(
             LogRecordBatch actual,
             LogRecordBatch expected,
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java
index 09467d31a..80b21a719 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/ProjectedRowTest.java
@@ -183,6 +183,7 @@ class ProjectedRowTest {
                 .isExactlyInstanceOf(SchemaChangeException.class)
                 .hasMessage(
                         "Expected datatype of column(id=0,name=a) is [INT], 
while the actual datatype is [BIGINT]");
+
         assertThatThrownBy(
                         () ->
                                 ProjectedRow.from(
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java
index 1f147751a..73ba044d8 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/row/FlinkAsFlussRowTest.java
@@ -31,7 +31,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
-import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.offset;
@@ -60,7 +59,7 @@ class FlinkAsFlussRowTest {
                         TimestampData.fromEpochMillis(1672531200000L, 3),
                         new byte[] {1, 2, 3},
                         null);
-        row = new FlinkAsFlussRow(IntStream.range(0, 
14).toArray()).replace(flinkRow);
+        row = new FlinkAsFlussRow().replace(flinkRow);
     }
 
     @Test

Reply via email to