This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/ci-add-column by this push:
     new 9be3ebcac revert source changes
9be3ebcac is described below

commit 9be3ebcace0498a026461529d18d92f878bdb5ce
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 21:12:59 2025 +0800

    revert source changes
---
 .../org/apache/fluss/flink/source/FlinkSource.java |  7 ++-
 .../fluss/flink/source/FlinkTableSource.java       |  1 +
 .../org/apache/fluss/flink/source/FlussSource.java |  4 ++
 .../fluss/flink/source/FlussSourceBuilder.java     |  1 +
 .../flink/source/reader/FlinkSourceReader.java     |  4 ++
 .../source/reader/FlinkSourceSplitReader.java      | 68 ++++++++++++----------
 .../apache/fluss/flink/utils/FlinkConversions.java | 12 ----
 .../flink/source/reader/FlinkSourceReaderTest.java |  1 +
 .../source/reader/FlinkSourceSplitReaderTest.java  | 28 +++++++--
 9 files changed, 79 insertions(+), 47 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 0bf9f187c..bc15390fe 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -60,6 +60,7 @@ public class FlinkSource<OUT>
     private final boolean hasPrimaryKey;
     private final boolean isPartitioned;
     private final RowType sourceOutputType;
+    @Nullable private final int[] projectedFields;
     protected final OffsetsInitializer offsetsInitializer;
     protected final long scanPartitionDiscoveryIntervalMs;
     private final boolean streaming;
@@ -73,6 +74,7 @@ public class FlinkSource<OUT>
             boolean hasPrimaryKey,
             boolean isPartitioned,
             RowType sourceOutputType,
+            @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
@@ -84,6 +86,7 @@ public class FlinkSource<OUT>
                 hasPrimaryKey,
                 isPartitioned,
                 sourceOutputType,
+                projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
@@ -98,6 +101,7 @@ public class FlinkSource<OUT>
             boolean hasPrimaryKey,
             boolean isPartitioned,
             RowType sourceOutputType,
+            @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
@@ -109,6 +113,7 @@ public class FlinkSource<OUT>
         this.hasPrimaryKey = hasPrimaryKey;
         this.isPartitioned = isPartitioned;
         this.sourceOutputType = sourceOutputType;
+        this.projectedFields = projectedFields;
         this.offsetsInitializer = offsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
@@ -182,7 +187,6 @@ public class FlinkSource<OUT>
                         context.getUserCodeClassLoader(),
                         sourceOutputType));
         FlinkRecordEmitter<OUT> recordEmitter = new 
FlinkRecordEmitter<>(deserializationSchema);
-        // recall to projectedFields
 
         return new FlinkSourceReader<>(
                 elementsQueue,
@@ -190,6 +194,7 @@ public class FlinkSource<OUT>
                 tablePath,
                 sourceOutputType,
                 context,
+                projectedFields,
                 flinkSourceReaderMetrics,
                 recordEmitter,
                 lakeSource);
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index b110e9f88..ef4b63812 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -325,6 +325,7 @@ public class FlinkTableSource
                         hasPrimaryKey(),
                         isPartitioned(),
                         flussRowType,
+                        projectedFields,
                         offsetsInitializer,
                         scanPartitionDiscoveryIntervalMs,
                         new RowDataDeserializationSchema(),
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 5f0b2ff8c..427741834 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -24,6 +24,8 @@ import 
org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.RowType;
 
