This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/ci-add-column by this push:
new 28d4b24a5 fix lookup
28d4b24a5 is described below
commit 28d4b24a5144ad065cde22e07ec823d9fcd7a68d
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 15:48:32 2025 +0800
fix lookup
---
.../fluss/client/lookup/AbstractLookuper.java | 2 +-
.../fluss/row/decode/FixedSchemaDecoder.java | 2 +-
.../serializer/RowDataSerializationSchema.java | 3 +-
.../source/lookup/FlinkAsyncLookupFunction.java | 29 ++++++++++++-----
.../flink/source/lookup/FlinkLookupFunction.java | 27 +++++++++++----
.../flink/utils/FlussRowToFlinkRowConverter.java | 38 +++++-----------------
.../source/lookup/FlinkLookupFunctionTest.java | 12 ++++---
7 files changed, 62 insertions(+), 51 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
index 4d2622f3c..6cd93adf7 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookuper.java
@@ -121,7 +121,7 @@ abstract class AbstractLookuper implements Lookuper {
if (error != null) {
lookupFuture.completeExceptionally(
new RuntimeException(
- "Failed to get schema infos for prefix
lookup", error));
+ "Failed to get schema infos for
lookup", error));
} else {
LookupResult lookupResult =
processSchemaRequestedRows(schemas, valueList);
lookupFuture.complete(lookupResult);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
b/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
index b5321f643..d809a095f 100644
---
a/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/row/decode/FixedSchemaDecoder.java
@@ -51,7 +51,7 @@ public class FixedSchemaDecoder {
public FixedSchemaDecoder(KvFormat kvFormat, Schema sourceSchema, Schema
targetSchema) {
this.rowDecoder =
RowDecoder.create(
- kvFormat,
targetSchema.getRowType().getChildren().toArray(new DataType[0]));
+ kvFormat,
sourceSchema.getRowType().getChildren().toArray(new DataType[0]));
this.fieldIdMapping = SchemaUtil.getIndexMapping(sourceSchema,
targetSchema);
this.noProjection = false;
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index 5271a50be..1a46ec0bc 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -56,7 +56,8 @@ import java.util.List;
* <li>The {@link #converter} is used to wrap Flink {@link RowData} into
Fluss {@link InternalRow}
* without any schema transformation.
* <li>The {@link #outputPadding} is used to pad nulls for new columns when
new columns are added.
- * This may happen when table is added new columns before Flink job
compilation, so the
+ * This may happen when new columns are added to the table after the Flink SQL job
+ * is restored from a CompiledPlan, where the Consumed Row Schema is old but the Plan Row
+ * Schema is new, so the
* Consumed Row Schema has less fields than Plan Row Schema.
* <li>The {@link #outputProjection} is used to re-arrange the fields
according to latest schema
* if Plan Row Schema is not match Table Latest Row Schema. This may
happen when table is
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 6e4647f80..82fd1bbc3 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -27,10 +27,12 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.flink.row.FlinkAsFlussRow;
import org.apache.fluss.flink.source.lookup.LookupNormalizer.RemainingFilter;
+import org.apache.fluss.flink.utils.FlinkConversions;
import org.apache.fluss.flink.utils.FlinkUtils;
import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.flink.table.data.RowData;
@@ -47,6 +49,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
/** A flink async lookup function for fluss. */
public class FlinkAsyncLookupFunction extends AsyncLookupFunction {
@@ -59,7 +62,7 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
private final TablePath tablePath;
private final RowType flinkRowType;
private final LookupNormalizer lookupNormalizer;
- @Nullable private final int[] projection;
+ @Nullable private int[] projection;
private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter;
private transient Connection connection;
@@ -85,20 +88,22 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
LOG.info("start open ...");
connection = ConnectionFactory.createConnection(flussConfig);
table = connection.getTable(tablePath);
- org.apache.fluss.types.RowType flussRowType =
table.getTableInfo().getRowType();
- LOG.info("Current Fluss Schema is {}, Flink RowType is {}",
flussRowType, flinkRowType);
- // Currently, only primary key and prefix lookup are supported. These
keys must be primary
- // key which cannot modified.
lookupRow = new FlinkAsFlussRow();
final RowType outputRowType;
if (projection == null) {
outputRowType = flinkRowType;
+ // we force a projection when there is no projection pushdown, in order
+ // to handle schema changes (ADD COLUMN LAST); this guarantees the input row of
+ // flussRowToFlinkRowConverter is in the expected schema even when new
+ // columns are added.
+ projection = IntStream.range(0,
flinkRowType.getFieldCount()).toArray();
} else {
outputRowType = FlinkUtils.projectRowType(flinkRowType,
projection);
}
-
- flussRowToFlinkRowConverter = new
FlussRowToFlinkRowConverter(flussRowType, outputRowType);
+ // TODO: currently, we assume only ADD COLUMN LAST schema changes, so
the projection
+ // positions can still work even after such changes.
+ flussRowToFlinkRowConverter =
+ new
FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType));
Lookup lookup = table.newLookup();
if (lookupNormalizer.getLookupType() == LookupType.PREFIX_LOOKUP) {
@@ -164,7 +169,7 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
List<RowData> projectedRow = new ArrayList<>();
for (InternalRow row : lookupResult) {
if (row != null) {
- RowData flinkRow =
flussRowToFlinkRowConverter.toFlinkRowData(row);
+ RowData flinkRow =
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
if (remainingFilter == null ||
remainingFilter.isMatch(flinkRow)) {
projectedRow.add(flinkRow);
}
@@ -173,6 +178,14 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
resultFuture.complete(projectedRow);
}
+ private InternalRow maybeProject(InternalRow row) {
+ if (projection == null) {
+ return row;
+ }
+ // should not reuse objects for async operations
+ return ProjectedRow.from(projection).replaceRow(row);
+ }
+
@Override
public void close() throws Exception {
LOG.info("start close ...");
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index e9a5ad915..477666737 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -25,6 +25,7 @@ import org.apache.fluss.client.lookup.Lookuper;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.row.FlinkAsFlussRow;
+import org.apache.fluss.flink.utils.FlinkConversions;
import org.apache.fluss.flink.utils.FlinkUtils;
import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
import org.apache.fluss.metadata.TablePath;
@@ -44,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.stream.IntStream;
/** A flink lookup function for fluss. */
public class FlinkLookupFunction extends LookupFunction {
@@ -82,22 +84,25 @@ public class FlinkLookupFunction extends LookupFunction {
LOG.info("start open ...");
connection = ConnectionFactory.createConnection(flussConfig);
table = connection.getTable(tablePath);
- org.apache.fluss.types.RowType flussRowType =
table.getTableInfo().getRowType();
- LOG.info("Current Fluss Schema is {}, Flink RowType is {}",
flussRowType, flinkRowType);
- // Currently, only primary key and prefix lookup are supported. These
keys must be primary
- // key which cannot modified.
lookupRow = new FlinkAsFlussRow();
final RowType outputRowType;
if (projection == null) {
outputRowType = flinkRowType;
- projectedRow = null;
+ // we force a projection when there is no projection pushdown, in order
+ // to handle schema changes (ADD COLUMN LAST); this guarantees the input row of
+ // flussRowToFlinkRowConverter is in the expected schema even when new
+ // columns are added.
+ projectedRow =
+ ProjectedRow.from(IntStream.range(0,
flinkRowType.getFieldCount()).toArray());
} else {
outputRowType = FlinkUtils.projectRowType(flinkRowType,
projection);
// reuse the projected row
projectedRow = ProjectedRow.from(projection);
}
- flussRowToFlinkRowConverter = new
FlussRowToFlinkRowConverter(flussRowType, outputRowType);
+ // TODO: currently, we assume only ADD COLUMN LAST schema changes, so
the projection
+ // positions can still work even after such changes.
+ flussRowToFlinkRowConverter =
+ new
FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType));
Lookup lookup = table.newLookup();
if (lookupNormalizer.getLookupType() == LookupType.PREFIX_LOOKUP) {
@@ -133,7 +138,8 @@ public class FlinkLookupFunction extends LookupFunction {
List<RowData> projectedRows = new ArrayList<>();
for (InternalRow row : lookupRows) {
if (row != null) {
- RowData flinkRow =
flussRowToFlinkRowConverter.toFlinkRowData(row);
+ RowData flinkRow =
+
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
if (remainingFilter == null ||
remainingFilter.isMatch(flinkRow)) {
projectedRows.add(flinkRow);
}
@@ -146,6 +152,13 @@ public class FlinkLookupFunction extends LookupFunction {
}
}
+ private InternalRow maybeProject(InternalRow row) {
+ if (projectedRow == null) {
+ return row;
+ }
+ return projectedRow.replaceRow(row);
+ }
+
@Override
public void close() throws Exception {
LOG.info("start close ...");
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
index 0846b284b..e52a83a82 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
@@ -39,31 +39,18 @@ import org.apache.flink.types.RowKind;
import java.io.Serializable;
import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowKind;
-import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
/** A converter to convert Fluss's {@link InternalRow} to Flink's {@link
RowData}. */
public class FlussRowToFlinkRowConverter {
private final FlussDeserializationConverter[] toFlinkFieldConverters;
private final InternalRow.FieldGetter[] flussFieldGetters;
- private final int[] indexMapping;
public FlussRowToFlinkRowConverter(RowType rowType) {
- this(rowType, toFlinkRowType(rowType));
- }
-
- public FlussRowToFlinkRowConverter(
- RowType sourceFlussRowType,
- org.apache.flink.table.types.logical.RowType targetFlinkRowType) {
- this.indexMapping =
-
FlinkConversions.toFlinkRowTypeIndexMapping(sourceFlussRowType,
targetFlinkRowType);
- this.toFlinkFieldConverters =
- new
FlussDeserializationConverter[sourceFlussRowType.getFieldCount()];
- this.flussFieldGetters = new
InternalRow.FieldGetter[sourceFlussRowType.getFieldCount()];
- for (int i = 0; i < sourceFlussRowType.getFieldCount(); i++) {
- toFlinkFieldConverters[i] =
-
createNullableInternalConverter(sourceFlussRowType.getTypeAt(i));
- flussFieldGetters[i] =
-
InternalRow.createFieldGetter(sourceFlussRowType.getTypeAt(i), i);
+ this.toFlinkFieldConverters = new
FlussDeserializationConverter[rowType.getFieldCount()];
+ this.flussFieldGetters = new
InternalRow.FieldGetter[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ toFlinkFieldConverters[i] =
createNullableInternalConverter(rowType.getTypeAt(i));
+ flussFieldGetters[i] =
InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
}
}
@@ -76,18 +63,11 @@ public class FlussRowToFlinkRowConverter {
}
private RowData toFlinkRowData(InternalRow flussRow, RowKind rowKind) {
- int targetLength = indexMapping.length;
- GenericRowData genericRowData = new GenericRowData(targetLength);
+ GenericRowData genericRowData = new
GenericRowData(toFlinkFieldConverters.length);
genericRowData.setRowKind(rowKind);
- for (int i = 0; i < targetLength; i++) {
- int flussIndex = indexMapping[i];
- if (flussIndex == -1) {
- genericRowData.setField(i, null);
- } else {
- Object flussField =
flussFieldGetters[flussIndex].getFieldOrNull(flussRow);
- genericRowData.setField(
- i,
toFlinkFieldConverters[flussIndex].deserialize(flussField));
- }
+ for (int i = 0; i < toFlinkFieldConverters.length; i++) {
+ Object flussField = flussFieldGetters[i].getFieldOrNull(flussRow);
+ genericRowData.setField(i,
toFlinkFieldConverters[i].deserialize(flussField));
}
return genericRowData;
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 596780302..9903e7c45 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -64,6 +64,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
tablePath,
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
+ // no projection at job compile time; a new column is added
after that.
null);
ListOutputCollector collector = new ListOutputCollector();
@@ -102,6 +103,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
tablePath,
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
+ // no projection at job compile time; a new column is added
after that.
null);
asyncLookupFunction.open(null);
@@ -154,7 +156,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
tablePath,
flinkRowType,
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
- null);
+ new int[] {1, 0});
ListOutputCollector collector = new ListOutputCollector();
lookupFunction.setCollector(collector);
@@ -191,7 +193,7 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
.sorted()
.collect(Collectors.toList());
assertThat(result)
- .containsExactly("+I(0,name0)", "+I(1,name1)", "+I(2,name2)",
"+I(3,name3)");
+ .containsExactly("+I(name0,0)", "+I(name1,1)", "+I(name2,2)",
"+I(name3,3)");
lookupFunction.close();
// start lookup job after schema change.
@@ -227,8 +229,9 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
.schema(
Schema.newBuilder()
.primaryKey("id")
- .column("name", DataTypes.STRING())
.column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ // added an extra column 'age'
.column("age", DataTypes.INT())
.build())
.distributedBy(DEFAULT_BUCKET_NUM, "id")
@@ -241,7 +244,8 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
try (Table table = conn.getTable(tablePath)) {
UpsertWriter upsertWriter = table.newUpsert().createWriter();
for (int i = 0; i < rows; i++) {
- upsertWriter.upsert(schemaNotMatch ? row("name" + i, i, i) :
row(i, "name" + i));
+ upsertWriter.upsert(
+ schemaNotMatch ? row(i, "name" + i, i * 2) : row(i,
"name" + i));
}
upsertWriter.flush();
}