This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a1e81c21 [FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass 
Hadoop configuration properties (#4351)
1a1e81c21 is described below

commit 1a1e81c21c67dc4dd09a01888c70058eca9c6396
Author: EricZeng <[email protected]>
AuthorDate: Wed Apr 1 15:48:18 2026 +0800

    [FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass Hadoop 
configuration properties (#4351)
---
 .../docs/connectors/pipeline-connectors/iceberg.md |  7 ++
 .../docs/connectors/pipeline-connectors/iceberg.md |  7 ++
 .../connectors/iceberg/sink/IcebergDataSink.java   | 21 +++++-
 .../iceberg/sink/IcebergDataSinkFactory.java       | 19 +++++-
 .../iceberg/sink/IcebergDataSinkOptions.java       |  3 +
 .../iceberg/sink/IcebergMetadataApplier.java       | 17 ++++-
 .../iceberg/sink/utils/HadoopConfUtils.java        | 46 ++++++++++++++
 .../iceberg/sink/v2/IcebergCommitter.java          | 13 ++--
 .../connectors/iceberg/sink/v2/IcebergSink.java    | 21 ++++--
 .../connectors/iceberg/sink/v2/IcebergWriter.java  |  7 +-
 .../sink/v2/compaction/CompactionOperator.java     | 11 +++-
 .../iceberg/sink/IcebergDataSinkFactoryTest.java   | 36 +++++++++++
 .../iceberg/sink/v2/CompactionOperatorTest.java    | 52 ++++++++-------
 .../iceberg/sink/v2/IcebergSinkITCase.java         |  3 +-
 .../iceberg/sink/v2/IcebergWriterTest.java         | 74 +++++++++++++++++-----
 15 files changed, 275 insertions(+), 62 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
index 8cada084e..74b8ec1c7 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline 连接器选项
       <td>String</td>
       <td>透传 Iceberg 表选项到管道,详见 <a 
href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg
 表配置</a>。</td>
     </tr>
+    <tr>
+      <td>hadoop.conf.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>传递 Hadoop <code>Configuration</code> 参数(用于 Iceberg 的 catalog/table 
相关操作)。前缀 <code>hadoop.conf.</code> 会被剥离。例如 
<code>hadoop.conf.fs.s3a.endpoint</code>。</td>
+    </tr>
     </tbody>
 </table>    
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md 
b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 2063e5723..cc7dded49 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline Connector Options
       <td>String</td>
       <td>Pass Iceberg table options to the pipeline,See <a 
href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg
 table options</a>. </td>
     </tr>
+    <tr>
+      <td>hadoop.conf.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass Hadoop <code>Configuration</code> options used by Iceberg 
catalog/table operations. The prefix <code>hadoop.conf.</code> will be 
stripped. For example, <code>hadoop.conf.fs.s3a.endpoint</code>.</td>
+    </tr>
     </tbody>
 </table>    
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 96c2b5f76..ee6c9bd05 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -39,6 +39,9 @@ public class IcebergDataSink implements DataSink, 
Serializable {
     // options for creating Iceberg table.
     private final Map<String, String> tableOptions;
 
+    // options for Hadoop configuration.
+    private final Map<String, String> hadoopConfOptions;
+
     private final Map<TableId, List<String>> partitionMaps;
 
     private final ZoneId zoneId;
@@ -56,7 +59,8 @@ public class IcebergDataSink implements DataSink, 
Serializable {
             ZoneId zoneId,
             String schemaOperatorUid,
             CompactionOptions compactionOptions,
-            String jobIdPrefix) {
+            String jobIdPrefix,
+            Map<String, String> hadoopConfOptions) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.partitionMaps = partitionMaps;
@@ -64,18 +68,29 @@ public class IcebergDataSink implements DataSink, 
Serializable {
         this.schemaOperatorUid = schemaOperatorUid;
         this.compactionOptions = compactionOptions;
         this.jobIdPrefix = jobIdPrefix;
+        this.hadoopConfOptions = hadoopConfOptions;
     }
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
         IcebergSink icebergEventSink =
                 new IcebergSink(
-                        catalogOptions, tableOptions, zoneId, 
compactionOptions, jobIdPrefix);
+                        catalogOptions,
+                        tableOptions,
+                        zoneId,
+                        compactionOptions,
+                        jobIdPrefix,
+                        hadoopConfOptions);
         return FlinkSinkProvider.of(icebergEventSink);
     }
 
     @Override
     public MetadataApplier getMetadataApplier() {
-        return new IcebergMetadataApplier(catalogOptions, tableOptions, 
partitionMaps);
+        return new IcebergMetadataApplier(
+                catalogOptions, tableOptions, partitionMaps, 
hadoopConfOptions);
+    }
+
+    public Map<String, String> getHadoopConfOptions() {
+        return hadoopConfOptions;
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 13336ff21..9c9025523 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -41,6 +41,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import static 
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
+import static 
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_HADOOP_CONF;
 import static 
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
 
 /** A {@link DataSinkFactory} for Apache Iceberg. */
@@ -55,13 +56,15 @@ public class IcebergDataSinkFactory implements 
DataSinkFactory {
     @Override
     public DataSink createDataSink(Context context) {
         FactoryHelper.createFactoryHelper(this, context)
-                .validateExcept(PREFIX_TABLE_PROPERTIES, 
PREFIX_CATALOG_PROPERTIES);
+                .validateExcept(
+                        PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES, 
PREFIX_HADOOP_CONF);
 
         Map<String, String> allOptions = 
context.getFactoryConfiguration().toMap();
         OptionUtils.printOptions(IDENTIFIER, allOptions);
 
         Map<String, String> catalogOptions = new HashMap<>();
         Map<String, String> tableOptions = new HashMap<>();
+        Map<String, String> hadoopConfOptions = 
extractHadoopConfOptions(allOptions);
         allOptions.forEach(
                 (key, value) -> {
                     if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
@@ -116,7 +119,19 @@ public class IcebergDataSinkFactory implements 
DataSinkFactory {
                 zoneId,
                 schemaOperatorUid,
                 compactionOptions,
-                jobIdPrefix);
+                jobIdPrefix,
+                hadoopConfOptions);
+    }
+
+    static Map<String, String> extractHadoopConfOptions(Map<String, String> 
allOptions) {
+        Map<String, String> hadoopConfOptions = new HashMap<>();
+        allOptions.forEach(
+                (key, value) -> {
+                    if (key.startsWith(PREFIX_HADOOP_CONF)) {
+                        
hadoopConfOptions.put(key.substring(PREFIX_HADOOP_CONF.length()), value);
+                    }
+                });
+        return hadoopConfOptions;
     }
 
     private CompactionOptions getCompactionStrategy(Configuration 
configuration) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 0aa59677c..5fcf3f23b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -31,6 +31,9 @@ public class IcebergDataSinkOptions {
     // prefix for passing properties for catalog creation.
     public static final String PREFIX_CATALOG_PROPERTIES = 
"catalog.properties.";
 
+    // prefix for passing properties for Hadoop configuration.
+    public static final String PREFIX_HADOOP_CONF = "hadoop.conf.";
+
     public static final ConfigOption<String> TYPE =
             key("catalog.properties.type")
                     .stringType()
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index c71358104..01cb4b490 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
 import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -92,19 +93,30 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
 
     private final Map<TableId, List<String>> partitionMaps;
 
+    private final Map<String, String> hadoopConfOptions;
+
     private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
 
     public IcebergMetadataApplier(Map<String, String> catalogOptions) {
-        this(catalogOptions, new HashMap<>(), new HashMap<>());
+        this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
     }
 
     public IcebergMetadataApplier(
             Map<String, String> catalogOptions,
             Map<String, String> tableOptions,
             Map<TableId, List<String>> partitionMaps) {
+        this(catalogOptions, tableOptions, partitionMaps, null);
+    }
+
+    public IcebergMetadataApplier(
+            Map<String, String> catalogOptions,
+            Map<String, String> tableOptions,
+            Map<TableId, List<String>> partitionMaps,
+            Map<String, String> hadoopConfOptions) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.partitionMaps = partitionMaps;
+        this.hadoopConfOptions = hadoopConfOptions;
         this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
     }
 
@@ -112,9 +124,10 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
     public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
             throws SchemaEvolveException {
         if (catalog == null) {
+            Configuration configuration = 
HadoopConfUtils.createConfiguration(hadoopConfOptions);
             catalog =
                     CatalogUtil.buildIcebergCatalog(
-                            this.getClass().getSimpleName(), catalogOptions, 
new Configuration());
+                            this.getClass().getSimpleName(), catalogOptions, 
configuration);
         }
         SchemaChangeEventVisitor.visit(
                 schemaChangeEvent,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
new file mode 100644
index 000000000..39eb50b83
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Utilities for building Hadoop {@link Configuration} from connector 
options. */
+public final class HadoopConfUtils {
+
+    private HadoopConfUtils() {}
+
+    public static Configuration createConfiguration(Map<String, String> 
hadoopConfOptions) {
+        Configuration configuration = new Configuration();
+        applyTo(configuration, hadoopConfOptions);
+        return configuration;
+    }
+
+    public static void applyTo(Configuration configuration, Map<String, 
String> hadoopConfOptions) {
+        if (configuration == null || hadoopConfOptions == null || 
hadoopConfOptions.isEmpty()) {
+            return;
+        }
+        hadoopConfOptions.forEach(
+                (k, v) -> {
+                    if (k != null && v != null) {
+                        configuration.set(k, v);
+                    }
+                });
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 6cadc3d91..759777440 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 
@@ -67,15 +68,19 @@ public class IcebergCommitter implements 
Committer<WriteResultWrapper> {
 
     private final Map<TableId, TableMetric> tableIdMetricMap;
 
-    public IcebergCommitter(Map<String, String> catalogOptions) {
-        this(catalogOptions, null);
+    public IcebergCommitter(
+            Map<String, String> catalogOptions, Map<String, String> 
hadoopConfOptions) {
+        this(catalogOptions, null, hadoopConfOptions);
     }
 
     public IcebergCommitter(
-            Map<String, String> catalogOptions, SinkCommitterMetricGroup 
metricGroup) {
+            Map<String, String> catalogOptions,
+            SinkCommitterMetricGroup metricGroup,
+            Map<String, String> hadoopConfOptions) {
+        Configuration configuration = 
HadoopConfUtils.createConfiguration(hadoopConfOptions);
         this.catalog =
                 CatalogUtil.buildIcebergCatalog(
-                        this.getClass().getSimpleName(), catalogOptions, new 
Configuration());
+                        this.getClass().getSimpleName(), catalogOptions, 
configuration);
         this.metricGroup = metricGroup;
         this.tableIdMetricMap = new HashMap<>();
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 23bb4bf87..b392bd9ad 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -58,6 +58,7 @@ public class IcebergSink
 
     protected final Map<String, String> catalogOptions;
     protected final Map<String, String> tableOptions;
+    protected final Map<String, String> hadoopConfOptions;
 
     private final ZoneId zoneId;
 
@@ -72,13 +73,15 @@ public class IcebergSink
             Map<String, String> tableOptions,
             ZoneId zoneId,
             CompactionOptions compactionOptions,
-            String jobIdPrefix) {
+            String jobIdPrefix,
+            Map<String, String> hadoopConfOptions) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.zoneId = zoneId;
         this.compactionOptions = compactionOptions;
         this.jobId = jobIdPrefix + UUID.randomUUID();
         this.operatorId = UUID.randomUUID().toString();
+        this.hadoopConfOptions = hadoopConfOptions;
     }
 
     @Override
@@ -87,14 +90,14 @@ public class IcebergSink
     }
 
     public Committer<WriteResultWrapper> createCommitter() {
-        return new IcebergCommitter(catalogOptions);
+        return new IcebergCommitter(catalogOptions, hadoopConfOptions);
     }
 
     @Override
     public Committer<WriteResultWrapper> createCommitter(
             CommitterInitContext committerInitContext) {
         SinkCommitterMetricGroup metricGroup = 
committerInitContext.metricGroup();
-        return new IcebergCommitter(catalogOptions, metricGroup);
+        return new IcebergCommitter(catalogOptions, metricGroup, 
hadoopConfOptions);
     }
 
     @Override
@@ -115,7 +118,8 @@ public class IcebergSink
                 zoneId,
                 lastCheckpointId,
                 jobId,
-                operatorId);
+                operatorId,
+                hadoopConfOptions);
     }
 
     @Override
@@ -130,7 +134,8 @@ public class IcebergSink
                 zoneId,
                 lastCheckpointId,
                 jobId,
-                operatorId);
+                operatorId,
+                hadoopConfOptions);
     }
 
     @Override
@@ -153,7 +158,8 @@ public class IcebergSink
                 zoneId,
                 lastCheckpointId,
                 jobId,
-                operatorId);
+                operatorId,
+                hadoopConfOptions);
     }
 
     @Override
