This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/ci-add-column by this push:
     new 28d4b24a5 fix lookup
28d4b24a5 is described below

commit 28d4b24a5144ad065cde22e07ec823d9fcd7a68d
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 15:48:32 2025 +0800

    fix lookup
---
 .../fluss/client/lookup/AbstractLookuper.java      |  2 +-
 .../fluss/row/decode/FixedSchemaDecoder.java       |  2 +-
 .../serializer/RowDataSerializationSchema.java     |  3 +-
 .../source/lookup/FlinkAsyncLookupFunction.java    | 29 ++++++++++++-----
 .../flink/source/lookup/FlinkLookupFunction.java   | 27 +++++++++++----
 .../flink/utils/FlussRowToFlinkRowConverter.java   | 38 +++++-----------------
 .../source/lookup/FlinkLookupFunctionTest.java     | 12 ++++---
 7 files changed, 62 insertions(+), 51 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
index 4d2622f3c..6cd93adf7 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
@@ -121,7 +121,7 @@ abstract class AbstractLookuper implements Lookuper {
                     if (error != null) {
                         lookupFuture.completeExceptionally(
                                 new RuntimeException(
-                                        "Failed to get schema infos for prefix 
lookup", error));
+                                        "Failed to get schema infos for 
lookup", error));
                     } else {
                         LookupResult lookupResult = 
processSchemaRequestedRows(schemas, valueList);
                         lookupFuture.complete(lookupResult);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
index b5321f643..d809a095f 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
@@ -51,7 +51,7 @@ public class FixedSchemaDecoder {
     public FixedSchemaDecoder(KvFormat kvFormat, Schema sourceSchema, Schema 
targetSchema) {
         this.rowDecoder =
                 RowDecoder.create(
-                        kvFormat, 
targetSchema.getRowType().getChildren().toArray(new DataType[0]));
+                        kvFormat, 
sourceSchema.getRowType().getChildren().toArray(new DataType[0]));
         this.fieldIdMapping = SchemaUtil.getIndexMapping(sourceSchema, 
targetSchema);
         this.noProjection = false;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index 5271a50be..1a46ec0bc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -56,7 +56,8 @@ import java.util.List;
  *   <li>The {@link #converter} is used to wrap Flink {@link RowData} into 
Fluss {@link InternalRow}
  *       without any schema transformation.
  *   <li>The {@link #outputPadding} is used to pad nulls for new columns when 
new columns are added.
- *       This may happen when table is added new columns before Flink job 
compilation, so the
+ *       This may happen when table is added new columns after Flink SQL job 
restored from
+ *       CompiledPlan that the Consumed Row Schema is old but the Plan Row 
Schema is new, so the
  *       Consumed Row Schema has less fields than Plan Row Schema.
  *   <li>The {@link #outputProjection} is used to re-arrange the fields 
according to latest schema
  *       if Plan Row Schema is not match Table Latest Row Schema. This may 
happen when table is
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 6e4647f80..82fd1bbc3 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -27,10 +27,12 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.flink.row.FlinkAsFlussRow;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer.RemainingFilter;
+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.flink.utils.FlinkUtils;
 import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.table.data.RowData;
@@ -47,6 +49,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 
 /** A flink async lookup function for fluss. */
 public class FlinkAsyncLookupFunction extends AsyncLookupFunction {
@@ -59,7 +62,7 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
     private final TablePath tablePath;
     private final RowType flinkRowType;
     private final LookupNormalizer lookupNormalizer;
-    @Nullable private final int[] projection;
+    @Nullable private int[] projection;
 
     private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
     private transient Connection connection;
@@ -85,20 +88,22 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
         LOG.info("start open ...");
         connection = ConnectionFactory.createConnection(flussConfig);
         table = connection.getTable(tablePath);
-        org.apache.fluss.types.RowType flussRowType = 
table.getTableInfo().getRowType();
-        LOG.info("Current Fluss Schema is {}, Flink RowType is {}", 
flussRowType, flinkRowType);
-        // Currently, only primary key and prefix lookup are supported. These 
keys must be primary
-        // key which cannot modified.
         lookupRow = new FlinkAsFlussRow();
 
         final RowType outputRowType;
         if (projection == null) {
             outputRowType = flinkRowType;
+            // we force to do projection if no projection pushdown, in order 
to handle schema
+            // changes (ADD COLUMN LAST), this guarantees the input row of
+            // flussRowToFlinkRowConverter is in expected schema even new 
columns are added.
+            projection = IntStream.range(0, 
flinkRowType.getFieldCount()).toArray();
         } else {
             outputRowType = FlinkUtils.projectRowType(flinkRowType, 
projection);
         }
-
-        flussRowToFlinkRowConverter = new 
FlussRowToFlinkRowConverter(flussRowType, outputRowType);
+        // TODO: currently, we assume only ADD COLUMN LAST schema changes, so 
the projection
+        //  positions can still work even after such changes.
+        flussRowToFlinkRowConverter =
+                new 
FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType));
 
         Lookup lookup = table.newLookup();
         if (lookupNormalizer.getLookupType() == LookupType.PREFIX_LOOKUP) {
@@ -164,7 +169,7 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
         List<RowData> projectedRow = new ArrayList<>();
         for (InternalRow row : lookupResult) {
             if (row != null) {
-                RowData flinkRow = 
flussRowToFlinkRowConverter.toFlinkRowData(row);
+                RowData flinkRow = 
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
                 if (remainingFilter == null || 
remainingFilter.isMatch(flinkRow)) {
                     projectedRow.add(flinkRow);
                 }
@@ -173,6 +178,14 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
         resultFuture.complete(projectedRow);
     }
 
+    private InternalRow maybeProject(InternalRow row) {
+        if (projection == null) {
+            return row;
+        }
+        // should not reuse objects for async operations
+        return ProjectedRow.from(projection).replaceRow(row);
+    }
+
     @Override
     public void close() throws Exception {
         LOG.info("start close ...");
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index e9a5ad915..477666737 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -25,6 +25,7 @@ import org.apache.fluss.client.lookup.Lookuper;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.row.FlinkAsFlussRow;
+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.flink.utils.FlinkUtils;
 import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
 import org.apache.fluss.metadata.TablePath;
@@ -44,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.IntStream;
 
 /** A flink lookup function for fluss. */
 public class FlinkLookupFunction extends LookupFunction {
@@ -82,22 +84,25 @@ public class FlinkLookupFunction extends LookupFunction {
         LOG.info("start open ...");
         connection = ConnectionFactory.createConnection(flussConfig);
         table = connection.getTable(tablePath);
-        org.apache.fluss.types.RowType flussRowType = 
table.getTableInfo().getRowType();
-        LOG.info("Current Fluss Schema is {}, Flink RowType is {}", 
flussRowType, flinkRowType);
-        // Currently, only primary key and prefix lookup are supported. These 
keys must be primary
-        // key which cannot modified.
         lookupRow = new FlinkAsFlussRow();
 
         final RowType outputRowType;
         if (projection == null) {
             outputRowType = flinkRowType;
-            projectedRow = null;
+            // we force to do projection if no projection pushdown, in order 
to handle schema
+            // changes (ADD COLUMN LAST), this guarantees the input row of
+            // flussRowToFlinkRowConverter is in expected schema even new 
columns are added.
+            projectedRow =
+                    ProjectedRow.from(IntStream.range(0, 
flinkRowType.getFieldCount()).toArray());
         } else {
             outputRowType = FlinkUtils.projectRowType(flinkRowType, 
projection);
             // reuse the projected row
             projectedRow = ProjectedRow.from(projection);
         }
-        flussRowToFlinkRowConverter = new 
FlussRowToFlinkRowConverter(flussRowType, outputRowType);
+        // TODO: currently, we assume only ADD COLUMN LAST schema changes, so 
the projection
+        //  positions can still work even after such changes.
+        flussRowToFlinkRowConverter =
+                new 
FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType));
 
         Lookup lookup = table.newLookup();
         if (lookupNormalizer.getLookupType() == LookupType.PREFIX_LOOKUP) {
@@ -133,7 +138,8 @@ public class FlinkLookupFunction extends LookupFunction {
             List<RowData> projectedRows = new ArrayList<>();
             for (InternalRow row : lookupRows) {
                 if (row != null) {
-                    RowData flinkRow = 
flussRowToFlinkRowConverter.toFlinkRowData(row);
+                    RowData flinkRow =
+                            
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
                     if (remainingFilter == null || 
remainingFilter.isMatch(flinkRow)) {
                         projectedRows.add(flinkRow);
                     }
@@ -146,6 +152,13 @@ public class FlinkLookupFunction extends LookupFunction {
         }
     }
 
+    private InternalRow maybeProject(InternalRow row) {
+        if (projectedRow == null) {
+            return row;
+        }
+        return projectedRow.replaceRow(row);
+    }
+
     @Override
     public void close() throws Exception {
         LOG.info("start close ...");
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
index 0846b284b..e52a83a82 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
@@ -39,31 +39,18 @@ import org.apache.flink.types.RowKind;
 import java.io.Serializable;
 
 import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowKind;
-import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
 
 /** A converter to convert Fluss's {@link InternalRow} to Flink's {@link 
RowData}. */
 public class FlussRowToFlinkRowConverter {
     private final FlussDeserializationConverter[] toFlinkFieldConverters;
     private final InternalRow.FieldGetter[] flussFieldGetters;
-    private final int[] indexMapping;
 
     public FlussRowToFlinkRowConverter(RowType rowType) {
-        this(rowType, toFlinkRowType(rowType));
-    }
-
-    public FlussRowToFlinkRowConverter(
-            RowType sourceFlussRowType,
-            org.apache.flink.table.types.logical.RowType targetFlinkRowType) {
-        this.indexMapping =
-                
FlinkConversions.toFlinkRowTypeIndexMapping(sourceFlussRowType, 
targetFlinkRowType);
-        this.toFlinkFieldConverters =
-                new 
FlussDeserializationConverter[sourceFlussRowType.getFieldCount()];
-        this.flussFieldGetters = new 
InternalRow.FieldGetter[sourceFlussRowType.getFieldCount()];
-        for (int i = 0; i < sourceFlussRowType.getFieldCount(); i++) {
-            toFlinkFieldConverters[i] =
-                    
createNullableInternalConverter(sourceFlussRowType.getTypeAt(i));
-            flussFieldGetters[i] =
-                    
InternalRow.createFieldGetter(sourceFlussRowType.getTypeAt(i), i);
+        this.toFlinkFieldConverters = new 
FlussDeserializationConverter[rowType.getFieldCount()];
+        this.flussFieldGetters = new 
InternalRow.FieldGetter[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            toFlinkFieldConverters[i] = 
createNullableInternalConverter(rowType.getTypeAt(i));
+            flussFieldGetters[i] = 
InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
         }
     }
 
@@ -76,18 +63,11 @@ public class FlussRowToFlinkRowConverter {
     }
 
     private RowData toFlinkRowData(InternalRow flussRow, RowKind rowKind) {
-        int targetLength = indexMapping.length;
-        GenericRowData genericRowData = new GenericRowData(targetLength);
+        GenericRowData genericRowData = new 
GenericRowData(toFlinkFieldConverters.length);
         genericRowData.setRowKind(rowKind);
-        for (int i = 0; i < targetLength; i++) {
-            int flussIndex = indexMapping[i];
-            if (flussIndex == -1) {
-                genericRowData.setField(i, null);
-            } else {
-                Object flussField = 
flussFieldGetters[flussIndex].getFieldOrNull(flussRow);
-                genericRowData.setField(
-                        i, 
toFlinkFieldConverters[flussIndex].deserialize(flussField));
-            }
+        for (int i = 0; i < toFlinkFieldConverters.length; i++) {
+            Object flussField = flussFieldGetters[i].getFieldOrNull(flussRow);
+            genericRowData.setField(i, 
toFlinkFieldConverters[i].deserialize(flussField));
         }
         return genericRowData;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 596780302..9903e7c45 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -64,6 +64,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         tablePath,
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
+                        // no projection when job compiling, new column added 
after that.
                         null);
 
         ListOutputCollector collector = new ListOutputCollector();
@@ -102,6 +103,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         tablePath,
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
+                        // no projection when job compiling, new column added 
after that.
                         null);
         asyncLookupFunction.open(null);
 
@@ -154,7 +156,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         tablePath,
                         flinkRowType,
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
-                        null);
+                        new int[] {1, 0});
 
         ListOutputCollector collector = new ListOutputCollector();
         lookupFunction.setCollector(collector);
@@ -191,7 +193,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         .sorted()
                         .collect(Collectors.toList());
         assertThat(result)
-                .containsExactly("+I(0,name0)", "+I(1,name1)", "+I(2,name2)", 
"+I(3,name3)");
+                .containsExactly("+I(name0,0)", "+I(name1,1)", "+I(name2,2)", 
"+I(name3,3)");
         lookupFunction.close();
 
         // start lookup job after schema change.
@@ -227,8 +229,9 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                             .schema(
                                     Schema.newBuilder()
                                             .primaryKey("id")
-                                            .column("name", DataTypes.STRING())
                                             .column("id", DataTypes.INT())
+                                            .column("name", DataTypes.STRING())
+                                            // added an extra column 'age'
                                             .column("age", DataTypes.INT())
                                             .build())
                             .distributedBy(DEFAULT_BUCKET_NUM, "id")
@@ -241,7 +244,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
         try (Table table = conn.getTable(tablePath)) {
             UpsertWriter upsertWriter = table.newUpsert().createWriter();
             for (int i = 0; i < rows; i++) {
-                upsertWriter.upsert(schemaNotMatch ? row("name" + i, i, i) : 
row(i, "name" + i));
+                upsertWriter.upsert(
+                        schemaNotMatch ? row(i, "name" + i, i * 2) : row(i, 
"name" + i));
             }
             upsertWriter.flush();
         }

Reply via email to