This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit aa31e1c4c8c7d9a6626b792c24ed6cfa32215f6f Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 01:21:36 2025 +0800 revert FlinkAsFlussRow changes --- .../apache/fluss/flink/row/FlinkAsFlussRow.java | 100 ++++----------------- .../sink/serializer/FlussSerializationSchema.java | 7 +- .../serializer/RowDataSerializationSchema.java | 28 +++++- .../sink/serializer/SerializerInitContextImpl.java | 2 +- .../fluss/flink/sink/writer/FlinkSinkWriter.java | 19 +--- 5 files changed, 50 insertions(+), 106 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java index 314e4b40f..003e910ea 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java @@ -23,34 +23,17 @@ import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; -import org.apache.fluss.types.RowType; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; -import javax.annotation.Nullable; - -import java.util.List; - /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */ public class FlinkAsFlussRow implements InternalRow { private RowData flinkRow; - /** - * Creates a FlinkAsFlussRow with a mapping from Fluss field index to Flink field index. If the - * indexMapping is null, flink and flink share same field name. 
- */ - @Nullable private final int[] indexMapping; - - public FlinkAsFlussRow() { - this(null); - } - - public FlinkAsFlussRow(int[] indexMapping) { - this.indexMapping = indexMapping; - } + public FlinkAsFlussRow() {} public FlinkAsFlussRow replace(RowData flinkRow) { this.flinkRow = flinkRow; @@ -59,94 +42,62 @@ public class FlinkAsFlussRow implements InternalRow { @Override public int getFieldCount() { - if (indexMapping == null) { - return flinkRow.getArity(); - } else { - return indexMapping.length; - } + return flinkRow.getArity(); } @Override public boolean isNullAt(int pos) { - // if no mapping, means that the field is not found in Flink, just ignore it. - if ((indexMapping != null && indexMapping[pos] == -1)) { - return true; - } - - // If pos is larger than flinkRow.getArity(), it indicates that the Fluss table's data type - // is wider than the data provided by Flink. - // This often occurs when a schema change happens to the catalog table before job restart. - // Only appending columns at the end is compatible, just need to ignore the trailing - // columns. - // Other types of schema changes (e.g., dropping columns) would lead to incompatibility. - // Therefore, Fluss currently only supports appending columns, and here we simply ignore any - // extra columns. - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - if (flussPos >= flinkRow.getArity()) { - return true; - } - - return flinkRow.isNullAt(flussPos); + return flinkRow.isNullAt(pos); } @Override public boolean getBoolean(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getBoolean(flussPos); + return flinkRow.getBoolean(pos); } @Override public byte getByte(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getByte(flussPos); + return flinkRow.getByte(pos); } @Override public short getShort(int pos) { - int flussPos = indexMapping == null ? 
pos : indexMapping[pos]; - return flinkRow.getShort(flussPos); + return flinkRow.getShort(pos); } @Override public int getInt(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getInt(flussPos); + return flinkRow.getInt(pos); } @Override public long getLong(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getLong(flussPos); + return flinkRow.getLong(pos); } @Override public float getFloat(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getFloat(flussPos); + return flinkRow.getFloat(pos); } @Override public double getDouble(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getDouble(flussPos); + return flinkRow.getDouble(pos); } @Override public BinaryString getChar(int pos, int length) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes()); + return BinaryString.fromBytes(flinkRow.getString(pos).toBytes()); } @Override public BinaryString getString(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return BinaryString.fromBytes(flinkRow.getString(flussPos).toBytes()); + return BinaryString.fromBytes(flinkRow.getString(pos).toBytes()); } @Override public Decimal getDecimal(int pos, int precision, int scale) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return fromFlinkDecimal(flinkRow.getDecimal(flussPos, precision, scale)); + return fromFlinkDecimal(flinkRow.getDecimal(pos, precision, scale)); } public static Decimal fromFlinkDecimal(DecimalData decimal) { @@ -161,43 +112,26 @@ public class FlinkAsFlussRow implements InternalRow { @Override public TimestampNtz getTimestampNtz(int pos, int precision) { - int flussPos = indexMapping == null ? 
pos : indexMapping[pos]; - TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision); + TimestampData timestamp = flinkRow.getTimestamp(pos, precision); return TimestampNtz.fromMillis( timestamp.getMillisecond(), timestamp.getNanoOfMillisecond()); } @Override public TimestampLtz getTimestampLtz(int pos, int precision) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - TimestampData timestamp = flinkRow.getTimestamp(flussPos, precision); + TimestampData timestamp = flinkRow.getTimestamp(pos, precision); return TimestampLtz.fromEpochMillis( timestamp.getMillisecond(), timestamp.getNanoOfMillisecond()); } @Override public byte[] getBinary(int pos, int length) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getBinary(flussPos); + return flinkRow.getBinary(pos); } @Override public byte[] getBytes(int pos) { - int flussPos = indexMapping == null ? pos : indexMapping[pos]; - return flinkRow.getBinary(flussPos); - } - - /** Converts a Flink {@link RowType} to a Fluss {@link RowType}, based on the field name. 
*/ - public static FlinkAsFlussRow from( - org.apache.flink.table.types.logical.RowType flinkRowType, RowType flussRowType) { - int[] indexMapping = new int[flussRowType.getFieldCount()]; - List<String> fieldNames = flussRowType.getFieldNames(); - for (int i = 0; i < flussRowType.getFieldCount(); i++) { - String fieldName = fieldNames.get(i); - int fieldIndex = flinkRowType.getFieldIndex(fieldName); - indexMapping[i] = fieldIndex; - } - return new FlinkAsFlussRow(indexMapping); + return flinkRow.getBinary(pos); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java index 0f3fff210..2096b1095 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java @@ -69,10 +69,11 @@ public interface FlussSerializationSchema<T> extends Serializable { RowType getRowSchema(); /** - * Returns the Flink table row schema. + * Returns the input row schema which is the Flink {@link + * org.apache.flink.table.data.RowData}. * - * @return The schema of the Flink table row. + * @return The schema of the input Flink row. 
*/ - org.apache.flink.table.types.logical.RowType getTableRowType(); + org.apache.flink.table.types.logical.RowType getInputRowSchema(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java index 05e2921fc..440c70485 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java @@ -21,10 +21,15 @@ import org.apache.fluss.flink.row.FlinkAsFlussRow; import org.apache.fluss.flink.row.OperationType; import org.apache.fluss.flink.row.RowWithOp; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; +import javax.annotation.Nullable; + +import java.util.List; + /** Default implementation of RowDataConverter for RowData. */ public class RowDataSerializationSchema implements FlussSerializationSchema<RowData> { private static final long serialVersionUID = 1L; @@ -44,6 +49,12 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD */ private transient FlinkAsFlussRow converter; + /** + * The projected row for output, used when schema evolution occurs, because we want to write + * rows using new schema (e.g., fill null for new columns). + */ + @Nullable private transient ProjectedRow outputProjection; + /** * Constructs a new {@code RowSerializationSchema}. 
* @@ -63,7 +74,19 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD */ @Override public void open(InitializationContext context) throws Exception { - this.converter = FlinkAsFlussRow.from(context.getTableRowType(), context.getRowSchema()); + this.converter = new FlinkAsFlussRow(); + List<String> targetFieldNames = context.getRowSchema().getFieldNames(); + List<String> inputFieldNames = context.getInputRowSchema().getFieldNames(); + if (targetFieldNames.size() != inputFieldNames.size()) { + // a schema evolution has happened (e.g., ADD COLUMN), so an index mapping is needed + int[] indexMapping = new int[targetFieldNames.size()]; + for (int i = 0; i < targetFieldNames.size(); i++) { + String fieldName = targetFieldNames.get(i); + int fieldIndex = inputFieldNames.indexOf(fieldName); + indexMapping[i] = fieldIndex; + } + outputProjection = ProjectedRow.from(indexMapping); + } } /** @@ -82,6 +105,9 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD "Converter not initialized. 
The open() method must be called before serializing records."); } InternalRow row = converter.replace(value); + if (outputProjection != null) { + row = outputProjection.replaceRow(row); + } OperationType opType = toOperationType(value.getRowKind()); return new RowWithOp(row, opType); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java index 93b849e58..cc1d168cd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java @@ -47,7 +47,7 @@ public class SerializerInitContextImpl implements FlussSerializationSchema.Initi } @Override - public org.apache.flink.table.types.logical.RowType getTableRowType() { + public org.apache.flink.table.types.logical.RowType getInputRowSchema() { return flinkRowType; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java index a4469fd32..97a70ce25 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java @@ -29,7 +29,6 @@ import org.apache.fluss.flink.row.OperationType; import org.apache.fluss.flink.row.RowWithOp; import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema; import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl; -import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.Gauge; @@ 
-114,7 +113,7 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> { "Current Fluss Schema is {}, Table RowType is {}", table.getTableInfo().getSchema(), tableRowType); - // sanityCheck(table.getTableInfo()); + sanityCheck(table.getTableInfo()); try { this.serializationSchema.open( @@ -209,22 +208,6 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> { "Primary key constraint is not matched between metadata in Fluss (%s) and Flink (%s).", flussTableInfo.hasPrimaryKey(), hasPrimaryKey)); } - RowType currentTableRowType = FlinkConversions.toFlinkRowType(flussTableInfo.getRowType()); - if (!this.tableRowType.copy(false).equals(currentTableRowType.copy(false))) { - // The default nullability of Flink row type and Fluss row type might be not the same, - // thus we need to compare the row type without nullability here. - - // Throw exception if the schema is the not same, this should rarely happen because we - // only allow fluss tables derived from fluss catalog. But this can happen if an ALTER - // TABLE command executed on the fluss table, after the job is submitted but before the - // SinkFunction is opened. - throw new ValidationException( - "The Flink query schema is not matched to current Fluss table schema. " - + "\nFlink query schema: " - + this.tableRowType - + "\nFluss table schema: " - + currentTableRowType); - } } private long computeSendTime() {
