This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 987c466ba [FLINK-39245][Iceberg] Add AWS Glue Catalog support for 
Iceberg pipeline connector (#4314)
987c466ba is described below

commit 987c466ba0e2fb7ec8bdc6cefa1d72a03468f0ee
Author: HUANG XIAO <[email protected]>
AuthorDate: Fri Mar 20 22:40:29 2026 +0800

    [FLINK-39245][Iceberg] Add AWS Glue Catalog support for Iceberg pipeline 
connector (#4314)
    
    Co-authored-by: Copilot Autofix powered by AI 
<[email protected]>
---
 .../docs/connectors/pipeline-connectors/iceberg.md | 200 +++++++++++++++------
 .../docs/connectors/pipeline-connectors/iceberg.md | 103 ++++++++++-
 .../flink-cdc-pipeline-connector-iceberg/pom.xml   |   6 +
 .../iceberg/sink/IcebergDataSinkFactory.java       |   6 +
 .../iceberg/sink/IcebergDataSinkOptions.java       |  50 +++++-
 .../iceberg/sink/IcebergDataSinkFactoryTest.java   |  48 +++++
 6 files changed, 349 insertions(+), 64 deletions(-)
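
Every option this commit registers uses the `catalog.properties.` prefix, and the docs describe `catalog.properties.*` as passing options through to the Iceberg catalog. The sketch below shows one way such a pass-through could work, by stripping the prefix so that `catalog.properties.glue.skip-archive` becomes the Iceberg property `glue.skip-archive`. The class and method names are hypothetical and are not taken from the connector source; this illustrates the idea and is not the factory's actual code.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of flink-cdc. */
final class CatalogPropertiesSketch {
    // Prefix shared by the catalog options documented in this commit.
    static final String PREFIX = "catalog.properties.";

    private CatalogPropertiesSketch() {}

    /** Returns only the prefixed entries, keyed by the name that follows the prefix. */
    static Map<String, String> extractCatalogProperties(Map<String, String> sinkOptions) {
        Map<String, String> catalogProperties = new TreeMap<>();
        for (Map.Entry<String, String> entry : sinkOptions.entrySet()) {
            String key = entry.getKey();
            // Skip unprefixed sink options such as "type" and "name".
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                catalogProperties.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return catalogProperties;
    }

    public static void main(String[] args) {
        Map<String, String> sinkOptions =
                Map.of(
                        "type", "iceberg",
                        "catalog.properties.type", "glue",
                        "catalog.properties.warehouse", "s3://my-bucket/warehouse",
                        "catalog.properties.glue.skip-archive", "true");
        System.out.println(extractCatalogProperties(sinkOptions));
    }
}
```

A sorted map is used only so that the printed order is stable; any `Map` implementation would do.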

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
index 9d97cd831..8cada084e 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
@@ -24,22 +24,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Iceberg Pipeline Connector
+# Iceberg Pipeline 连接器
 
-The Iceberg Pipeline Connector functions as a *Data Sink* for data pipelines, 
enabling data writes to Apache Iceberg 
tables[Iceberg](https://iceberg.apache.org). This document explains how to 
configure the connector.
+Iceberg Pipeline 连接器作为数据管道的 *Data Sink*,支持将数据写入 [Apache 
Iceberg](https://iceberg.apache.org) 表。本文档介绍如何配置该连接器。
 
-## Key Capabilities
-* Automatic Table Creation
-  Creates Iceberg tables dynamically when they do not exist
-* Schema Synchronization
-  Propagates schema changes (e.g., column additions) from source systems to 
Iceberg
-* Data Replication
-  Supports both batch and streaming data synchronization
+## 核心能力
+* **自动建表:**
+当 Iceberg 表不存在时,自动动态创建
+* **Schema 同步:**
+将上游数据源的 Schema 变更(例如新增列)自动同步到 Iceberg 表
+* **数据同步:**
+支持批处理和流式数据同步
 
-How to create Pipeline
+Pipeline 创建方式
 ----------------
 
-The pipeline for reading data from MySQL and sink to Iceberg can be defined as 
follows:
+以下示例展示了如何定义一个从 MySQL 读取数据并写入 Iceberg 的数据管道:
+
+### Hadoop Catalog 示例
 
 ```yaml
 source:
@@ -62,115 +64,203 @@ pipeline:
   name: MySQL to Iceberg Pipeline
   parallelism: 2
 ```
-***Note:***
-If `catalog.properties.type` is hadoop, you need to configure the following 
dependencies manually, and pass it with `--jar` argument of Flink CDC CLI when 
submitting YAML pipeline jobs.
+
+### AWS Glue Catalog 示例
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: iceberg
+  name: Iceberg Sink
+  catalog.properties.type: glue
+  catalog.properties.warehouse: s3://my-bucket/warehouse
+  catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO
+  catalog.properties.client.region: us-east-1
+  catalog.properties.glue.skip-archive: true
+
+pipeline:
+  name: MySQL to Iceberg via Glue Pipeline
+  parallelism: 2
+```
+
+***注意:***
+根据所使用的 Catalog 类型,可能需要手动添加额外的 JAR 依赖,并在使用 Flink CDC CLI 提交 YAML 管道作业时通过 
`--jar` 参数传入。
+
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
       <tr>
-        <th class="text-left">Dependency Item</th>
-        <th class="text-left">Description</th>
+        <th class="text-left">Catalog 类型</th>
+        <th class="text-left">依赖项</th>
+        <th class="text-left">说明</th>
       </tr>
     </thead>
     <tbody>
       <tr>
-        <td><a 
href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0
 ">  org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
-        <td>Used for Hadoop dependencies.</td>
+        <td>所有类型</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.20">org.apache.iceberg:iceberg-flink-runtime-1.20</a></td>
+        <td>Iceberg Flink 运行时依赖。当运行环境中未预装 Iceberg 时(例如独立部署的 Flink 集群),所有 
Catalog 类型均需要此依赖。</td>
+      </tr>
+      <tr>
+        <td>hadoop</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0">org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
+        <td>提供 Hadoop 文件系统相关依赖。</td>
+      </tr>
+      <tr>
+        <td>glue</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws">org.apache.iceberg:iceberg-aws</a></td>
+        <td>提供 AWS Glue Catalog 和 S3 FileIO 实现。</td>
+      </tr>
+      <tr>
+        <td>glue</td>
+        <td><a href="https://mvnrepository.com/artifact/software.amazon.awssdk/bundle">software.amazon.awssdk:bundle</a></td>
+        <td>iceberg-aws 所依赖的 AWS SDK。</td>
       </tr>
     </tbody>
 </table>
 </div>
 
-Pipeline Connector Options
+Pipeline 连接器选项
 ----------------
 <div class="highlight">
 <table class="colwidths-auto docutils">
    <thead>
       <tr>
-        <th class="text-left" style="width: 25%">Option</th>
-        <th class="text-left" style="width: 8%">Required</th>
-        <th class="text-left" style="width: 7%">Default</th>
-        <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 50%">Description</th>
+        <th class="text-left" style="width: 25%">选项</th>
+        <th class="text-left" style="width: 8%">是否必填</th>
+        <th class="text-left" style="width: 7%">默认值</th>
+        <th class="text-left" style="width: 10%">类型</th>
+        <th class="text-left" style="width: 50%">描述</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td>type</td>
-      <td>required</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>必填</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Specify what connector to use, here should be 
<code>'iceberg'</code>.</td>
+      <td>指定使用的连接器类型,此处应为 <code>iceberg</code>。</td>
     </tr>
     <tr>
       <td>name</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>The name of the sink.</td>
+      <td>Sink 的名称。</td>
     </tr>
     <tr>
       <td>catalog.properties.type</td>
-      <td>required</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>条件必填</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Metastore of iceberg catalog, supports hadoop and hive.</td>
+      <td>Iceberg Catalog 的元数据存储类型,支持 <code>hadoop</code>、<code>hive</code> 和 
<code>glue</code>。此选项与 <code>catalog.properties.catalog-impl</code> 必须设置其一。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.catalog-impl</td>
+      <td>条件必填</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>自定义 Catalog 实现类,例如 
<code>org.apache.iceberg.aws.glue.GlueCatalog</code>。此选项与 
<code>catalog.properties.type</code> 必须设置其一。</td>
     </tr>
     <tr>
       <td>catalog.properties.warehouse</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>The warehouse root path of catalog.</td>
+      <td>Iceberg Catalog 的仓库根路径,适用于所有 Catalog 类型。对于 <code>hadoop</code> 和 
<code>hive</code> Catalog,通常为本地或分布式文件系统路径;对于 <code>glue</code> 
Catalog,通常为对象存储路径,例如 <code>s3://my-bucket/warehouse</code>。</td>
     </tr>
     <tr>
       <td>catalog.properties.uri</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>元数据服务 URI(例如 Hive Metastore 的 thrift URI)。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.io-impl</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>自定义 FileIO 实现类。使用 AWS S3 时,请设置为 
<code>org.apache.iceberg.aws.s3.S3FileIO</code>。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.client.region</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Uri of metastore server.</td>
+      <td>Glue Catalog 客户端的 AWS 区域(例如 <code>us-east-1</code>)。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.id</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>Glue Catalog ID(即 AWS 账户 ID)。默认使用调用者的 AWS 账户 ID。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.skip-archive</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>是否跳过在 Glue 中归档旧版本的表元数据。</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.skip-name-validation</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>是否跳过 Glue Catalog 的名称校验。</td>
     </tr>
     <tr>
       <td>partition.key</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Partition keys for each partitioned table. Allow setting multiple 
primary keys for multiTables. Tables are separated by ';', and partition keys 
are separated by ','. For example, we can set <code>partition.key</code> of two 
tables using 'testdb.table1:id1,id2;testdb.table2:name'. For partition 
transforms, we can set <code>partition.key</code> using 
'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);tes
 [...]
+      <td>各分区表的分区键。支持为多张表设置不同的分区键,表之间以 <code>;</code> 分隔,分区键之间以 <code>,</code> 
分隔。例如,可以通过 <code>testdb.table1:id1,id2;testdb.table2:name</code> 
为两张表设置分区键。对于分区转换,可以使用以下语法:<code>testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time)</code>。</td>
     </tr>
     <tr>
       <td>catalog.properties.*</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Pass options of Iceberg catalog to pipeline,See <a href="https://iceberg.apache.org/docs/nightly/flink-configuration/#catalog-configuration">Iceberg catalog options</a>. </td>
+      <td>透传 Iceberg Catalog 选项到管道,详见 <a href="https://iceberg.apache.org/docs/nightly/flink-configuration/#catalog-configuration">Iceberg Catalog 配置</a>。</td>
     </tr>
     <tr>
       <td>table.properties.*</td>
-      <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>Pass options of Iceberg table to pipeline,See <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg table options</a>. </td>
+      <td>透传 Iceberg 表选项到管道,详见 <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg 表配置</a>。</td>
     </tr>
     </tbody>
 </table>    
 </div>
 
-Usage Notes
+使用须知
 --------
 
-* Only support Iceberg primary key table, so the source table must have 
primary keys.
+* 源表必须包含主键,不支持无主键的表。
 
-* Not support exactly-once. The connector uses at-least-once + primary key 
table for idempotent writing.
+* 不支持精确一次(Exactly-Once)语义。连接器采用至少一次(At-Least-Once)+ 主键幂等写入的方式保证数据一致性。
 
-Data Type Mapping
+数据类型映射
 ----------------
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
       <tr>
-        <th class="text-left">CDC type</th>
-        <th class="text-left">Iceberg type</th>
-        <th class="text-left" style="width:60%;">NOTE</th>
+        <th class="text-left">CDC 类型</th>
+        <th class="text-left">Iceberg 类型</th>
+        <th class="text-left" style="width:60%;">说明</th>
       </tr>
     </thead>
     <tbody>
@@ -243,4 +333,4 @@ Data Type Mapping
 </table>
 </div>
 
-{{< top >}}
\ No newline at end of file
+{{< top >}}
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md 
b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 129d154b0..2063e5723 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -41,6 +41,8 @@ How to create Pipeline
 
 The pipeline for reading data from MySQL and sink to Iceberg can be defined as 
follows:
 
+### Hadoop Catalog Example
+
 ```yaml
 source:
   type: mysql
@@ -63,20 +65,65 @@ pipeline:
   parallelism: 2
 ```
 
+### AWS Glue Catalog Example
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: iceberg
+  name: Iceberg Sink
+  catalog.properties.type: glue
+  catalog.properties.warehouse: s3://my-bucket/warehouse
+  catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO
+  catalog.properties.client.region: us-east-1
+  catalog.properties.glue.skip-archive: true
+
+pipeline:
+  name: MySQL to Iceberg via Glue Pipeline
+  parallelism: 2
+```
+
 ***Note:***
-If `catalog.properties.type` is hadoop, you need to configure the following 
dependencies manually, and pass it with `--jar` argument of Flink CDC CLI when 
submitting YAML pipeline jobs.
+Depending on the catalog type, you may need to add extra JARs manually and 
pass them with the `--jar` argument of Flink CDC CLI when submitting YAML 
pipeline jobs.
+
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
       <tr>
+        <th class="text-left">Catalog Type</th>
         <th class="text-left">Dependency Item</th>
         <th class="text-left">Description</th>
       </tr>
     </thead>
     <tbody>
       <tr>
-        <td><a 
href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0
 ">  org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
-        <td>Used for Hadoop dependencies.</td>
+        <td>all</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.20">org.apache.iceberg:iceberg-flink-runtime-1.20</a></td>
+        <td>Iceberg Flink runtime. Required for all catalog types when not 
pre-installed in the runtime environment (e.g., standalone Flink clusters).</td>
+      </tr>
+      <tr>
+        <td>hadoop</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0">org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
+        <td>Provides Hadoop filesystem dependencies.</td>
+      </tr>
+      <tr>
+        <td>glue</td>
+        <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws">org.apache.iceberg:iceberg-aws</a></td>
+        <td>Provides AWS Glue Catalog and S3 FileIO implementation.</td>
+      </tr>
+      <tr>
+        <td>glue</td>
+        <td><a href="https://mvnrepository.com/artifact/software.amazon.awssdk/bundle">software.amazon.awssdk:bundle</a></td>
+        <td>AWS SDK bundle required by iceberg-aws.</td>
       </tr>
     </tbody>
 </table>
@@ -112,24 +159,66 @@ Pipeline Connector Options
     </tr>
     <tr>
       <td>catalog.properties.type</td>
-      <td>required</td>
+      <td>conditionally required</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Metastore of Iceberg catalog, supports <code>hadoop</code> and 
<code>hive</code>.</td>
+      <td>Metastore type of Iceberg catalog, supports <code>hadoop</code>, 
<code>hive</code>, and <code>glue</code>. Either this option or 
<code>catalog.properties.catalog-impl</code> must be set.</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.catalog-impl</td>
+      <td>conditionally required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Custom catalog implementation class, e.g. 
<code>org.apache.iceberg.aws.glue.GlueCatalog</code>. Either this option or 
<code>catalog.properties.type</code> must be set.</td>
     </tr>
     <tr>
       <td>catalog.properties.warehouse</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The warehouse root path of catalog.</td>
+      <td>The warehouse root path of the Iceberg catalog, used by all catalog 
types. For <code>hadoop</code> and <code>hive</code> catalogs, this is 
typically a local or distributed filesystem path. For <code>glue</code> 
catalog, this is typically an object storage path like 
<code>s3://my-bucket/warehouse</code>.</td>
     </tr>
     <tr>
       <td>catalog.properties.uri</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Uri of metastore server.</td>
+      <td>URI of the metastore server (e.g. Hive Metastore thrift URI).</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.io-impl</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Custom FileIO implementation class. For AWS S3, use 
<code>org.apache.iceberg.aws.s3.S3FileIO</code>.</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.client.region</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region for the Glue catalog client (e.g. 
<code>us-east-1</code>).</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.id</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The Glue catalog ID (AWS account ID). By default, the caller's AWS 
account ID is used.</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.skip-archive</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to skip archiving older table versions in Glue.</td>
+    </tr>
+    <tr>
+      <td>catalog.properties.glue.skip-name-validation</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to skip name validation for Glue catalog.</td>
     </tr>
     <tr>
       <td>partition.key</td>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
index bed8a5cff..4723dc18c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
@@ -36,6 +36,12 @@ limitations under the License.
             
<artifactId>iceberg-flink-runtime-${iceberg.flink.major.version}</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-composer</artifactId>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 3bdbc924d..13336ff21 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -142,6 +142,12 @@ public class IcebergDataSinkFactory implements 
DataSinkFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(IcebergDataSinkOptions.TYPE);
+        options.add(IcebergDataSinkOptions.CATALOG_IMPL);
+        options.add(IcebergDataSinkOptions.IO_IMPL);
+        options.add(IcebergDataSinkOptions.GLUE_ID);
+        options.add(IcebergDataSinkOptions.GLUE_SKIP_ARCHIVE);
+        options.add(IcebergDataSinkOptions.GLUE_SKIP_NAME_VALIDATION);
+        options.add(IcebergDataSinkOptions.CLIENT_REGION);
         options.add(IcebergDataSinkOptions.WAREHOUSE);
         options.add(IcebergDataSinkOptions.PARTITION_KEY);
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 57bdd887b..0aa59677c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -35,14 +35,60 @@ public class IcebergDataSinkOptions {
             key("catalog.properties.type")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Type of iceberg catalog, supports 
`hadoop` and `hive`.");
+                    .withDescription(
+                            "Type of iceberg catalog, supports `hadoop`, 
`hive` and `glue`.");
+
+    public static final ConfigOption<String> CATALOG_IMPL =
+            key("catalog.properties.catalog-impl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom catalog implementation class. "
+                                    + "For AWS Glue catalog, use 
`org.apache.iceberg.aws.glue.GlueCatalog`.");
+
+    public static final ConfigOption<String> IO_IMPL =
+            key("catalog.properties.io-impl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom FileIO implementation class. "
+                                    + "For AWS S3, use 
`org.apache.iceberg.aws.s3.S3FileIO`.");
+
+    public static final ConfigOption<String> GLUE_ID =
+            key("catalog.properties.glue.id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Glue catalog ID (AWS account ID). By default, 
the caller's AWS account ID is used.");
+
+    public static final ConfigOption<Boolean> GLUE_SKIP_ARCHIVE =
+            key("catalog.properties.glue.skip-archive")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to skip archiving older table versions in 
Glue. Default is true.");
+
+    public static final ConfigOption<Boolean> GLUE_SKIP_NAME_VALIDATION =
+            key("catalog.properties.glue.skip-name-validation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to skip name validation for Glue catalog. 
Default is false.");
+
+    public static final ConfigOption<String> CLIENT_REGION =
+            key("catalog.properties.client.region")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The AWS region for the Glue catalog 
client.");
 
     public static final ConfigOption<String> WAREHOUSE =
             key("catalog.properties.warehouse")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The warehouse root path of catalog, only usable 
when catalog.properties.type is `hadoop`.");
+                            "The warehouse root path of the Iceberg catalog, 
used by all catalog types. "
+                                    + "For `hadoop` and `hive` catalogs, this 
is typically a local or distributed filesystem path (for example, 
`hdfs://namenode:8020/warehouse`). "
+                                    + "For `glue` catalog, this is typically 
an object storage path like `s3://my-bucket/warehouse`.");
 
     public static final ConfigOption<String> PARTITION_KEY =
             key("partition.key")
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 10f89e16d..0ada4ffa0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -111,6 +111,54 @@ public class IcebergDataSinkFactoryTest {
         Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
     }
 
+    @Test
+    void testCreateGlueCatalogDataSink() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", 
DataSinkFactory.class);
+        
Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put("catalog.properties.type", "glue")
+                                .put("catalog.properties.warehouse", 
"s3://my-bucket/warehouse")
+                                .put(
+                                        "catalog.properties.io-impl",
+                                        "org.apache.iceberg.aws.s3.S3FileIO")
+                                .put("catalog.properties.client.region", 
"us-east-1")
+                                .put("catalog.properties.glue.skip-archive", 
"true")
+                                .build());
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, 
Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+    }
+
+    @Test
+    void testCreateGlueCatalogWithCatalogImpl() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", 
DataSinkFactory.class);
+        
Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put(
+                                        "catalog.properties.catalog-impl",
+                                        
"org.apache.iceberg.aws.glue.GlueCatalog")
+                                .put("catalog.properties.warehouse", 
"s3://my-bucket/warehouse")
+                                .put(
+                                        "catalog.properties.io-impl",
+                                        "org.apache.iceberg.aws.s3.S3FileIO")
+                                .build());
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, 
Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+    }
+
     @Test
     public void testPartitionOption() {
         Map<String, String> testcases = new HashMap<>();
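
The two tests added above select the Glue catalog in two ways: through `catalog.properties.type` and through `catalog.properties.catalog-impl`. The updated docs state that either one of these options must be set. Below is a minimal sketch of that either-or rule, written against a plain `Map` so it runs without Flink CDC on the classpath. The class and method names are hypothetical, and the diff does not show how, or whether, the connector enforces this rule, so treat this as an illustration of the documented constraint only.

```java
import java.util.Map;

/** Hypothetical helper for illustration only; not part of flink-cdc. */
final class CatalogSelectorCheck {
    // Option keys as documented in this commit.
    static final String TYPE = "catalog.properties.type";
    static final String CATALOG_IMPL = "catalog.properties.catalog-impl";

    private CatalogSelectorCheck() {}

    /** Returns true when at least one of the two catalog selector options is non-blank. */
    static boolean hasCatalogSelector(Map<String, String> options) {
        return isSet(options.get(TYPE)) || isSet(options.get(CATALOG_IMPL));
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Mirrors the configuration used in testCreateGlueCatalogWithCatalogImpl.
        Map<String, String> byImpl =
                Map.of(
                        CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog",
                        "catalog.properties.warehouse", "s3://my-bucket/warehouse",
                        "catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        System.out.println(hasCatalogSelector(byImpl));
    }
}
```

A configuration that sets only `catalog.properties.warehouse` would fail this check, which is the case the "conditionally required" wording in the docs is meant to rule out.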