@@ -208,7 +214,8 @@ public class IcebergSink
                     .transform(
                             "Compaction",
                             typeInformation,
-                            new CompactionOperator(catalogOptions, 
compactionOptions))
+                            new CompactionOperator(
+                                    catalogOptions, compactionOptions, 
hadoopConfOptions))
                     .setParallelism(parallelism);
         }
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 62e47d897..cbcd3b98e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
 import org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -92,10 +93,12 @@ public class IcebergWriter
             ZoneId zoneId,
             long lastCheckpointId,
             String jobId,
-            String operatorId) {
+            String operatorId,
+            Map<String, String> hadoopConfOptions) {
+        Configuration configuration = 
HadoopConfUtils.createConfiguration(hadoopConfOptions);
         catalog =
                 CatalogUtil.buildIcebergCatalog(
-                        this.getClass().getSimpleName(), catalogOptions, new 
Configuration());
+                        this.getClass().getSimpleName(), catalogOptions, 
configuration);
         writerFactoryMap = new HashMap<>();
         writerMap = new HashMap<>();
         schemaMap = new HashMap<>();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
index adfdba629..6ee9c0856 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction;
 
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
 import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper;
 import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -62,16 +63,21 @@ public class CompactionOperator
 
     private final CompactionOptions compactionOptions;
 
