This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 56013b72db Flink: Backport handle table comments in FlinkSQL (#16503)
56013b72db is described below

commit 56013b72db456f774efb696e237b58de88c2e94e
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Thu May 21 15:15:08 2026 +0700

    Flink: Backport handle table comments in FlinkSQL (#16503)
    
    backports #16423
---
 .../java/org/apache/iceberg/flink/FlinkCatalog.java   |  6 ++++++
 .../apache/iceberg/flink/TestFlinkCatalogTable.java   | 19 +++++++++++++++++++
 .../java/org/apache/iceberg/flink/FlinkCatalog.java   |  6 ++++++
 .../apache/iceberg/flink/TestFlinkCatalogTable.java   | 19 +++++++++++++++++++
 4 files changed, 50 insertions(+)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811..a56c4e0ca6 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -450,6 +451,11 @@ public class FlinkCatalog extends AbstractCatalog {
       }
     }
 
+    String comment = table.getComment();
+    if (comment != null && !comment.isEmpty()) {
+      properties.put(TableProperties.COMMENT, comment);
+    }
+
     try {
       icebergCatalog.createTable(
           toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 757f22857b..b8c357bbb6 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -236,6 +237,24 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
         .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
   }
 
+  @TestTemplate
+  public void testCreateTableWithTableComment() {
+    // create table with comment
+    sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+  }
+
+  @TestTemplate
+  public void testAlterTableModifyTableComment() {
+    // create table with comment
+    sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+
+    // alter table comment
+    sql("ALTER TABLE tl SET('comment' = 'new comment')");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment");
+  }
+
   @TestTemplate
   public void testCreateTableLocation() {
     assumeThat(isHadoopCatalog)
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811..a56c4e0ca6 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -450,6 +451,11 @@ public class FlinkCatalog extends AbstractCatalog {
       }
     }
 
+    String comment = table.getComment();
+    if (comment != null && !comment.isEmpty()) {
+      properties.put(TableProperties.COMMENT, comment);
+    }
+
     try {
       icebergCatalog.createTable(
           toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 091a7b67b4..020663a7de 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -243,6 +244,24 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
         .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
   }
 
+  @TestTemplate
+  public void testCreateTableWithTableComment() {
+    // create table with comment
+    sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+  }
+
+  @TestTemplate
+  public void testAlterTableModifyTableComment() {
+    // create table with comment
+    sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+
+    // alter table comment
+    sql("ALTER TABLE tl SET('comment' = 'new comment')");
+    assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment");
+  }
+
   @TestTemplate
   public void testCreateTableLocation() {
     assumeThat(isHadoopCatalog)

Reply via email to