+import javax.annotation.Nullable;
+
 /**
  * A Flink DataStream source implementation for reading data from Fluss tables.
  *
@@ -61,6 +63,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
             boolean hasPrimaryKey,
             boolean isPartitioned,
             RowType sourceOutputType,
+            @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
@@ -72,6 +75,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 hasPrimaryKey,
                 isPartitioned,
                 sourceOutputType,
+                projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
index 4891ab256..f16639753 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
@@ -296,6 +296,7 @@ public class FlussSourceBuilder<OUT> {
                 hasPrimaryKey,
                 isPartitioned,
                 sourceOutputType,
+                projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
index 639d52513..6364532bb 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java
@@ -40,6 +40,8 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -55,6 +57,7 @@ public class FlinkSourceReader<OUT>
             TablePath tablePath,
             RowType sourceOutputType,
             SourceReaderContext context,
+            @Nullable int[] projectedFields,
             FlinkSourceReaderMetrics flinkSourceReaderMetrics,
             FlinkRecordEmitter<OUT> recordEmitter,
             LakeSource<LakeSplit> lakeSource) {
@@ -67,6 +70,7 @@ public class FlinkSourceReader<OUT>
                                         flussConfig,
                                         tablePath,
                                         sourceOutputType,
+                                        projectedFields,
                                         flinkSourceReaderMetrics,
                                         lakeSource),
                         (ignore) -> {}),
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index a19572dc9..ce50fe406 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -119,6 +120,7 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
             Configuration flussConf,
             TablePath tablePath,
             RowType sourceOutputType,
+            @Nullable int[] projectedFields,
             FlinkSourceReaderMetrics flinkSourceReaderMetrics,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this.flinkMetricRegistry =
@@ -128,17 +130,15 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         this.sourceOutputType = sourceOutputType;
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
+        this.projectedFields = projectedFields;
+        if (projectedFields == null) {}
+
         this.flinkSourceReaderMetrics = flinkSourceReaderMetrics;
-        this.projectedFields =
-                reCalculateProjectedFields(sourceOutputType, 
table.getTableInfo().getRowType());
+        sanityCheck(table.getTableInfo().getRowType(), projectedFields);
         this.logScanner = 
table.newScan().project(projectedFields).createLogScanner();
         this.stoppingOffsets = new HashMap<>();
         this.emptyLogSplits = new HashSet<>();
         this.lakeSource = lakeSource;
-        LOG.info(
-                "fluss table schema: {}, flink table output type:{}",
-                table.getTableInfo().getSchema(),
-                sourceOutputType);
     }
 
     @Override
@@ -216,8 +216,8 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
                     LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
                             (LakeSnapshotAndFlussLogSplit) sourceSplitBase;
                     if (lakeSnapshotAndFlussLogSplit.isStreaming()) {
-                        // is streaming split which has no stopping offset, we 
need also
-                        // subscribe change log
+                        // a streaming split has no stopping offset, so we 
also need to subscribe
+                        // to the change log
                         subscribeLog(
                                 lakeSnapshotAndFlussLogSplit,
                                 
lakeSnapshotAndFlussLogSplit.getStartingOffset());
@@ -563,33 +563,41 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         flinkMetricRegistry.close();
     }
 
-    /**
-     * The projected fields for the fluss table from the source output types. 
Mapping based on
-     * column name rather thn column id.
-     */
-    private static int[] reCalculateProjectedFields(
-            RowType sourceOutputType, RowType flussRowType) {
-        if (sourceOutputType.copy(false).equals(flussRowType.copy(false))) {
-            return null;
-        }
-
-        List<String> fieldNames = sourceOutputType.getFieldNames();
-        int[] projectedFlussFields = new int[fieldNames.size()];
-        for (int i = 0; i < fieldNames.size(); i++) {
-            int fieldIndex = flussRowType.getFieldIndex(fieldNames.get(i));
-            if (fieldIndex == -1) {
-                throw new ValidationException(
-                        String.format(
-                                "The field %s is not found in the fluss 
table.",
-                                fieldNames.get(i)));
+    private void sanityCheck(RowType flussTableRowType, @Nullable int[] 
projectedFields) {
+        RowType tableRowType =
+                projectedFields != null
+                        ? flussTableRowType.project(projectedFields)
+                        // only read the output fields from source
+                        : 
flussTableRowType.project(sourceOutputType.getFieldNames());
+        if (!sourceOutputType.copy(false).equals(tableRowType.copy(false))) {
+            // The default nullability of Flink row type and Fluss row type 
might not be the same,
+            // thus we need to compare the row type without nullability here.
+
+            final String flussSchemaMsg;
+            if (projectedFields == null) {
+                flussSchemaMsg = "\nFluss table schema: " + tableRowType;
+            } else {
+                flussSchemaMsg =
+                        "\nFluss table schema: "
+                                + tableRowType
+                                + " (projection "
+                                + Arrays.toString(projectedFields)
+                                + ")";
             }
-            projectedFlussFields[i] = fieldIndex;
+            // Throw exception if the schema is not the same, this should 
rarely happen because we
+            // only allow fluss tables derived from fluss catalog. But this 
can happen if an ALTER
+            // TABLE command is executed on the fluss table, after the job is 
submitted but before the
+            // source split reader is created.
+            throw new ValidationException(
+                    "The Flink query schema is not matched to Fluss table 
schema. "
+                            + "\nFlink query schema: "
+                            + sourceOutputType
+                            + flussSchemaMsg);
         }
-        return projectedFlussFields;
     }
 
     @VisibleForTesting
-    public int[] getProjectedFields() {
+    int[] getProjectedFields() {
         return projectedFields;
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 74375e24c..2553b0c18 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -86,18 +86,6 @@ public class FlinkConversions {
 
     private FlinkConversions() {}
 
-    public static int[] toFlinkRowTypeIndexMapping(
-            RowType sourceFlussRowType,
-            org.apache.flink.table.types.logical.RowType targetFlinkRowType) {
-        int[] indexMapping = new int[targetFlinkRowType.getFieldCount()];
-        for (int i = 0; i < targetFlinkRowType.getFieldCount(); i++) {
-            String fieldName = targetFlinkRowType.getFieldNames().get(i);
-            int index = sourceFlussRowType.getFieldIndex(fieldName);
-            indexMapping[i] = index;
-        }
-        return indexMapping;
-    }
-
     /** Convert Fluss's type to Flink's type. */
     @VisibleForTesting
     public static org.apache.flink.table.types.DataType toFlinkType(DataType 
flussDataType) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
index 88e2cb539..6634236aa 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -178,6 +178,7 @@ class FlinkSourceReaderTest extends FlinkTestBase {
                 tablePath,
                 sourceOutputType,
                 context,
+                null,
                 new FlinkSourceReaderMetrics(context.metricGroup()),
                 recordEmitter,
                 null);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index c30c9cf72..44bd1e98d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -79,6 +79,24 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
         TableDescriptor descriptor1 = 
TableDescriptor.builder().schema(schema1).build();
         createTable(tablePath1, descriptor1);
 
+        assertThatThrownBy(
+                        () ->
+                                new FlinkSourceSplitReader(
+                                        clientConf,
+                                        tablePath1,
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD(
+                                                        "id", 
DataTypes.BIGINT().copy(false)),
+                                                DataTypes.FIELD("name", 
DataTypes.STRING())),
+                                        new int[] {1, 0},
+                                        createMockSourceReaderMetrics(),
+                                        null))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        "The Flink query schema is not matched to Fluss table 
schema. \n"
+                                + "Flink query schema: ROW<`id` BIGINT NOT 
NULL, `name` STRING>\n"
+                                + "Fluss table schema: ROW<`name` STRING, `id` 
BIGINT NOT NULL> (projection [1, 0])");
+
         assertThatThrownBy(
                         () ->
                                 new FlinkSourceSplitReader(
@@ -87,10 +105,11 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
                                         DataTypes.ROW(
                                                 DataTypes.FIELD("name2", 
DataTypes.STRING()),
                                                 DataTypes.FIELD("id", 
DataTypes.BIGINT())),
+                                        null,
                                         createMockSourceReaderMetrics(),
                                         null))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage("The field name2 is not found in the fluss 
table.");
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Field name2 does not exist in the row type.");
 
         FlinkSourceSplitReader flinkSourceSplitReader =
                 new FlinkSourceSplitReader(
@@ -99,9 +118,10 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
                         DataTypes.ROW(
                                 DataTypes.FIELD("name", DataTypes.STRING()),
                                 DataTypes.FIELD("id", 
DataTypes.BIGINT().copy(false))),
+                        null,
                         createMockSourceReaderMetrics(),
                         null);
-        assertThat(flinkSourceSplitReader.getProjectedFields()).isEqualTo(new 
int[] {1, 0});
+        assertThat(flinkSourceSplitReader.getProjectedFields()).isNull();
     }
 
     @Test
@@ -384,7 +404,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
 
     private FlinkSourceSplitReader createSplitReader(TablePath tablePath, 
RowType rowType) {
         return new FlinkSourceSplitReader(
-                clientConf, tablePath, rowType, 
createMockSourceReaderMetrics(), null);
+                clientConf, tablePath, rowType, null, 
createMockSourceReaderMetrics(), null);
     }
 
     private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {

Reply via email to