+    private final Map<String, String> hadoopConfOptions;
+
     private volatile Throwable throwable;
 
     private ExecutorService compactExecutor;
 
     public CompactionOperator(
-            Map<String, String> catalogOptions, CompactionOptions 
compactionOptions) {
+            Map<String, String> catalogOptions,
+            CompactionOptions compactionOptions,
+            Map<String, String> hadoopConfOptions) {
         this.tableCommitTimes = new HashMap<>();
         this.compactedTables = new HashSet<>();
         this.catalogOptions = catalogOptions;
         this.compactionOptions = compactionOptions;
+        this.hadoopConfOptions = hadoopConfOptions;
     }
 
     @Override
@@ -111,9 +117,10 @@ public class CompactionOperator
 
     private void compact(TableId tableId) {
         if (catalog == null) {
+            Configuration configuration = 
HadoopConfUtils.createConfiguration(hadoopConfOptions);
             catalog =
                     CatalogUtil.buildIcebergCatalog(
-                            this.getClass().getSimpleName(), catalogOptions, 
new Configuration());
+                            this.getClass().getSimpleName(), catalogOptions, 
configuration);
         }
         try {
             RewriteDataFilesActionResult rewriteResult =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 0ada4ffa0..0e3fc00ed 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -93,6 +93,42 @@ public class IcebergDataSinkFactoryTest {
                                 + "unsupported_key");
     }
 
