This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/ci-add-column by this push:
new 9be3ebcac revert source changes
9be3ebcac is described below
commit 9be3ebcace0498a026461529d18d92f878bdb5ce
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 21:12:59 2025 +0800
revert source changes
---
.../org/apache/fluss/flink/source/FlinkSource.java | 7 ++-
.../fluss/flink/source/FlinkTableSource.java | 1 +
.../org/apache/fluss/flink/source/FlussSource.java | 4 ++
.../fluss/flink/source/FlussSourceBuilder.java | 1 +
.../flink/source/reader/FlinkSourceReader.java | 4 ++
.../source/reader/FlinkSourceSplitReader.java | 68 ++++++++++++----------
.../apache/fluss/flink/utils/FlinkConversions.java | 12 ----
.../flink/source/reader/FlinkSourceReaderTest.java | 1 +
.../source/reader/FlinkSourceSplitReaderTest.java | 28 +++++++--
9 files changed, 79 insertions(+), 47 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 0bf9f187c..bc15390fe 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -60,6 +60,7 @@ public class FlinkSource<OUT>
private final boolean hasPrimaryKey;
private final boolean isPartitioned;
private final RowType sourceOutputType;
+ @Nullable private final int[] projectedFields;
protected final OffsetsInitializer offsetsInitializer;
protected final long scanPartitionDiscoveryIntervalMs;
private final boolean streaming;
@@ -73,6 +74,7 @@ public class FlinkSource<OUT>
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
+ @Nullable int[] projectedFields,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
@@ -84,6 +86,7 @@ public class FlinkSource<OUT>
hasPrimaryKey,
isPartitioned,
sourceOutputType,
+ projectedFields,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
@@ -98,6 +101,7 @@ public class FlinkSource<OUT>
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
+ @Nullable int[] projectedFields,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
@@ -109,6 +113,7 @@ public class FlinkSource<OUT>
this.hasPrimaryKey = hasPrimaryKey;
this.isPartitioned = isPartitioned;
this.sourceOutputType = sourceOutputType;
+ this.projectedFields = projectedFields;
this.offsetsInitializer = offsetsInitializer;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.deserializationSchema = deserializationSchema;
@@ -182,7 +187,6 @@ public class FlinkSource<OUT>
context.getUserCodeClassLoader(),
sourceOutputType));
FlinkRecordEmitter<OUT> recordEmitter = new FlinkRecordEmitter<>(deserializationSchema);
- // recall to projectedFields
return new FlinkSourceReader<>(
elementsQueue,
@@ -190,6 +194,7 @@ public class FlinkSource<OUT>
tablePath,
sourceOutputType,
context,
+ projectedFields,
flinkSourceReaderMetrics,
recordEmitter,
lakeSource);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index b110e9f88..ef4b63812 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -325,6 +325,7 @@ public class FlinkTableSource
hasPrimaryKey(),
isPartitioned(),
flussRowType,
+ projectedFields,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
new RowDataDeserializationSchema(),
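
For context, the projectedFields passed here is the projection that Flink pushes down into the table source, so the split reader no longer re-derives it by field name. A standalone sketch (not taken from this diff) of how a pushed-down projection is typically flattened: Flink's SupportsProjectionPushDown delivers nested field paths as int[][], and keeping the head of each path yields the top-level indexes that a field like projectedFields would hold.

import java.util.Arrays;

public class ProjectionFlattenSketch {
    // [[1], [0]] -> [1, 0]: read table field 1 first, then field 0.
    static int[] toTopLevelProjection(int[][] nestedProjection) {
        return Arrays.stream(nestedProjection).mapToInt(path -> path[0]).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toTopLevelProjection(new int[][] {{1}, {0}}))); // [1, 0]
    }
}
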
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 5f0b2ff8c..427741834 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -24,6 +24,8 @@ import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;
+import javax.annotation.Nullable;
+
/**
* A Flink DataStream source implementation for reading data from Fluss tables.
*
@@ -61,6 +63,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
+ @Nullable int[] projectedFields,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
@@ -72,6 +75,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
hasPrimaryKey,
isPartitioned,
sourceOutputType,
+ projectedFields,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
index 4891ab256..f16639753 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
@@ -296,6 +296,7 @@ public class FlussSourceBuilder<OUT> {
hasPrimaryKey,
isPartitioned,
sourceOutputType,
+ projectedFields,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
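
For DataStream users, the projection reaches the source constructor through FlussSourceBuilder. A hypothetical usage sketch: every setter name below is an assumption for illustration and is not confirmed by this patch; only the projectedFields field itself appears in the diff.

// Hypothetical sketch; setter names are assumptions, not confirmed by this diff.
FlussSource<RowData> source =
        FlussSource.<RowData>builder()
                .setBootstrapServers("localhost:9123") // assumed setter
                .setDatabase("my_db") // assumed setter
                .setTable("orders") // assumed setter
                .setProjectedFields(new int[] {1, 0}) // hypothetical setter for the new field
                .setDeserializationSchema(new RowDataDeserializationSchema())
                .build();
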
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
index 639d52513..6364532bb 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
@@ -40,6 +40,8 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import javax.annotation.Nullable;
+
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
@@ -55,6 +57,7 @@ public class FlinkSourceReader<OUT>
TablePath tablePath,
RowType sourceOutputType,
SourceReaderContext context,
+ @Nullable int[] projectedFields,
FlinkSourceReaderMetrics flinkSourceReaderMetrics,
FlinkRecordEmitter<OUT> recordEmitter,
LakeSource<LakeSplit> lakeSource) {
@@ -67,6 +70,7 @@ public class FlinkSourceReader<OUT>
flussConfig,
tablePath,
sourceOutputType,
+ projectedFields,
flinkSourceReaderMetrics,
lakeSource),
(ignore) -> {}),
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index a19572dc9..ce50fe406 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -119,6 +120,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
Configuration flussConf,
TablePath tablePath,
RowType sourceOutputType,
+ @Nullable int[] projectedFields,
FlinkSourceReaderMetrics flinkSourceReaderMetrics,
@Nullable LakeSource<LakeSplit> lakeSource) {
this.flinkMetricRegistry =
@@ -128,17 +130,15 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
this.sourceOutputType = sourceOutputType;
this.boundedSplits = new ArrayDeque<>();
this.subscribedBuckets = new HashMap<>();
+ this.projectedFields = projectedFields;
+
this.flinkSourceReaderMetrics = flinkSourceReaderMetrics;
- this.projectedFields =
- reCalculateProjectedFields(sourceOutputType, table.getTableInfo().getRowType());
+ sanityCheck(table.getTableInfo().getRowType(), projectedFields);
this.logScanner = table.newScan().project(projectedFields).createLogScanner();
this.stoppingOffsets = new HashMap<>();
this.emptyLogSplits = new HashSet<>();
this.lakeSource = lakeSource;
- LOG.info(
- "fluss table schema: {}, flink table output type:{}",
- table.getTableInfo().getSchema(),
- sourceOutputType);
}
@Override
@@ -216,8 +216,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
(LakeSnapshotAndFlussLogSplit) sourceSplitBase;
if (lakeSnapshotAndFlussLogSplit.isStreaming()) {
- // is streaming split which has no stopping offset, we need also
- // subscribe change log
+ // this is a streaming split with no stopping offset, so we also need to
+ // subscribe to the change log
subscribeLog(
lakeSnapshotAndFlussLogSplit,
lakeSnapshotAndFlussLogSplit.getStartingOffset());
@@ -563,33 +563,41 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
flinkMetricRegistry.close();
}
- /**
- * The projected fields for the fluss table from the source output types. Mapping based on
- * column name rather thn column id.
- */
- private static int[] reCalculateProjectedFields(
- RowType sourceOutputType, RowType flussRowType) {
- if (sourceOutputType.copy(false).equals(flussRowType.copy(false))) {
- return null;
- }
-
- List<String> fieldNames = sourceOutputType.getFieldNames();
- int[] projectedFlussFields = new int[fieldNames.size()];
- for (int i = 0; i < fieldNames.size(); i++) {
- int fieldIndex = flussRowType.getFieldIndex(fieldNames.get(i));
- if (fieldIndex == -1) {
- throw new ValidationException(
- String.format(
- "The field %s is not found in the fluss
table.",
- fieldNames.get(i)));
+ private void sanityCheck(RowType flussTableRowType, @Nullable int[] projectedFields) {
+ RowType tableRowType =
+ projectedFields != null
+ ? flussTableRowType.project(projectedFields)
+ // only read the output fields from source
+ : flussTableRowType.project(sourceOutputType.getFieldNames());
+ if (!sourceOutputType.copy(false).equals(tableRowType.copy(false))) {
+ // The default nullability of Flink row type and Fluss row type might not be the same,
+ // thus we need to compare the row types without nullability here.
+
+ final String flussSchemaMsg;
+ if (projectedFields == null) {
+ flussSchemaMsg = "\nFluss table schema: " + tableRowType;
+ } else {
+ flussSchemaMsg =
+ "\nFluss table schema: "
+ + tableRowType
+ + " (projection "
+ + Arrays.toString(projectedFields)
+ + ")";
}
- projectedFlussFields[i] = fieldIndex;
+ // Throw an exception if the schemas are not the same. This should rarely happen because we
+ // only allow Fluss tables derived from the Fluss catalog, but it can happen if an ALTER
+ // TABLE command is executed on the Fluss table after the job is submitted but before the
+ // source reader is opened.
+ throw new ValidationException(
+ "The Flink query schema does not match the Fluss table schema. "
+ + "\nFlink query schema: "
+ + sourceOutputType
+ + flussSchemaMsg);
}
- return projectedFlussFields;
}
@VisibleForTesting
- public int[] getProjectedFields() {
+ int[] getProjectedFields() {
return projectedFields;
}
}
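
A minimal standalone sketch of the check the new sanityCheck performs, using only the RowType calls visible in the hunk above (project, copy(false)); the field names, example values, and the DataTypes import path are illustrative assumptions, not taken from the patch.

import org.apache.fluss.types.DataTypes; // import path assumed
import org.apache.fluss.types.RowType;

public class SchemaCheckSketch {
    public static void main(String[] args) {
        // Assumed Fluss table schema: (id BIGINT NOT NULL, name STRING).
        RowType flussTable =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT().copy(false)),
                        DataTypes.FIELD("name", DataTypes.STRING()));
        // Flink query schema after projection pushdown: (name STRING, id BIGINT NOT NULL).
        RowType queryOutput =
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("id", DataTypes.BIGINT().copy(false)));
        int[] projectedFields = {1, 0}; // read "name" first, then "id"

        // Project the table schema the same way the scanner will emit rows, then
        // compare with nullability stripped, since Flink and Fluss defaults differ.
        boolean matches =
                queryOutput.copy(false).equals(flussTable.project(projectedFields).copy(false));
        System.out.println("schemas match: " + matches); // prints: schemas match: true
    }
}
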
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 74375e24c..2553b0c18 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -86,18 +86,6 @@ public class FlinkConversions {
private FlinkConversions() {}
- public static int[] toFlinkRowTypeIndexMapping(
- RowType sourceFlussRowType,
- org.apache.flink.table.types.logical.RowType targetFlinkRowType) {
- int[] indexMapping = new int[targetFlinkRowType.getFieldCount()];
- for (int i = 0; i < targetFlinkRowType.getFieldCount(); i++) {
- String fieldName = targetFlinkRowType.getFieldNames().get(i);
- int index = sourceFlussRowType.getFieldIndex(fieldName);
- indexMapping[i] = index;
- }
- return indexMapping;
- }
-
/** Convert Fluss's type to Flink's type. */
@VisibleForTesting
public static org.apache.flink.table.types.DataType toFlinkType(DataType flussDataType) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
index 88e2cb539..6634236aa 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -178,6 +178,7 @@ class FlinkSourceReaderTest extends FlinkTestBase {
tablePath,
sourceOutputType,
context,
+ null,
new FlinkSourceReaderMetrics(context.metricGroup()),
recordEmitter,
null);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index c30c9cf72..44bd1e98d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -79,6 +79,24 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
TableDescriptor descriptor1 = TableDescriptor.builder().schema(schema1).build();
createTable(tablePath1, descriptor1);
+ assertThatThrownBy(
+ () ->
+ new FlinkSourceSplitReader(
+ clientConf,
+ tablePath1,
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT().copy(false)),
+ DataTypes.FIELD("name", DataTypes.STRING())),
+ new int[] {1, 0},
+ createMockSourceReaderMetrics(),
+ null))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "The Flink query schema does not match the Fluss table schema. \n"
+ + "Flink query schema: ROW<`id` BIGINT NOT NULL, `name` STRING>\n"
+ + "Fluss table schema: ROW<`name` STRING, `id` BIGINT NOT NULL> (projection [1, 0])");
+
assertThatThrownBy(
() ->
new FlinkSourceSplitReader(
@@ -87,10 +105,11 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
DataTypes.ROW(
DataTypes.FIELD("name2",
DataTypes.STRING()),
DataTypes.FIELD("id",
DataTypes.BIGINT())),
+ null,
createMockSourceReaderMetrics(),
null))
- .isInstanceOf(ValidationException.class)
- .hasMessage("The field name2 is not found in the fluss
table.");
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Field name2 does not exist in the row type.");
FlinkSourceSplitReader flinkSourceSplitReader =
new FlinkSourceSplitReader(
@@ -99,9 +118,10 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id",
DataTypes.BIGINT().copy(false))),
+ null,
createMockSourceReaderMetrics(),
null);
- assertThat(flinkSourceSplitReader.getProjectedFields()).isEqualTo(new int[] {1, 0});
+ assertThat(flinkSourceSplitReader.getProjectedFields()).isNull();
}
@Test
@@ -384,7 +404,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType rowType) {
return new FlinkSourceSplitReader(
- clientConf, tablePath, rowType, createMockSourceReaderMetrics(), null);
+ clientConf, tablePath, rowType, null, createMockSourceReaderMetrics(), null);
}
private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {