This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/ci-add-column by this push:
new fd9e18730 fix sink
fd9e18730 is described below
commit fd9e187308d465471ed6eef38e550d3f2afbe93b
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 12:20:28 2025 +0800
fix sink
---
.../main/java/org/apache/fluss/row/PaddingRow.java | 133 +++++++++++++++++++++
.../serializer/RowDataSerializationSchema.java | 49 +++++++-
2 files changed, 180 insertions(+), 2 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java
b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java
new file mode 100644
index 000000000..db431f2ac
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+/**
+ * An {@link InternalRow} that pads another {@link InternalRow} with nulls up
to a target field
+ * count.
+ */
+public class PaddingRow implements InternalRow {
+
+ private final int targetFieldCount;
+ private InternalRow row;
+
+ public PaddingRow(int targetFieldCount) {
+ this.targetFieldCount = targetFieldCount;
+ }
+
+ /**
+ * Replaces the underlying {@link InternalRow} backing this {@link
PaddingRow}.
+ *
+ * <p>This method replaces the row data in place and does not return a new
object. This is done
+ * for performance reasons.
+ */
+ public PaddingRow replaceRow(InternalRow row) {
+ this.row = row;
+ return this;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return targetFieldCount;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ if (row.getFieldCount() > pos) {
+ return row.isNullAt(pos);
+ } else {
+ // padding null if pos exceeds the underlying row's field count
+ return true;
+ }
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return row.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return row.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return row.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return row.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return row.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return row.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return row.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ return row.getChar(pos, length);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return row.getString(pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return row.getDecimal(pos, precision, scale);
+ }
+
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ return row.getTimestampNtz(pos, precision);
+ }
+
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ return row.getTimestampLtz(pos, precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ return row.getBinary(pos, length);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ return row.getBytes(pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return row.getArray(pos);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index 440c70485..5271a50be 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.fluss.flink.row.FlinkAsFlussRow;
import org.apache.fluss.flink.row.OperationType;
import org.apache.fluss.flink.row.RowWithOp;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.PaddingRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.flink.table.data.RowData;
@@ -30,7 +31,38 @@ import javax.annotation.Nullable;
import java.util.List;
-/** Default implementation of RowDataConverter for RowData. */
+/**
+ * Default implementation of RowDataConverter for RowData.
+ *
+ * <p>Schema Evolution Handling:
+ *
+ * <p>There are three schemas in the sink case.
+ *
+ * <ul>
+ * <li>(1) Consumed Row Schema (RowData): the schema of RowData consumed,
currently, Flink doesn't
+ * provide API to fetch it, but we can get the row arity from
RowData.getArity().
+ * <li>(2) Plan Row Schema ({@link
InitializationContext#getInputRowSchema()}): the compiled input
+ * row schema of the sink, but the schema is fetched from catalog, so it
is not the consumed
+ * row schema. This is updated when job is re-compiled.
+ * <li>(3) Table Latest Row Schema ({@link
InitializationContext#getRowSchema()}): the latest
+ * schema of the sink table, which is fetched at open() during runtime,
so it will be updated
+ * when Flink job restarts.
+ * </ul>
+ *
+ * <p>We always want to use latest schema to write data, so we need to add
conversions from consumed
+ * Flink RowData.
+ *
+ * <ul>
+ * <li>The {@link #converter} is used to wrap Flink {@link RowData} into
Fluss {@link InternalRow}
+ * without any schema transformation.
+ <li>The {@link #outputPadding} is used to pad nulls for new columns when
new columns are added.
+     This may happen when new columns are added to the table before Flink job
compilation, so the
+     Consumed Row Schema has fewer fields than the Plan Row Schema.
+ <li>The {@link #outputProjection} is used to re-arrange the fields
according to the latest schema
+     if the Plan Row Schema does not match the Table Latest Row Schema. This may
happen when new columns are
+     added to the table after the Flink job is compiled and before the job
restarts.
+ * </ul>
+ */
public class RowDataSerializationSchema implements
FlussSerializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -49,9 +81,16 @@ public class RowDataSerializationSchema implements
FlussSerializationSchema<RowD
*/
private transient FlinkAsFlussRow converter;
+ /**
+ * The padding row for output, used when the input row has fewer fields
than the target schema.
+ * This may happen when new columns are added to the table between Flink job
restarts.
+ */
+ private transient PaddingRow outputPadding;
+
/**
* The projected row for output, used when schema evolution occurs,
because we want to write
- * rows using new schema (e.g., fill null for new columns).
+ * rows using new schema (e.g., fill null for new columns). This may
happen when new columns are
+ added to the table after the Flink job is compiled and before the job restarts.
*/
@Nullable private transient ProjectedRow outputProjection;
@@ -77,6 +116,7 @@ public class RowDataSerializationSchema implements
FlussSerializationSchema<RowD
this.converter = new FlinkAsFlussRow();
List<String> targetFieldNames = context.getRowSchema().getFieldNames();
List<String> inputFieldNames =
context.getInputRowSchema().getFieldNames();
+ this.outputPadding = new PaddingRow(inputFieldNames.size());
if (targetFieldNames.size() != inputFieldNames.size()) {
// there is a schema evolution happens (e.g., ADD COLUMN), need to
build index mapping
int[] indexMapping = new int[targetFieldNames.size()];
@@ -105,6 +145,11 @@ public class RowDataSerializationSchema implements
FlussSerializationSchema<RowD
"Converter not initialized. The open() method must be
called before serializing records.");
}
InternalRow row = converter.replace(value);
+ // handling schema evolution for changes before job compilation
+ if (row.getFieldCount() < outputPadding.getFieldCount()) {
+ row = outputPadding.replaceRow(row);
+ }
+ // handling schema evolution for changes after job compilation
if (outputProjection != null) {
row = outputProjection.replaceRow(row);
}