This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit aa31e1c4c8c7d9a6626b792c24ed6cfa32215f6f
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 01:21:36 2025 +0800

    revert FlinkAsFlussRow changes
---
 .../apache/fluss/flink/row/FlinkAsFlussRow.java    | 100 ++++-----------------
 .../sink/serializer/FlussSerializationSchema.java  |   7 +-
 .../serializer/RowDataSerializationSchema.java     |  28 +++++-
 .../sink/serializer/SerializerInitContextImpl.java |   2 +-
 .../fluss/flink/sink/writer/FlinkSinkWriter.java   |  19 +---
 5 files changed, 50 insertions(+), 106 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
index 314e4b40f..003e910ea 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
@@ -23,34 +23,17 @@ import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
-import org.apache.fluss.types.RowType;
 
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 
-import javax.annotation.Nullable;
-
-import java.util.List;
-
 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {
 
     private RowData flinkRow;
 
-    /**
-     * Creates a FlinkAsFlussRow with a mapping from Fluss field index to Flink field index. If the
-     * indexMapping is null, flink and flink share same field name.
-     */
-    @Nullable private final int[] indexMapping;
-
-    public FlinkAsFlussRow() {
-        this(null);
-    }
-
-    public FlinkAsFlussRow(int[] indexMapping) {
-        this.indexMapping = indexMapping;
-    }
+    public FlinkAsFlussRow() {}
 
     public FlinkAsFlussRow replace(RowData flinkRow) {
         this.flinkRow = flinkRow;
@@ -59,94 +42,62 @@ public class FlinkAsFlussRow implements InternalRow {
 
     @Override
     public int getFieldCount() {
-        if (indexMapping == null) {
-            return flinkRow.getArity();
-        } else {
-            return indexMapping.length;
-        }
+        return flinkRow.getArity();
     }
 
     @Override
     public boolean isNullAt(int pos) {
-        // if no mapping, means that the field is not found in Flink, just ignore it.
-        if ((indexMapping != null && indexMapping[pos] == -1)) {
-            return true;
-        }
-
-        // If pos is larger than flinkRow.getArity(), it indicates that the Fluss table's data type
-        // is wider than the data provided by Flink.
-        // This often occurs when a schema change happens to the catalog table before job restart.
-        // Only appending columns at the end is compatible, just need to  ignore the trailing
-        // columns.
-        // Other types of schema changes (e.g., dropping columns) would lead to incompatibility.
-        // Therefore, Fluss currently only supports appending columns, and here we simply ignore any
-        // extra columns.
-        // extra columns.
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        if (flussPos >= flinkRow.getArity()) {
-            return true;
-        }
-
-        return flinkRow.isNullAt(flussPos);
+        return flinkRow.isNullAt(pos);
     }
 
     @Override
     public boolean getBoolean(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getBoolean(flussPos);
+        return flinkRow.getBoolean(pos);
     }
 
     @Override
     public byte getByte(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getByte(flussPos);
+        return flinkRow.getByte(pos);
     }
 
     @Override
     public short getShort(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getShort(flussPos);
+        return flinkRow.getShort(pos);
     }
 
     @Override
     public int getInt(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getInt(flussPos);
+        return flinkRow.getInt(pos);
     }
 
     @Override
     public long getLong(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getLong(flussPos);
+        return flinkRow.getLong(pos);
     }
 
     @Override
     public float getFloat(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getFloat(flussPos);
+        return flinkRow.getFloat(pos);
     }
 
     @Override
     public double getDouble(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getDouble(flussPos);
+        return flinkRow.getDouble(pos);
     }
 
     @Override
     public BinaryString getChar(int pos, int length) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes());
+        return BinaryString.fromBytes(flinkRow.getString(pos).toBytes());
     }
 
     @Override
     public BinaryString getString(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes());
+        return BinaryString.fromBytes(flinkRow.getString(pos).toBytes());
     }
 
     @Override
     public Decimal getDecimal(int pos, int precision, int scale) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return fromFlinkDecimal(flinkRow.getDecimal(flussPos, precision, scale));
+        return fromFlinkDecimal(flinkRow.getDecimal(pos, precision, scale));
     }
 
     public static Decimal fromFlinkDecimal(DecimalData decimal) {
@@ -161,43 +112,26 @@ public class FlinkAsFlussRow implements InternalRow {
 
     @Override
     public TimestampNtz getTimestampNtz(int pos, int precision) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision);
+        TimestampData timestamp = flinkRow.getTimestamp(pos, precision);
         return TimestampNtz.fromMillis(
                 timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
     }
 
     @Override
     public TimestampLtz getTimestampLtz(int pos, int precision) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision);
+        TimestampData timestamp = flinkRow.getTimestamp(pos, precision);
         return TimestampLtz.fromEpochMillis(
                 timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
     }
 
     @Override
     public byte[] getBinary(int pos, int length) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getBinary(flussPos);
