This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 56013b72db Flink: Backport handle table comments in FlinkSQL (#16503)
56013b72db is described below
commit 56013b72db456f774efb696e237b58de88c2e94e
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Thu May 21 15:15:08 2026 +0700
Flink: Backport handle table comments in FlinkSQL (#16503)
backports #16423
---
.../java/org/apache/iceberg/flink/FlinkCatalog.java | 6 ++++++
.../apache/iceberg/flink/TestFlinkCatalogTable.java | 19 +++++++++++++++++++
.../java/org/apache/iceberg/flink/FlinkCatalog.java | 6 ++++++
.../apache/iceberg/flink/TestFlinkCatalogTable.java | 19 +++++++++++++++++++
4 files changed, 50 insertions(+)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811..a56c4e0ca6 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -450,6 +451,11 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
+ String comment = table.getComment();
+ if (comment != null && !comment.isEmpty()) {
+ properties.put(TableProperties.COMMENT, comment);
+ }
+
try {
icebergCatalog.createTable(
toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 757f22857b..b8c357bbb6 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -236,6 +237,24 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
}
+ @TestTemplate
+ public void testCreateTableWithTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+ }
+
+ @TestTemplate
+ public void testAlterTableModifyTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+
+ // alter table comment
+ sql("ALTER TABLE tl SET('comment' = 'new comment')");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment");
+ }
+
@TestTemplate
public void testCreateTableLocation() {
assumeThat(isHadoopCatalog)
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811..a56c4e0ca6 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -450,6 +451,11 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
+ String comment = table.getComment();
+ if (comment != null && !comment.isEmpty()) {
+ properties.put(TableProperties.COMMENT, comment);
+ }
+
try {
icebergCatalog.createTable(
toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 091a7b67b4..020663a7de 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -243,6 +244,24 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
}
+ @TestTemplate
+ public void testCreateTableWithTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+ }
+
+ @TestTemplate
+ public void testAlterTableModifyTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+
+ // alter table comment
+ sql("ALTER TABLE tl SET('comment' = 'new comment')");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment");
+ }
+
@TestTemplate
public void testCreateTableLocation() {
assumeThat(isHadoopCatalog)