This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/ci-add-column by this push:
     new fd9e18730 fix sink
fd9e18730 is described below

commit fd9e187308d465471ed6eef38e550d3f2afbe93b
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 12:20:28 2025 +0800

    fix sink
---
 .../main/java/org/apache/fluss/row/PaddingRow.java | 133 +++++++++++++++++++++
 .../serializer/RowDataSerializationSchema.java     |  49 +++++++-
 2 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java
new file mode 100644
index 000000000..db431f2ac
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+/**
+ * An {@link InternalRow} that pads another {@link InternalRow} with nulls up 
to a target field
+ * count.
+ */
+public class PaddingRow implements InternalRow {
+
+    private final int targetFieldCount;
+    private InternalRow row;
+
+    public PaddingRow(int targetFieldCount) {
+        this.targetFieldCount = targetFieldCount;
+    }
+
+    /**
+     * Replaces the underlying {@link InternalRow} backing this {@link 
PaddingRow}.
+     *
+     * <p>This method replaces the row data in place and does not return a new 
object. This is done
+     * for performance reasons.
+     */
+    public PaddingRow replaceRow(InternalRow row) {
+        this.row = row;
+        return this;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return targetFieldCount;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        if (row.getFieldCount() > pos) {
+            return row.isNullAt(pos);
+        } else {
+            // padding null if pos exceeds the underlying row's field count
+            return true;
+        }
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        return row.getChar(pos, length);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return row.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        return row.getTimestampNtz(pos, precision);
+    }
+
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        return row.getTimestampLtz(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        return row.getBinary(pos, length);
+    }
+
+    @Override
+    public byte[] getBytes(int pos) {
+        return row.getBytes(pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return row.getArray(pos);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index 440c70485..5271a50be 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.fluss.flink.row.FlinkAsFlussRow;
 import org.apache.fluss.flink.row.OperationType;
 import org.apache.fluss.flink.row.RowWithOp;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.PaddingRow;
 import org.apache.fluss.row.ProjectedRow;
 
 import org.apache.flink.table.data.RowData;
@@ -30,7 +31,38 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 
-/** Default implementation of RowDataConverter for RowData. */
+/**
+ * Default implementation of RowDataConverter for RowData.
+ *
+ * <p>Schema Evolution Handling:
+ *
+ * <p>There are three schemas involved in the sink case.
+ *
+ * <ul>
+ *   <li>(1) Consumed Row Schema (RowData): the schema of the consumed RowData. 
Currently, Flink doesn't
+ *       provide an API to fetch it, but we can get the row arity from 
RowData.getArity().
+ *   <li>(2) Plan Row Schema ({@link 
InitializationContext#getInputRowSchema()}): the compiled input
+ *       row schema of the sink, but the schema is fetched from the catalog, so 
it is not the consumed
+ *       row schema. This is updated when the job is re-compiled.
+ *   <li>(3) Table Latest Row Schema ({@link 
InitializationContext#getRowSchema()}): the latest
+ *       schema of the sink table, which is fetched at open() during runtime, 
so it will be updated
+ *       when the Flink job restarts.
+ * </ul>
+ *
+ * <p>We always want to use the latest schema to write data, so we need to add 
conversions from the consumed
+ * Flink RowData.
+ *
+ * <ul>
+ *   <li>The {@link #converter} is used to wrap Flink {@link RowData} into 
Fluss {@link InternalRow}
+ *       without any schema transformation.
+ *   <li>The {@link #outputPadding} is used to pad nulls for new columns when 
new columns are added.
+ *       This may happen when new columns are added to the table before Flink 
job compilation, so the
+ *       Consumed Row Schema has fewer fields than the Plan Row Schema.
+ *   <li>The {@link #outputProjection} is used to re-arrange the fields 
according to the latest schema
+ *       if the Plan Row Schema does not match the Table Latest Row Schema. This 
may happen when new
+ *       columns are added to the table after the Flink job was compiled and 
before the job restarts.
+ * </ul>
+ */
 public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
@@ -49,9 +81,16 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
      */
     private transient FlinkAsFlussRow converter;
 
+    /**
+     * The padding row for output, used when the input row has fewer fields 
than the target schema.
+     * This may happen when new columns are added to the table between Flink 
job restarts.
+     */
+    private transient PaddingRow outputPadding;
+
     /**
      * The projected row for output, used when schema evolution occurs, 
because we want to write
-     * rows using new schema (e.g., fill null for new columns).
+     * rows using the new schema (e.g., fill null for new columns). This may 
happen when new columns
+     * are added to the table after the Flink job was compiled and before the 
job restarts.
      */
     @Nullable private transient ProjectedRow outputProjection;
 
@@ -77,6 +116,7 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
         this.converter = new FlinkAsFlussRow();
         List<String> targetFieldNames = context.getRowSchema().getFieldNames();
         List<String> inputFieldNames = 
context.getInputRowSchema().getFieldNames();
+        this.outputPadding = new PaddingRow(inputFieldNames.size());
         if (targetFieldNames.size() != inputFieldNames.size()) {
             // there is a schema evolution happens (e.g., ADD COLUMN), need to 
build index mapping
             int[] indexMapping = new int[targetFieldNames.size()];
@@ -105,6 +145,11 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
                     "Converter not initialized. The open() method must be 
called before serializing records.");
         }
         InternalRow row = converter.replace(value);
+        // handling schema evolution for changes before job compilation
+        if (row.getFieldCount() < outputPadding.getFieldCount()) {
+            row = outputPadding.replaceRow(row);
+        }
+        // handling schema evolution for changes after job compilation
         if (outputProjection != null) {
             row = outputProjection.replaceRow(row);
         }

Reply via email to