+    @Test
+    void testHadoopConfOptionsAreAllowed() {
+        IcebergDataSinkFactory sinkFactory =
+                (IcebergDataSinkFactory)
+                        FactoryDiscoveryUtils.getFactoryByIdentifier(
+                                "iceberg", DataSinkFactory.class);
+        
Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put(
+                                        
"hadoop.conf.hive.metastore.kerberos.keytab.file",
+                                        
"/etc/security/keytabs/hive.service.keytab")
+                                .put(
+                                        
"hadoop.conf.hive.metastore.kerberos.principal",
+                                        "hive/[email protected]")
+                                
.put("hadoop.conf.hive.metastore.sasl.enabled", "true")
+                                .build());
+
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, 
Thread.currentThread().getContextClassLoader()));
+
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+        Map<String, String> hadoopConfOptions = ((IcebergDataSink) 
dataSink).getHadoopConfOptions();
+        Assertions.assertThat(hadoopConfOptions)
+                .containsEntry(
+                        "hive.metastore.kerberos.keytab.file",
+                        "/etc/security/keytabs/hive.service.keytab")
+                .containsEntry("hive.metastore.kerberos.principal", 
"hive/[email protected]")
+                .containsEntry("hive.metastore.sasl.enabled", "true")
+                
.doesNotContainKey("hadoop.conf.hive.metastore.kerberos.keytab.file");
+    }
+
     @Test
     void testPrefixRequireOption() {
         DataSinkFactory sinkFactory =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index 6d0fce3eb..2a2a84b3c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -80,7 +80,8 @@ public class CompactionOperatorTest {
                         ZoneId.systemDefault(),
                         checkpointId,
                         jobId,
-                        operatorId);
+                        operatorId,
+                        new HashMap<>());
         IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -134,34 +135,37 @@ public class CompactionOperatorTest {
         BinaryRecordDataGenerator binaryRecordDataGenerator =
                 new BinaryRecordDataGenerator(
                         
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
-        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
-        // Commit many times.
         int smallFileCount = 100;
-        for (long i = 0; i < smallFileCount; i++) {
-            RecordData recordData =
-                    binaryRecordDataGenerator.generate(
-                            new Object[] {
-                                i,
-                                BinaryStringData.fromString("Mark"),
-                                10,
-                                BinaryStringData.fromString("test"),
-                                true,
-                                1.0f,
-                                1.0d,
-                                DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 10, 2),
-                                DateData.fromEpochDay(9)
-                            });
-            icebergWriter.write(DataChangeEvent.insertEvent(tableId, 
recordData), null);
-            Collection<Committer.CommitRequest<WriteResultWrapper>> collection 
=
-                    icebergWriter.prepareCommit().stream()
-                            .map(IcebergWriterTest.MockCommitRequestImpl::new)
-                            .collect(Collectors.toList());
-            icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            // Commit many times.
+            for (long i = 0; i < smallFileCount; i++) {
+                RecordData recordData =
+                        binaryRecordDataGenerator.generate(
+                                new Object[] {
+                                    i,
+                                    BinaryStringData.fromString("Mark"),
+                                    10,
+                                    BinaryStringData.fromString("test"),
+                                    true,
+                                    1.0f,
+                                    1.0d,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 10, 2),
+                                    DateData.fromEpochDay(9)
+                                });
+                icebergWriter.write(DataChangeEvent.insertEvent(tableId, 
recordData), null);
+                Collection<Committer.CommitRequest<WriteResultWrapper>> 
collection =
+                        icebergWriter.prepareCommit().stream()
+                                
.map(IcebergWriterTest.MockCommitRequestImpl::new)
+                                .collect(Collectors.toList());
+                icebergCommitter.commit(collection);
+            }
         }
         CompactionOperator compactionOperator =
                 new CompactionOperator(
                         catalogOptions,
-                        
CompactionOptions.builder().commitInterval(1).parallelism(4).build());
+                        
CompactionOptions.builder().commitInterval(1).parallelism(4).build(),
+                        new HashMap<>());
         compactionOperator.processElement(
                 new StreamRecord<>(
                         new CommittableWithLineage<>(
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
index 2bd3b228a..c9bf112d2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
@@ -105,7 +105,8 @@ public class IcebergSinkITCase {
                         null,
                         null,
                         CompactionOptions.builder().build(),
-                        "FlinkCDC");
+                        "FlinkCDC",
+                        new HashMap<>());
         String[] expected = new String[] {"21, 1.732, Disenchanted", "17, 
6.28, Doris Day"};
         stream.sinkTo(icebergSink);
         env.execute("Values to Iceberg Sink");
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index b16b88931..745aea81f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -93,7 +93,14 @@ public class IcebergWriterTest {
         String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
                 new IcebergWriter(
-                        catalogOptions, 1, 1, ZoneId.systemDefault(), 0, 
jobId, operatorId);
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
         IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -179,10 +186,12 @@ public class IcebergWriterTest {
         DataChangeEvent dataChangeEvent2 = 
DataChangeEvent.insertEvent(tableId, recordData2);
         icebergWriter.write(dataChangeEvent2, null);
         Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
-        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+        }
         List<String> result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -258,7 +267,10 @@ public class IcebergWriterTest {
         writeResults = icebergWriter.prepareCommit();
         collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+        }
         result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -284,7 +296,15 @@ public class IcebergWriterTest {
         String jobId = UUID.randomUUID().toString();
         String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, 
jobId, operatorId);
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        pipelineZoneId,
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
         IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
 
@@ -349,10 +369,12 @@ public class IcebergWriterTest {
         DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, 
record1);
         icebergWriter.write(dataChangeEvent, null);
         Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
-        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+        }
         List<String> result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -393,7 +415,14 @@ public class IcebergWriterTest {
         String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
                 new IcebergWriter(
-                        catalogOptions, 1, 1, ZoneId.systemDefault(), 0, 
jobId, operatorId);
+                        catalogOptions,
+                        1,
+                        1,
+                        ZoneId.systemDefault(),
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
 
         TableId tableId = TableId.parse("test.iceberg_table");
         Map<TableId, List<String>> partitionMaps = new HashMap<>();
@@ -440,10 +469,12 @@ public class IcebergWriterTest {
         DataChangeEvent dataChangeEvent2 = 
DataChangeEvent.insertEvent(tableId, recordData2);
         icebergWriter.write(dataChangeEvent2, null);
         Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
-        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+        }
 
         Table table =
                 catalog.loadTable(
@@ -482,7 +513,15 @@ public class IcebergWriterTest {
         String jobId = UUID.randomUUID().toString();
         String operatorId = UUID.randomUUID().toString();
         IcebergWriter icebergWriter =
-                new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, 
jobId, operatorId);
+                new IcebergWriter(
+                        catalogOptions,
+                        1,
+                        1,
+                        pipelineZoneId,
+                        0,
+                        jobId,
+                        operatorId,
+                        new HashMap<>());
         IcebergMetadataApplier icebergMetadataApplier = new 
IcebergMetadataApplier(catalogOptions);
         TableId tableId = TableId.parse("test.iceberg_table");
         TableIdentifier tableIdentifier =
@@ -514,10 +553,12 @@ public class IcebergWriterTest {
         DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, 
record1);
         icebergWriter.write(dataChangeEvent, null);
         Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
-        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+        }
         List<String> result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result.size()).isEqualTo(1);
         Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1");
@@ -538,8 +579,11 @@ public class IcebergWriterTest {
         writeResults = icebergWriter.prepareCommit();
         collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
-        icebergCommitter.commit(collection);
-        icebergCommitter.commit(collection);
+        try (IcebergCommitter icebergCommitter =
+                new IcebergCommitter(catalogOptions, new HashMap<>())) {
+            icebergCommitter.commit(collection);
+            icebergCommitter.commit(collection);
+        }
         summary = 
catalog.loadTable(tableIdentifier).currentSnapshot().summary();
         Assertions.assertThat(summary.get("total-data-files")).isEqualTo("2");
         Assertions.assertThat(summary.get("added-records")).isEqualTo("1");


Reply via email to