This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 684a1870d3 Flink: Fix ALTER TABLE to add column to specific position 
(#16419)
684a1870d3 is described below

commit 684a1870d3cc0738ab57269ab1dcb8a8a3b267a3
Author: Stepan Stepanishchev 
<[email protected]>
AuthorDate: Wed May 20 18:20:20 2026 +0700

    Flink: Fix ALTER TABLE to add column to specific position (#16419)
---
 .../iceberg/flink/util/FlinkAlterTableUtil.java    | 40 ++++++++++++++--------
 .../iceberg/flink/TestFlinkCatalogTable.java       | 25 ++++++++++++++
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
index 2bbc9cf208..35ada6106f 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
@@ -124,20 +124,7 @@ public class FlinkAlterTableUtil {
       UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
     for (TableChange change : schemaChanges) {
       if (change instanceof TableChange.AddColumn) {
-        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
-        Column flinkColumn = addColumn.getColumn();
-        Preconditions.checkArgument(
-            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
-            "Unsupported table change: Adding computed column %s.",
-            flinkColumn.getName());
-        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
-        if (flinkColumn.getDataType().getLogicalType().isNullable()) {
-          pendingUpdate.addColumn(
-              flinkColumn.getName(), icebergType, 
flinkColumn.getComment().orElse(null));
-        } else {
-          pendingUpdate.addRequiredColumn(
-              flinkColumn.getName(), icebergType, 
flinkColumn.getComment().orElse(null));
-        }
+        applyAddColumn(pendingUpdate, (TableChange.AddColumn) change);
       } else if (change instanceof TableChange.ModifyColumn) {
         TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
         applyModifyColumn(pendingUpdate, modifyColumn);
@@ -164,6 +151,31 @@ public class FlinkAlterTableUtil {
     }
   }
 
+  private static void applyAddColumn(UpdateSchema pendingUpdate, 
TableChange.AddColumn addColumn) {
+    Column flinkColumn = addColumn.getColumn();
+    Preconditions.checkArgument(
+        FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+        "Unsupported table change: Adding computed column %s.",
+        flinkColumn.getName());
+
+    Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+
+    if (flinkColumn.getDataType().getLogicalType().isNullable()) {
+      pendingUpdate.addColumn(
+          flinkColumn.getName(), icebergType, 
flinkColumn.getComment().orElse(null));
+    } else {
+      pendingUpdate.addRequiredColumn(
+          flinkColumn.getName(), icebergType, 
flinkColumn.getComment().orElse(null));
+    }
+
+    if (addColumn.getPosition() != null) {
+      TableChange.ColumnPosition position = addColumn.getPosition();
+      TableChange.ModifyColumnPosition modifyColumnPosition =
+          new TableChange.ModifyColumnPosition(addColumn.getColumn(), 
position);
+      applyModifyColumnPosition(pendingUpdate, modifyColumnPosition);
+    }
+  }
+
   /**
    * Applies a list of Flink table property changes to an {@link 
UpdateProperties} operation.
    *
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index f7848a5d22..091a7b67b4 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -404,6 +404,31 @@ public class TestFlinkCatalogTable extends CatalogTestBase 
{
         .hasMessageContaining("Try to add a column `id` which already exists 
in the table.");
   }
 
+  @TestTemplate
+  public void testAlterTableAddColumnPosition() {
+    sql("CREATE TABLE tl(id BIGINT, name STRING)");
+    Schema schemaBefore = table("tl").schema();
+    assertThat(schemaBefore.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(2, "name", 
Types.StringType.get()))
+                .asStruct());
+
+    sql("ALTER TABLE tl ADD (col1 STRING FIRST)");
+    sql("ALTER TABLE tl ADD (col2 INT AFTER id)");
+
+    Schema schemaAfter = table("tl").schema();
+    assertThat(schemaAfter.asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(3, "col1", 
Types.StringType.get()),
+                    Types.NestedField.optional(1, "id", Types.LongType.get()),
+                    Types.NestedField.optional(4, "col2", 
Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "name", 
Types.StringType.get()))
+                .asStruct());
+  }
+
   @TestTemplate
   public void testAlterTableDropColumn() {
     sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");

Reply via email to