This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 684a1870d3 Flink: Fix ALTER TABLE to add column to specific position
(#16419)
684a1870d3 is described below
commit 684a1870d3cc0738ab57269ab1dcb8a8a3b267a3
Author: Stepan Stepanishchev
<[email protected]>
AuthorDate: Wed May 20 18:20:20 2026 +0700
Flink: Fix ALTER TABLE to add column to specific position (#16419)
---
.../iceberg/flink/util/FlinkAlterTableUtil.java | 40 ++++++++++++++--------
.../iceberg/flink/TestFlinkCatalogTable.java | 25 ++++++++++++++
2 files changed, 51 insertions(+), 14 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
index 2bbc9cf208..35ada6106f 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
@@ -124,20 +124,7 @@ public class FlinkAlterTableUtil {
UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
for (TableChange change : schemaChanges) {
if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
- Column flinkColumn = addColumn.getColumn();
- Preconditions.checkArgument(
- FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
- "Unsupported table change: Adding computed column %s.",
- flinkColumn.getName());
- Type icebergType =
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
- if (flinkColumn.getDataType().getLogicalType().isNullable()) {
- pendingUpdate.addColumn(
- flinkColumn.getName(), icebergType,
flinkColumn.getComment().orElse(null));
- } else {
- pendingUpdate.addRequiredColumn(
- flinkColumn.getName(), icebergType,
flinkColumn.getComment().orElse(null));
- }
+ applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
} else if (change instanceof TableChange.ModifyColumn) {
TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn)
change;
applyModifyColumn(pendingUpdate, modifyColumn);
@@ -164,6 +151,31 @@ public class FlinkAlterTableUtil {
}
}
+ private static void applyAddColumn(UpdateSchema pendingUpdate,
TableChange.AddColumn addColumn) {
+ Column flinkColumn = addColumn.getColumn();
+ Preconditions.checkArgument(
+ FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+ "Unsupported table change: Adding computed column %s.",
+ flinkColumn.getName());
+
+ Type icebergType =
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+
+ if (flinkColumn.getDataType().getLogicalType().isNullable()) {
+ pendingUpdate.addColumn(
+ flinkColumn.getName(), icebergType,
flinkColumn.getComment().orElse(null));
+ } else {
+ pendingUpdate.addRequiredColumn(
+ flinkColumn.getName(), icebergType,
flinkColumn.getComment().orElse(null));
+ }
+
+ if (addColumn.getPosition() != null) {
+ TableChange.ColumnPosition position = addColumn.getPosition();
+ TableChange.ModifyColumnPosition modifyColumnPosition =
+ new TableChange.ModifyColumnPosition(addColumn.getColumn(),
position);
+ applyModifyColumnPosition(pendingUpdate, modifyColumnPosition);
+ }
+ }
+
/**
* Applies a list of Flink table property changes to an {@link
UpdateProperties} operation.
*
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index f7848a5d22..091a7b67b4 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -404,6 +404,31 @@ public class TestFlinkCatalogTable extends CatalogTestBase
{
.hasMessageContaining("Try to add a column `id` which already exists
in the table.");
}
+ @TestTemplate
+ public void testAlterTableAddColumnPosition() {
+ sql("CREATE TABLE tl(id BIGINT, name STRING)");
+ Schema schemaBefore = table("tl").schema();
+ assertThat(schemaBefore.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "name",
Types.StringType.get()))
+ .asStruct());
+
+ sql("ALTER TABLE tl ADD (col1 STRING FIRST)");
+ sql("ALTER TABLE tl ADD (col2 INT AFTER id)");
+
+ Schema schemaAfter = table("tl").schema();
+ assertThat(schemaAfter.asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(3, "col1",
Types.StringType.get()),
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(4, "col2",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "name",
Types.StringType.get()))
+ .asStruct());
+ }
+
@TestTemplate
public void testAlterTableDropColumn() {
sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");