+        return flinkRow.getBinary(pos);
     }
 
     @Override
     public byte[] getBytes(int pos) {
-        int flussPos = indexMapping == null ? pos : indexMapping[pos];
-        return flinkRow.getBinary(flussPos);
-    }
-
-    /** Converts a Flink {@link RowType} to a Fluss {@link RowType}, based on the field name. */
-    public static FlinkAsFlussRow from(
-            org.apache.flink.table.types.logical.RowType flinkRowType, RowType flussRowType) {
-        int[] indexMapping = new int[flussRowType.getFieldCount()];
-        List<String> fieldNames = flussRowType.getFieldNames();
-        for (int i = 0; i < flussRowType.getFieldCount(); i++) {
-            String fieldName = fieldNames.get(i);
-            int fieldIndex = flinkRowType.getFieldIndex(fieldName);
-            indexMapping[i] = fieldIndex;
-        }
-        return new FlinkAsFlussRow(indexMapping);
+        return flinkRow.getBinary(pos);
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
index 0f3fff210..2096b1095 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
@@ -69,10 +69,11 @@ public interface FlussSerializationSchema<T> extends Serializable {
         RowType getRowSchema();
 
         /**
-         * Returns the Flink table row schema.
+         * Returns the input row schema which is the Flink {@link
+         * org.apache.flink.table.data.RowData}.
          *
-         * @return The schema of the Flink table row.
+         * @return The schema of the input Flink row.
          */
-        org.apache.flink.table.types.logical.RowType getTableRowType();
+        org.apache.flink.table.types.logical.RowType getInputRowSchema();
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index 05e2921fc..440c70485 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -21,10 +21,15 @@ import org.apache.fluss.flink.row.FlinkAsFlussRow;
 import org.apache.fluss.flink.row.OperationType;
 import org.apache.fluss.flink.row.RowWithOp;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
+import java.util.List;
+
 /** Default implementation of RowDataConverter for RowData. */
 public class RowDataSerializationSchema implements FlussSerializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
@@ -44,6 +49,12 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD
      */
     private transient FlinkAsFlussRow converter;
 
+    /**
+     * The projected row for output, used when schema evolution occurs, because we want to write
+     * rows using new schema (e.g., fill null for new columns).
+     */
+    @Nullable private transient ProjectedRow outputProjection;
+
     /**
      * Constructs a new {@code RowSerializationSchema}.
      *
@@ -63,7 +74,19 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD
      */
     @Override
     public void open(InitializationContext context) throws Exception {
-        this.converter = FlinkAsFlussRow.from(context.getTableRowType(), context.getRowSchema());
+        this.converter = new FlinkAsFlussRow();
+        List<String> targetFieldNames = context.getRowSchema().getFieldNames();
+        List<String> inputFieldNames = context.getInputRowSchema().getFieldNames();
+        if (targetFieldNames.size() != inputFieldNames.size()) {
+            // there is a schema evolution happens (e.g., ADD COLUMN), need to build index mapping
+            int[] indexMapping = new int[targetFieldNames.size()];
+            for (int i = 0; i < targetFieldNames.size(); i++) {
+                String fieldName = targetFieldNames.get(i);
+                int fieldIndex = inputFieldNames.indexOf(fieldName);
+                indexMapping[i] = fieldIndex;
+            }
+            outputProjection = ProjectedRow.from(indexMapping);
+        }
     }
 
     /**
@@ -82,6 +105,9 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD
                     "Converter not initialized. The open() method must be called before serializing records.");
         }
         InternalRow row = converter.replace(value);
+        if (outputProjection != null) {
+            row = outputProjection.replaceRow(row);
+        }
         OperationType opType = toOperationType(value.getRowKind());
 
         return new RowWithOp(row, opType);
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
index 93b849e58..cc1d168cd 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
@@ -47,7 +47,7 @@ public class SerializerInitContextImpl implements FlussSerializationSchema.Initi
     }
 
     @Override
-    public org.apache.flink.table.types.logical.RowType getTableRowType() {
+    public org.apache.flink.table.types.logical.RowType getInputRowSchema() {
         return flinkRowType;
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
index a4469fd32..97a70ce25 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
@@ -29,7 +29,6 @@ import org.apache.fluss.flink.row.OperationType;
 import org.apache.fluss.flink.row.RowWithOp;
 import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
 import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
-import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.Gauge;
@@ -114,7 +113,7 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
                 "Current Fluss Schema is {}, Table RowType is {}",
                 table.getTableInfo().getSchema(),
                 tableRowType);
-        // sanityCheck(table.getTableInfo());
+        sanityCheck(table.getTableInfo());
 
         try {
             this.serializationSchema.open(
@@ -209,22 +208,6 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
                             "Primary key constraint is not matched between metadata in Fluss (%s) and Flink (%s).",
                             flussTableInfo.hasPrimaryKey(), hasPrimaryKey));
         }
-        RowType currentTableRowType = FlinkConversions.toFlinkRowType(flussTableInfo.getRowType());
-        if (!this.tableRowType.copy(false).equals(currentTableRowType.copy(false))) {
-            // The default nullability of Flink row type and Fluss row type might be not the same,
-            // thus we need to compare the row type without nullability here.
-
-            // Throw exception if the schema is the not same, this should rarely happen because we
-            // only allow fluss tables derived from fluss catalog. But this can happen if an ALTER
-            // TABLE command executed on the fluss table, after the job is submitted but before the
-            // SinkFunction is opened.
-            throw new ValidationException(
-                    "The Flink query schema is not matched to current Fluss table schema. "
-                            + "\nFlink query schema: "
-                            + this.tableRowType
-                            + "\nFluss table schema: "
-                            + currentTableRowType);
-        }
     }
 
     private long computeSendTime() {

Reply via email to