This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 987c466ba [FLINK-39245][Iceberg] Add AWS Glue Catalog support for Iceberg pipeline connector (#4314)
987c466ba is described below
commit 987c466ba0e2fb7ec8bdc6cefa1d72a03468f0ee
Author: HUANG XIAO <[email protected]>
AuthorDate: Fri Mar 20 22:40:29 2026 +0800
[FLINK-39245][Iceberg] Add AWS Glue Catalog support for Iceberg pipeline connector (#4314)
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
.../docs/connectors/pipeline-connectors/iceberg.md | 200 +++++++++++++++------
.../docs/connectors/pipeline-connectors/iceberg.md | 103 ++++++++++-
.../flink-cdc-pipeline-connector-iceberg/pom.xml | 6 +
.../iceberg/sink/IcebergDataSinkFactory.java | 6 +
.../iceberg/sink/IcebergDataSinkOptions.java | 50 +++++-
.../iceberg/sink/IcebergDataSinkFactoryTest.java | 48 +++++
6 files changed, 349 insertions(+), 64 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
index 9d97cd831..8cada084e 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
@@ -24,22 +24,24 @@ specific language governing permissions and limitations
under the License.
-->
-# Iceberg Pipeline Connector
+# Iceberg Pipeline Connector
-The Iceberg Pipeline Connector functions as a *Data Sink* for data pipelines, enabling data writes to Apache Iceberg tables[Iceberg](https://iceberg.apache.org). This document explains how to configure the connector.
+The Iceberg Pipeline Connector serves as a *Data Sink* for data pipelines and supports writing data to [Apache Iceberg](https://iceberg.apache.org) tables. This document describes how to configure the connector.
-## Key Capabilities
-* Automatic Table Creation
- Creates Iceberg tables dynamically when they do not exist
-* Schema Synchronization
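- Propagates schema changes (e.g., column additions) from source systems to Iceberg
-* Data Replication
- Supports both batch and streaming data synchronization
+## Core Capabilities
+* **Automatic table creation:**
+Automatically creates Iceberg tables when they do not exist
+* **Schema synchronization:**
+Automatically propagates upstream schema changes (for example, added columns) to Iceberg tables
+* **Data synchronization:**
+Supports both batch and streaming data synchronization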
-How to create Pipeline
+How to Create a Pipeline
----------------
-The pipeline for reading data from MySQL and sink to Iceberg can be defined as follows:
+The following example shows how to define a data pipeline that reads data from MySQL and writes it to Iceberg:
+
+### Hadoop Catalog Example
```yaml
source:
@@ -62,115 +64,203 @@ pipeline:
name: MySQL to Iceberg Pipeline
parallelism: 2
```
-***Note:***
-If `catalog.properties.type` is hadoop, you need to configure the following dependencies manually, and pass it with `--jar` argument of Flink CDC CLI when submitting YAML pipeline jobs.
+
+### AWS Glue Catalog Example
+
+```yaml
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5401-5404
+
+sink:
+ type: iceberg
+ name: Iceberg Sink
+ catalog.properties.type: glue
+ catalog.properties.warehouse: s3://my-bucket/warehouse
+ catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO
+ catalog.properties.client.region: us-east-1
+ catalog.properties.glue.skip-archive: true
+
+pipeline:
+ name: MySQL to Iceberg via Glue Pipeline
+ parallelism: 2
+```
+
+***Note:***
+Depending on the catalog type, you may need to add extra JAR dependencies manually and pass them with the `--jar` argument of the Flink CDC CLI when submitting YAML pipeline jobs.
+
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
- <th class="text-left">Dependency Item</th>
- <th class="text-left">Description</th>
+ <th class="text-left">Catalog Type</th>
+ <th class="text-left">Dependency Item</th>
+ <th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
- <td><a
href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0
"> org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
- <td>Used for Hadoop dependencies.</td>
+ <td>all</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.20">org.apache.iceberg:iceberg-flink-runtime-1.20</a></td>
+ <td>Iceberg Flink runtime dependency. Required for all catalog types when Iceberg is not pre-installed in the runtime environment (for example, a standalone Flink cluster).</td>
+ </tr>
+ <tr>
+ <td>hadoop</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0">org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
+ <td>Provides Hadoop filesystem dependencies.</td>
+ </tr>
+ <tr>
+ <td>glue</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws">org.apache.iceberg:iceberg-aws</a></td>
+ <td>Provides the AWS Glue Catalog and S3 FileIO implementations.</td>
+ </tr>
+ <tr>
+ <td>glue</td>
+ <td><a href="https://mvnrepository.com/artifact/software.amazon.awssdk/bundle">software.amazon.awssdk:bundle</a></td>
+ <td>AWS SDK bundle required by iceberg-aws.</td>
</tr>
</tbody>
</table>
</div>
-Pipeline Connector Options
+Pipeline Connector Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
- <th class="text-left" style="width: 25%">Option</th>
- <th class="text-left" style="width: 8%">Required</th>
- <th class="text-left" style="width: 7%">Default</th>
- <th class="text-left" style="width: 10%">Type</th>
- <th class="text-left" style="width: 50%">Description</th>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-left" style="width: 8%">Required</th>
+ <th class="text-left" style="width: 7%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>type</td>
- <td>required</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Specify what connector to use, here should be <code>'iceberg'</code>.</td>
+ <td>Specify which connector to use; here it should be <code>iceberg</code>.</td>
</tr>
<tr>
<td>name</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The name of the sink.</td>
+ <td>The name of the sink.</td>
</tr>
<tr>
<td>catalog.properties.type</td>
- <td>required</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>conditionally required</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Metastore of iceberg catalog, supports hadoop and hive.</td>
+ <td>Metastore type of the Iceberg catalog; supports <code>hadoop</code>, <code>hive</code>, and <code>glue</code>. Either this option or <code>catalog.properties.catalog-impl</code> must be set.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.catalog-impl</td>
+ <td>conditionally required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Custom catalog implementation class, for example <code>org.apache.iceberg.aws.glue.GlueCatalog</code>. Either this option or <code>catalog.properties.type</code> must be set.</td>
</tr>
<tr>
<td>catalog.properties.warehouse</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The warehouse root path of catalog.</td>
+ <td>The warehouse root path of the Iceberg catalog, used by all catalog types. For <code>hadoop</code> and <code>hive</code> catalogs, this is typically a local or distributed filesystem path; for the <code>glue</code> catalog, it is typically an object storage path such as <code>s3://my-bucket/warehouse</code>.</td>
</tr>
<tr>
<td>catalog.properties.uri</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>URI of the metadata service (for example, the Hive Metastore thrift URI).</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.io-impl</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Custom FileIO implementation class. When using AWS S3, set it to <code>org.apache.iceberg.aws.s3.S3FileIO</code>.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.client.region</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Uri of metastore server.</td>
+ <td>AWS region of the Glue catalog client (for example, <code>us-east-1</code>).</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.id</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Glue catalog ID (that is, the AWS account ID). Defaults to the caller's AWS account ID.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.skip-archive</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to skip archiving older versions of table metadata in Glue.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.skip-name-validation</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to skip name validation in the Glue catalog.</td>
</tr>
<tr>
<td>partition.key</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Partition keys for each partitioned table. Allow setting multiple
primary keys for multiTables. Tables are separated by ';', and partition keys
are separated by ','. For example, we can set <code>partition.key</code> of two
tables using 'testdb.table1:id1,id2;testdb.table2:name'. For partition
transforms, we can set <code>partition.key</code> using
'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);tes
[...]
+ <td>Partition keys for each partitioned table. Different partition keys can be set for multiple tables: tables are separated by <code>;</code>, and partition keys are separated by <code>,</code>. For example, <code>testdb.table1:id1,id2;testdb.table2:name</code> sets the partition keys of two tables. Partition transforms use the following syntax: <code>testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time)</code>.</td>
</tr>
<tr>
<td>catalog.properties.*</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Pass options of Iceberg catalog to pipeline,See <a href="https://iceberg.apache.org/docs/nightly/flink-configuration/#catalog-configuration">Iceberg catalog options</a>. </td>
+ <td>Passes Iceberg catalog options through to the pipeline; see <a href="https://iceberg.apache.org/docs/nightly/flink-configuration/#catalog-configuration">Iceberg Catalog configuration</a>.</td>
</tr>
<tr>
<td>table.properties.*</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Pass options of Iceberg table to pipeline,See <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg table options</a>. </td>
+ <td>Passes Iceberg table options through to the pipeline; see <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg table configuration</a>.</td>
</tr>
</tbody>
</table>
</div>
-Usage Notes
+Usage Notes
--------
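-* Only support Iceberg primary key table, so the source table must have primary keys.
+* The source table must have a primary key; tables without primary keys are not supported.
-* Not support exactly-once. The connector uses at-least-once + primary key table for idempotent writing.
+* Exactly-once semantics are not supported. The connector combines at-least-once delivery with idempotent writes to primary key tables to keep data consistent.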
-Data Type Mapping
+Data Type Mapping
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
- <th class="text-left">CDC type</th>
- <th class="text-left">Iceberg type</th>
- <th class="text-left" style="width:60%;">NOTE</th>
+ <th class="text-left">CDC Type</th>
+ <th class="text-left">Iceberg Type</th>
+ <th class="text-left" style="width:60%;">Notes</th>
</tr>
</thead>
<tbody>
@@ -243,4 +333,4 @@ Data Type Mapping
</table>
</div>
-{{< top >}}
\ No newline at end of file
+{{< top >}}
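The `partition.key` syntax described in the options table above (tables separated by `;`, each entry written as `table:keys`, keys separated by `,`) can be illustrated with a small plain-Java parser. This is a hypothetical sketch: the class name `PartitionKeySpec` and its error handling are assumptions for illustration, not the connector's actual parsing code, and transform expressions such as `bucket[10](create_time)` are kept as opaque strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser for the documented partition.key format; not the connector's code.
final class PartitionKeySpec {

    private PartitionKeySpec() {}

    /** Parses "db.t1:k1,k2;db.t2:k3" into an insertion-ordered map of table -> keys. */
    static Map<String, List<String>> parse(String value) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (value == null || value.isBlank()) {
            return result;
        }
        // Tables are separated by ';'.
        for (String entry : value.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // The table identifier (db.table) is everything before the first ':'.
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Malformed partition.key entry: " + trimmed);
            }
            String table = trimmed.substring(0, colon).trim();
            List<String> keys = new ArrayList<>();
            // Keys or transforms such as day(create_time) are separated by ','.
            for (String key : trimmed.substring(colon + 1).split(",")) {
                String k = key.trim();
                if (!k.isEmpty()) {
                    keys.add(k);
                }
            }
            result.put(table, keys);
        }
        return result;
    }
}
```

For example, `PartitionKeySpec.parse("testdb.table1:id1,id2;testdb.table2:name")` maps `testdb.table1` to `[id1, id2]` and `testdb.table2` to `[name]`.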
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 129d154b0..2063e5723 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -41,6 +41,8 @@ How to create Pipeline
The pipeline for reading data from MySQL and sink to Iceberg can be defined as follows:
+### Hadoop Catalog Example
+
```yaml
source:
type: mysql
@@ -63,20 +65,65 @@ pipeline:
parallelism: 2
```
+### AWS Glue Catalog Example
+
+```yaml
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5401-5404
+
+sink:
+ type: iceberg
+ name: Iceberg Sink
+ catalog.properties.type: glue
+ catalog.properties.warehouse: s3://my-bucket/warehouse
+ catalog.properties.io-impl: org.apache.iceberg.aws.s3.S3FileIO
+ catalog.properties.client.region: us-east-1
+ catalog.properties.glue.skip-archive: true
+
+pipeline:
+ name: MySQL to Iceberg via Glue Pipeline
+ parallelism: 2
+```
+
***Note:***
-If `catalog.properties.type` is hadoop, you need to configure the following dependencies manually, and pass it with `--jar` argument of Flink CDC CLI when submitting YAML pipeline jobs.
+Depending on the catalog type, you may need to add extra JARs manually and pass them with the `--jar` argument of Flink CDC CLI when submitting YAML pipeline jobs.
+
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
+ <th class="text-left">Catalog Type</th>
<th class="text-left">Dependency Item</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
- <td><a
href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0
"> org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
- <td>Used for Hadoop dependencies.</td>
+ <td>all</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.20">org.apache.iceberg:iceberg-flink-runtime-1.20</a></td>
+ <td>Iceberg Flink runtime. Required for all catalog types when not pre-installed in the runtime environment (e.g., standalone Flink clusters).</td>
+ </tr>
+ <tr>
+ <td>hadoop</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.8.3-10.0">org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0</a></td>
+ <td>Provides Hadoop filesystem dependencies.</td>
+ </tr>
+ <tr>
+ <td>glue</td>
+ <td><a href="https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws">org.apache.iceberg:iceberg-aws</a></td>
+ <td>Provides AWS Glue Catalog and S3 FileIO implementation.</td>
+ </tr>
+ <tr>
+ <td>glue</td>
+ <td><a href="https://mvnrepository.com/artifact/software.amazon.awssdk/bundle">software.amazon.awssdk:bundle</a></td>
+ <td>AWS SDK bundle required by iceberg-aws.</td>
</tr>
</tbody>
</table>
@@ -112,24 +159,66 @@ Pipeline Connector Options
</tr>
<tr>
<td>catalog.properties.type</td>
- <td>required</td>
+ <td>conditionally required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Metastore of Iceberg catalog, supports <code>hadoop</code> and <code>hive</code>.</td>
+ <td>Metastore type of Iceberg catalog, supports <code>hadoop</code>, <code>hive</code>, and <code>glue</code>. Either this option or <code>catalog.properties.catalog-impl</code> must be set.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.catalog-impl</td>
+ <td>conditionally required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Custom catalog implementation class, e.g. <code>org.apache.iceberg.aws.glue.GlueCatalog</code>. Either this option or <code>catalog.properties.type</code> must be set.</td>
</tr>
<tr>
<td>catalog.properties.warehouse</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>The warehouse root path of catalog.</td>
+ <td>The warehouse root path of the Iceberg catalog, used by all catalog types. For <code>hadoop</code> and <code>hive</code> catalogs, this is typically a local or distributed filesystem path. For <code>glue</code> catalog, this is typically an object storage path like <code>s3://my-bucket/warehouse</code>.</td>
</tr>
<tr>
<td>catalog.properties.uri</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Uri of metastore server.</td>
+ <td>URI of the metastore server (e.g. Hive Metastore thrift URI).</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.io-impl</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Custom FileIO implementation class. For AWS S3, use <code>org.apache.iceberg.aws.s3.S3FileIO</code>.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.client.region</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region for the Glue catalog client (e.g. <code>us-east-1</code>).</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.id</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The Glue catalog ID (AWS account ID). By default, the caller's AWS account ID is used.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.skip-archive</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to skip archiving older table versions in Glue.</td>
+ </tr>
+ <tr>
+ <td>catalog.properties.glue.skip-name-validation</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to skip name validation for Glue catalog.</td>
</tr>
<tr>
<td>partition.key</td>
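The documentation above makes `catalog.properties.type` and `catalog.properties.catalog-impl` conditionally required. The following sketch shows one way to resolve a catalog class from those two settings. `CatalogImplResolver` is a hypothetical name; the sketch assumes the `catalog.properties.` prefix has already been stripped, and it treats setting both keys as an error, which, to our understanding, mirrors the check in Iceberg's `CatalogUtil`. The type-to-class mapping uses the standard Iceberg catalog classes for `hadoop`, `hive`, and `glue`.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical resolver illustrating the type / catalog-impl rule; not the connector's code.
final class CatalogImplResolver {

    static final String TYPE_KEY = "type";
    static final String CATALOG_IMPL_KEY = "catalog-impl";

    private CatalogImplResolver() {}

    /** Returns the fully qualified catalog class name for already-unprefixed catalog properties. */
    static String resolve(Map<String, String> props) {
        String type = props.get(TYPE_KEY);
        String impl = props.get(CATALOG_IMPL_KEY);
        if (type != null && impl != null) {
            throw new IllegalArgumentException("Set only one of 'type' and 'catalog-impl', not both");
        }
        if (impl != null) {
            return impl; // a custom implementation class is used as-is
        }
        if (type == null) {
            throw new IllegalArgumentException("One of 'type' or 'catalog-impl' must be set");
        }
        switch (type.toLowerCase(Locale.ROOT)) {
            case "hadoop":
                return "org.apache.iceberg.hadoop.HadoopCatalog";
            case "hive":
                return "org.apache.iceberg.hive.HiveCatalog";
            case "glue":
                return "org.apache.iceberg.aws.glue.GlueCatalog";
            default:
                throw new IllegalArgumentException("Unsupported catalog type: " + type);
        }
    }
}
```

With this rule, `type: glue` and `catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog` resolve to the same class, which matches the two configurations exercised by the new tests later in this commit.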
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
index bed8a5cff..4723dc18c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml
@@ -36,6 +36,12 @@ limitations under the License.
<artifactId>iceberg-flink-runtime-${iceberg.flink.major.version}</artifactId>
<version>${iceberg.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
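Because `iceberg-aws` is added with `provided` scope, it is available at compile time but is not packaged into the connector JAR, which is why the documentation asks users to supply it (together with the AWS SDK bundle) via `--jar`. A minimal, hypothetical way to check whether a class such as `org.apache.iceberg.aws.glue.GlueCatalog` is visible to a given class loader is sketched below; `ClasspathProbe` is an illustrative name and is not part of Flink CDC.

```java
// Hypothetical helper for checking runtime availability of a provided-scope class; not part of Flink CDC.
final class ClasspathProbe {

    private ClasspathProbe() {}

    /** Returns true if className can be loaded by loader; static initializers are not run. */
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class is not on the classpath.
            // LinkageError (e.g. NoClassDefFoundError): the class exists but a dependency is missing.
            return false;
        }
    }
}
```

A caller could, for instance, probe `org.apache.iceberg.aws.glue.GlueCatalog` with `Thread.currentThread().getContextClassLoader()` before building a Glue catalog and fail fast with a message that names the missing `--jar` dependencies.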
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 3bdbc924d..13336ff21 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -142,6 +142,12 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.TYPE);
+ options.add(IcebergDataSinkOptions.CATALOG_IMPL);
+ options.add(IcebergDataSinkOptions.IO_IMPL);
+ options.add(IcebergDataSinkOptions.GLUE_ID);
+ options.add(IcebergDataSinkOptions.GLUE_SKIP_ARCHIVE);
+ options.add(IcebergDataSinkOptions.GLUE_SKIP_NAME_VALIDATION);
+ options.add(IcebergDataSinkOptions.CLIENT_REGION);
options.add(IcebergDataSinkOptions.WAREHOUSE);
options.add(IcebergDataSinkOptions.PARTITION_KEY);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
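Adding the Glue-related keys to `optionalOptions()` declares them as known sink options. The sketch below illustrates, in plain Java, the general pattern such declarations typically feed into: every user-supplied key must be declared, unless it falls under an exempted prefix. `OptionValidator` and `unknownKeys` are hypothetical names, this is not Flink CDC's `FactoryHelper`, and whether the Iceberg sink exempts any prefix from validation is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of declared-option validation; not Flink CDC's FactoryHelper.
final class OptionValidator {

    private OptionValidator() {}

    /** Returns user-supplied keys that are neither declared nor covered by an exempted prefix. */
    static List<String> unknownKeys(
            Map<String, String> userConfig, Set<String> declaredKeys, Set<String> exemptPrefixes) {
        List<String> unknown = new ArrayList<>();
        for (String key : userConfig.keySet()) {
            if (declaredKeys.contains(key)) {
                continue; // declared as a required or optional option
            }
            boolean exempt = false;
            for (String prefix : exemptPrefixes) {
                if (key.startsWith(prefix)) {
                    exempt = true;
                    break;
                }
            }
            if (!exempt) {
                unknown.add(key);
            }
        }
        return unknown;
    }
}
```

Under this pattern, declaring keys such as `catalog.properties.client.region` explicitly also gives them a typed, documented `ConfigOption`, rather than leaving them as untyped pass-through strings.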
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 57bdd887b..0aa59677c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -35,14 +35,60 @@ public class IcebergDataSinkOptions {
key("catalog.properties.type")
.stringType()
.noDefaultValue()
- .withDescription("Type of iceberg catalog, supports `hadoop` and `hive`.");
+ .withDescription(
+ "Type of iceberg catalog, supports `hadoop`, `hive` and `glue`.");
+
+ public static final ConfigOption<String> CATALOG_IMPL =
+ key("catalog.properties.catalog-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom catalog implementation class. "
+ + "For AWS Glue catalog, use `org.apache.iceberg.aws.glue.GlueCatalog`.");
+
+ public static final ConfigOption<String> IO_IMPL =
+ key("catalog.properties.io-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom FileIO implementation class. "
+ + "For AWS S3, use `org.apache.iceberg.aws.s3.S3FileIO`.");
+
+ public static final ConfigOption<String> GLUE_ID =
+ key("catalog.properties.glue.id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The Glue catalog ID (AWS account ID). By default, the caller's AWS account ID is used.");
+
+ public static final ConfigOption<Boolean> GLUE_SKIP_ARCHIVE =
+ key("catalog.properties.glue.skip-archive")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to skip archiving older table versions in Glue. Default is true.");
+
+ public static final ConfigOption<Boolean> GLUE_SKIP_NAME_VALIDATION =
+ key("catalog.properties.glue.skip-name-validation")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to skip name validation for Glue catalog. Default is false.");
+
+ public static final ConfigOption<String> CLIENT_REGION =
+ key("catalog.properties.client.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The AWS region for the Glue catalog client.");
public static final ConfigOption<String> WAREHOUSE =
key("catalog.properties.warehouse")
.stringType()
.noDefaultValue()
.withDescription(
- "The warehouse root path of catalog, only usable when catalog.properties.type is `hadoop`.");
+ "The warehouse root path of the Iceberg catalog, used by all catalog types. "
+ + "For `hadoop` and `hive` catalogs, this is typically a local or distributed filesystem path (for example, `hdfs://namenode:8020/warehouse`). "
+ + "For `glue` catalog, this is typically an object storage path like `s3://my-bucket/warehouse`.");
public static final ConfigOption<String> PARTITION_KEY =
key("partition.key")
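Each new option is declared with a type and, where applicable, a default: `GLUE_SKIP_ARCHIVE` defaults to `true`, `GLUE_SKIP_NAME_VALIDATION` to `false`, and `CLIENT_REGION`, `GLUE_ID`, `IO_IMPL`, and `CATALOG_IMPL` have no default. The plain-Java mini-model below illustrates those semantics (an explicit value wins, otherwise the default, otherwise nothing). `TypedOption` is a hypothetical stand-in, not Flink's `ConfigOption`, and its lenient `Boolean.parseBoolean` parsing is a simplification that a real configuration parser would likely not share.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical mini-model of typed options with optional defaults; not Flink's ConfigOption.
final class TypedOption<T> {

    // Boolean.parseBoolean is lenient: anything other than "true" (ignoring case) yields false.
    static final TypedOption<Boolean> GLUE_SKIP_ARCHIVE =
            new TypedOption<>("catalog.properties.glue.skip-archive", true, Boolean::parseBoolean);
    static final TypedOption<Boolean> GLUE_SKIP_NAME_VALIDATION =
            new TypedOption<>("catalog.properties.glue.skip-name-validation", false, Boolean::parseBoolean);
    static final TypedOption<String> CLIENT_REGION =
            new TypedOption<>("catalog.properties.client.region", null, s -> s);

    final String key;
    final T defaultValue; // null means "no default value"
    private final Function<String, T> parser;

    TypedOption(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }

    /** Returns the configured value if present, otherwise the default, which may be absent. */
    Optional<T> read(Map<String, String> conf) {
        String raw = conf.get(key);
        if (raw != null) {
            return Optional.of(parser.apply(raw.trim()));
        }
        return Optional.ofNullable(defaultValue);
    }
}
```

For example, reading `GLUE_SKIP_ARCHIVE` from an empty configuration yields `Optional[true]`, while `CLIENT_REGION` yields an empty `Optional`.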
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 10f89e16d..0ada4ffa0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -111,6 +111,54 @@ public class IcebergDataSinkFactoryTest {
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
}
+ @Test
+ void testCreateGlueCatalogDataSink() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("catalog.properties.type", "glue")
+ .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+ .put(
+ "catalog.properties.io-impl",
+ "org.apache.iceberg.aws.s3.S3FileIO")
+ .put("catalog.properties.client.region", "us-east-1")
+ .put("catalog.properties.glue.skip-archive", "true")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ }
+
+ @Test
+ void testCreateGlueCatalogWithCatalogImpl() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(
+ "catalog.properties.catalog-impl",
+ "org.apache.iceberg.aws.glue.GlueCatalog")
+ .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+ .put(
+ "catalog.properties.io-impl",
+ "org.apache.iceberg.aws.s3.S3FileIO")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ }
+
@Test
public void testPartitionOption() {
Map<String, String> testcases = new HashMap<>();