This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1a1e81c21 [FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass
Hadoop configuration properties (#4351)
1a1e81c21 is described below
commit 1a1e81c21c67dc4dd09a01888c70058eca9c6396
Author: EricZeng <[email protected]>
AuthorDate: Wed Apr 1 15:48:18 2026 +0800
[FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass Hadoop
configuration properties (#4351)
---
.../docs/connectors/pipeline-connectors/iceberg.md | 7 ++
.../docs/connectors/pipeline-connectors/iceberg.md | 7 ++
.../connectors/iceberg/sink/IcebergDataSink.java | 21 +++++-
.../iceberg/sink/IcebergDataSinkFactory.java | 19 +++++-
.../iceberg/sink/IcebergDataSinkOptions.java | 3 +
.../iceberg/sink/IcebergMetadataApplier.java | 17 ++++-
.../iceberg/sink/utils/HadoopConfUtils.java | 46 ++++++++++++++
.../iceberg/sink/v2/IcebergCommitter.java | 13 ++--
.../connectors/iceberg/sink/v2/IcebergSink.java | 21 ++++--
.../connectors/iceberg/sink/v2/IcebergWriter.java | 7 +-
.../sink/v2/compaction/CompactionOperator.java | 11 +++-
.../iceberg/sink/IcebergDataSinkFactoryTest.java | 36 +++++++++++
.../iceberg/sink/v2/CompactionOperatorTest.java | 52 ++++++++-------
.../iceberg/sink/v2/IcebergSinkITCase.java | 3 +-
.../iceberg/sink/v2/IcebergWriterTest.java | 74 +++++++++++++++++-----
15 files changed, 275 insertions(+), 62 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
index 8cada084e..74b8ec1c7 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline 连接器选项
<td>String</td>
<td>透传 Iceberg 表选项到管道,详见 <a
href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg
表配置</a>。</td>
</tr>
+ <tr>
+ <td>hadoop.conf.*</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>传递 Hadoop <code>Configuration</code> 参数(用于 Iceberg 的 catalog/table
相关操作)。前缀 <code>hadoop.conf.</code> 会被剥离。例如
<code>hadoop.conf.fs.s3a.endpoint</code> 会以 <code>fs.s3a.endpoint</code> 的形式设置到 Hadoop 配置中。</td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 2063e5723..cc7dded49 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline Connector Options
<td>String</td>
<td>Pass Iceberg table options to the pipeline. See <a
href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg
table options</a>.</td>
</tr>
+ <tr>
+ <td>hadoop.conf.*</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Pass Hadoop <code>Configuration</code> options used by Iceberg
catalog/table operations. The prefix <code>hadoop.conf.</code> is
stripped before the option is applied; for example, <code>hadoop.conf.fs.s3a.endpoint</code> is set as <code>fs.s3a.endpoint</code>.</td>
+ </tr>
</tbody>
</table>
</div>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 96c2b5f76..ee6c9bd05 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -39,6 +39,9 @@ public class IcebergDataSink implements DataSink,
Serializable {
// options for creating Iceberg table.
private final Map<String, String> tableOptions;
+ // options for Hadoop configuration.
+ private final Map<String, String> hadoopConfOptions;
+
private final Map<TableId, List<String>> partitionMaps;
private final ZoneId zoneId;
@@ -56,7 +59,8 @@ public class IcebergDataSink implements DataSink,
Serializable {
ZoneId zoneId,
String schemaOperatorUid,
CompactionOptions compactionOptions,
- String jobIdPrefix) {
+ String jobIdPrefix,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
@@ -64,18 +68,29 @@ public class IcebergDataSink implements DataSink,
Serializable {
this.schemaOperatorUid = schemaOperatorUid;
this.compactionOptions = compactionOptions;
this.jobIdPrefix = jobIdPrefix;
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
public EventSinkProvider getEventSinkProvider() {
IcebergSink icebergEventSink =
new IcebergSink(
- catalogOptions, tableOptions, zoneId,
compactionOptions, jobIdPrefix);
+ catalogOptions,
+ tableOptions,
+ zoneId,
+ compactionOptions,
+ jobIdPrefix,
+ hadoopConfOptions);
return FlinkSinkProvider.of(icebergEventSink);
}
@Override
public MetadataApplier getMetadataApplier() {
- return new IcebergMetadataApplier(catalogOptions, tableOptions,
partitionMaps);
+ return new IcebergMetadataApplier(
+ catalogOptions, tableOptions, partitionMaps,
hadoopConfOptions);
+ }
+
+ /**
+ * Returns the Hadoop configuration options this sink was constructed with.
+ *
+ * <p>This is the same map instance that was passed to the constructor (not a copy). It is the
+ * map forwarded to the {@link IcebergSink} and {@link IcebergMetadataApplier} created by this
+ * sink.
+ *
+ * @return the Hadoop configuration options, keyed without the {@code hadoop.conf.} prefix when
+ *     produced by the sink factory
+ */
+ public Map<String, String> getHadoopConfOptions() {
+ return hadoopConfOptions;
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 13336ff21..9c9025523 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -41,6 +41,7 @@ import java.util.Objects;
import java.util.Set;
import static
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
+import static
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_HADOOP_CONF;
import static
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
/** A {@link DataSinkFactory} for Apache Iceberg. */
@@ -55,13 +56,15 @@ public class IcebergDataSinkFactory implements
DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
- .validateExcept(PREFIX_TABLE_PROPERTIES,
PREFIX_CATALOG_PROPERTIES);
+ .validateExcept(
+ PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES,
PREFIX_HADOOP_CONF);
Map<String, String> allOptions =
context.getFactoryConfiguration().toMap();
OptionUtils.printOptions(IDENTIFIER, allOptions);
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
+ Map<String, String> hadoopConfOptions =
extractHadoopConfOptions(allOptions);
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
@@ -116,7 +119,19 @@ public class IcebergDataSinkFactory implements
DataSinkFactory {
zoneId,
schemaOperatorUid,
compactionOptions,
- jobIdPrefix);
+ jobIdPrefix,
+ hadoopConfOptions);
+ }
+
+ /**
+ * Collects every option whose key starts with {@code hadoop.conf.} (the {@code
+ * PREFIX_HADOOP_CONF} constant) and strips that prefix from the key.
+ *
+ * <p>For example, {@code hadoop.conf.fs.s3a.endpoint} is returned as {@code fs.s3a.endpoint}.
+ * Options without the prefix are ignored.
+ *
+ * @param allOptions the full set of sink options
+ * @return a new {@link HashMap} of prefix-stripped Hadoop keys to their values
+ */
+ static Map<String, String> extractHadoopConfOptions(Map<String, String>
allOptions) {
+ Map<String, String> hadoopConfOptions = new HashMap<>();
+ allOptions.forEach(
+ (key, value) -> {
+ if (key.startsWith(PREFIX_HADOOP_CONF)) {
+ // Drop the "hadoop.conf." prefix so the key matches the raw Hadoop property name.
+
hadoopConfOptions.put(key.substring(PREFIX_HADOOP_CONF.length()), value);
+ }
+ });
+ return hadoopConfOptions;
}
private CompactionOptions getCompactionStrategy(Configuration
configuration) {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 0aa59677c..5fcf3f23b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -31,6 +31,9 @@ public class IcebergDataSinkOptions {
// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES =
"catalog.properties.";
+ // prefix for passing properties for Hadoop configuration.
+ public static final String PREFIX_HADOOP_CONF = "hadoop.conf.";
+
public static final ConfigOption<String> TYPE =
key("catalog.properties.type")
.stringType()
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index c71358104..01cb4b490 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -92,19 +93,30 @@ public class IcebergMetadataApplier implements
MetadataApplier {
private final Map<TableId, List<String>> partitionMaps;
+ private final Map<String, String> hadoopConfOptions;
+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
- this(catalogOptions, new HashMap<>(), new HashMap<>());
+ this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
}
public IcebergMetadataApplier(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
Map<TableId, List<String>> partitionMaps) {
+ this(catalogOptions, tableOptions, partitionMaps, null);
+ }
+
+ /**
+ * Creates a metadata applier whose Iceberg catalog is built lazily on the first schema change.
+ *
+ * @param catalogOptions options passed to {@code CatalogUtil.buildIcebergCatalog} when the
+ *     catalog is first needed
+ * @param tableOptions options for creating Iceberg tables
+ * @param partitionMaps presumably the partition column names keyed by table — confirm against
+ *     callers
+ * @param hadoopConfOptions Hadoop properties applied to the Hadoop {@code Configuration} used to
+ *     build the catalog; may be {@code null} (the shorter constructors pass {@code null})
+ */
+ public IcebergMetadataApplier(
+ Map<String, String> catalogOptions,
+ Map<String, String> tableOptions,
+ Map<TableId, List<String>> partitionMaps,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
+ this.hadoopConfOptions = hadoopConfOptions;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@@ -112,9 +124,10 @@ public class IcebergMetadataApplier implements
MetadataApplier {
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
throws SchemaEvolveException {
if (catalog == null) {
+ Configuration configuration =
HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions,
new Configuration());
+ this.getClass().getSimpleName(), catalogOptions,
configuration);
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
new file mode 100644
index 000000000..39eb50b83
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * Utilities for building Hadoop {@link Configuration} from connector options.
+ *
+ * <p>Within this connector, the options passed here are the {@code hadoop.conf.*} sink options
+ * with the prefix already removed by the sink factory.
+ */
+public final class HadoopConfUtils {
+
+ // Static utility holder; not meant to be instantiated.
+ private HadoopConfUtils() {}
+
+ /**
+ * Creates a new {@link Configuration} via {@code new Configuration()} and overlays the given
+ * options onto it using {@link #applyTo(Configuration, Map)}.
+ *
+ * @param hadoopConfOptions Hadoop property names and values to set; may be {@code null} or
+ *     empty, in which case the configuration is returned as constructed
+ * @return the newly created configuration
+ */
+ public static Configuration createConfiguration(Map<String, String>
hadoopConfOptions) {
+ Configuration configuration = new Configuration();
+ applyTo(configuration, hadoopConfOptions);
+ return configuration;
+ }
+
+ /**
+ * Sets every entry of {@code hadoopConfOptions} on {@code configuration}, overwriting any value
+ * already present for the same key.
+ *
+ * <p>Does nothing if {@code configuration} is {@code null}, or if {@code hadoopConfOptions} is
+ * {@code null} or empty. Entries whose key or value is {@code null} are skipped rather than
+ * passed to {@link Configuration#set(String, String)}.
+ *
+ * @param configuration the configuration to modify in place
+ * @param hadoopConfOptions Hadoop property names and values to set
+ */
+ public static void applyTo(Configuration configuration, Map<String,
String> hadoopConfOptions) {
+ if (configuration == null || hadoopConfOptions == null ||
hadoopConfOptions.isEmpty()) {
+ return;
+ }
+ hadoopConfOptions.forEach(
+ (k, v) -> {
+ // Skip null entries; Configuration#set presumably rejects nulls — confirm.
+ if (k != null && v != null) {
+ configuration.set(k, v);
+ }
+ });
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 6cadc3d91..759777440 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
@@ -67,15 +68,19 @@ public class IcebergCommitter implements
Committer<WriteResultWrapper> {
private final Map<TableId, TableMetric> tableIdMetricMap;
- public IcebergCommitter(Map<String, String> catalogOptions) {
- this(catalogOptions, null);
+ public IcebergCommitter(
+ Map<String, String> catalogOptions, Map<String, String>
hadoopConfOptions) {
+ this(catalogOptions, null, hadoopConfOptions);
}
public IcebergCommitter(
- Map<String, String> catalogOptions, SinkCommitterMetricGroup
metricGroup) {
+ Map<String, String> catalogOptions,
+ SinkCommitterMetricGroup metricGroup,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration =
HadoopConfUtils.createConfiguration(hadoopConfOptions);
this.catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new
Configuration());
+ this.getClass().getSimpleName(), catalogOptions,
configuration);
this.metricGroup = metricGroup;
this.tableIdMetricMap = new HashMap<>();
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 23bb4bf87..b392bd9ad 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -58,6 +58,7 @@ public class IcebergSink
protected final Map<String, String> catalogOptions;
protected final Map<String, String> tableOptions;
+ protected final Map<String, String> hadoopConfOptions;
private final ZoneId zoneId;
@@ -72,13 +73,15 @@ public class IcebergSink
Map<String, String> tableOptions,
ZoneId zoneId,
CompactionOptions compactionOptions,
- String jobIdPrefix) {
+ String jobIdPrefix,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.zoneId = zoneId;
this.compactionOptions = compactionOptions;
this.jobId = jobIdPrefix + UUID.randomUUID();
this.operatorId = UUID.randomUUID().toString();
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
@@ -87,14 +90,14 @@ public class IcebergSink
}
public Committer<WriteResultWrapper> createCommitter() {
- return new IcebergCommitter(catalogOptions);
+ return new IcebergCommitter(catalogOptions, hadoopConfOptions);
}
@Override
public Committer<WriteResultWrapper> createCommitter(
CommitterInitContext committerInitContext) {
SinkCommitterMetricGroup metricGroup =
committerInitContext.metricGroup();
- return new IcebergCommitter(catalogOptions, metricGroup);
+ return new IcebergCommitter(catalogOptions, metricGroup,
hadoopConfOptions);
}
@Override
@@ -115,7 +118,8 @@ public class IcebergSink
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -130,7 +134,8 @@ public class IcebergSink
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -153,7 +158,8 @@ public class IcebergSink
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -208,7 +214,8 @@ public class IcebergSink
.transform(
"Compaction",
typeInformation,
- new CompactionOperator(catalogOptions,
compactionOptions))
+ new CompactionOperator(
+ catalogOptions, compactionOptions,
hadoopConfOptions))
.setParallelism(parallelism);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 62e47d897..cbcd3b98e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -92,10 +93,12 @@ public class IcebergWriter
ZoneId zoneId,
long lastCheckpointId,
String jobId,
- String operatorId) {
+ String operatorId,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration =
HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new
Configuration());
+ this.getClass().getSimpleName(), catalogOptions,
configuration);
writerFactoryMap = new HashMap<>();
writerMap = new HashMap<>();
schemaMap = new HashMap<>();
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
index adfdba629..6ee9c0856 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper;
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -62,16 +63,21 @@ public class CompactionOperator
private final CompactionOptions compactionOptions;
+ private final Map<String, String> hadoopConfOptions;
+
private volatile Throwable throwable;
private ExecutorService compactExecutor;
public CompactionOperator(
- Map<String, String> catalogOptions, CompactionOptions
compactionOptions) {
+ Map<String, String> catalogOptions,
+ CompactionOptions compactionOptions,
+ Map<String, String> hadoopConfOptions) {
this.tableCommitTimes = new HashMap<>();
this.compactedTables = new HashSet<>();
this.catalogOptions = catalogOptions;
this.compactionOptions = compactionOptions;
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
@@ -111,9 +117,10 @@ public class CompactionOperator
private void compact(TableId tableId) {
if (catalog == null) {
+ Configuration configuration =
HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions,
new Configuration());
+ this.getClass().getSimpleName(), catalogOptions,
configuration);
}
try {
RewriteDataFilesActionResult rewriteResult =
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 0ada4ffa0..0e3fc00ed 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -93,6 +93,42 @@ public class IcebergDataSinkFactoryTest {
+ "unsupported_key");
}
+ /**
+ * Verifies that {@code hadoop.conf.*} options pass the factory's option validation and reach the
+ * created {@link IcebergDataSink} with the {@code hadoop.conf.} prefix stripped.
+ */
+ @Test
+ void testHadoopConfOptionsAreAllowed() {
+ IcebergDataSinkFactory sinkFactory =
+ (IcebergDataSinkFactory)
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ "iceberg", DataSinkFactory.class);
+
Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ // Only hadoop.conf.* keys are supplied; createDataSink must not reject them as unsupported.
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(
+
"hadoop.conf.hive.metastore.kerberos.keytab.file",
+
"/etc/security/keytabs/hive.service.keytab")
+ .put(
+
"hadoop.conf.hive.metastore.kerberos.principal",
+ "hive/[email protected]")
+
.put("hadoop.conf.hive.metastore.sasl.enabled", "true")
+ .build());
+
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf,
Thread.currentThread().getContextClassLoader()));
+
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ Map<String, String> hadoopConfOptions = ((IcebergDataSink)
dataSink).getHadoopConfOptions();
+ // Keys must arrive with the "hadoop.conf." prefix removed, and the prefixed form must not leak.
+ Assertions.assertThat(hadoopConfOptions)
+ .containsEntry(
+ "hive.metastore.kerberos.keytab.file",
+ "/etc/security/keytabs/hive.service.keytab")
+ .containsEntry("hive.metastore.kerberos.principal",
"hive/[email protected]")
+ .containsEntry("hive.metastore.sasl.enabled", "true")
+
.doesNotContainKey("hadoop.conf.hive.metastore.kerberos.keytab.file");
+ }
+
@Test
void testPrefixRequireOption() {
DataSinkFactory sinkFactory =
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index 6d0fce3eb..2a2a84b3c 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -80,7 +80,8 @@ public class CompactionOperatorTest {
ZoneId.systemDefault(),
checkpointId,
jobId,
- operatorId);
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -134,34 +135,37 @@ public class CompactionOperatorTest {
BinaryRecordDataGenerator binaryRecordDataGenerator =
new BinaryRecordDataGenerator(
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
- IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions);
- // Commit many times.
int smallFileCount = 100;
- for (long i = 0; i < smallFileCount; i++) {
- RecordData recordData =
- binaryRecordDataGenerator.generate(
- new Object[] {
- i,
- BinaryStringData.fromString("Mark"),
- 10,
- BinaryStringData.fromString("test"),
- true,
- 1.0f,
- 1.0d,
- DecimalData.fromBigDecimal(new
BigDecimal(1.0), 10, 2),
- DateData.fromEpochDay(9)
- });
- icebergWriter.write(DataChangeEvent.insertEvent(tableId,
recordData), null);
- Collection<Committer.CommitRequest<WriteResultWrapper>> collection
=
- icebergWriter.prepareCommit().stream()
- .map(IcebergWriterTest.MockCommitRequestImpl::new)
- .collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ // Commit many times.
+ for (long i = 0; i < smallFileCount; i++) {
+ RecordData recordData =
+ binaryRecordDataGenerator.generate(
+ new Object[] {
+ i,
+ BinaryStringData.fromString("Mark"),
+ 10,
+ BinaryStringData.fromString("test"),
+ true,
+ 1.0f,
+ 1.0d,
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 10, 2),
+ DateData.fromEpochDay(9)
+ });
+ icebergWriter.write(DataChangeEvent.insertEvent(tableId,
recordData), null);
+ Collection<Committer.CommitRequest<WriteResultWrapper>>
collection =
+ icebergWriter.prepareCommit().stream()
+
.map(IcebergWriterTest.MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+ }
}
CompactionOperator compactionOperator =
new CompactionOperator(
catalogOptions,
-
CompactionOptions.builder().commitInterval(1).parallelism(4).build());
+
CompactionOptions.builder().commitInterval(1).parallelism(4).build(),
+ new HashMap<>());
compactionOperator.processElement(
new StreamRecord<>(
new CommittableWithLineage<>(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
index 2bd3b228a..c9bf112d2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
@@ -105,7 +105,8 @@ public class IcebergSinkITCase {
null,
null,
CompactionOptions.builder().build(),
- "FlinkCDC");
+ "FlinkCDC",
+ new HashMap<>());
String[] expected = new String[] {"21, 1.732, Disenchanted", "17,
6.28, Doris Day"};
stream.sinkTo(icebergSink);
env.execute("Values to Iceberg Sink");
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index b16b88931..745aea81f 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -93,7 +93,14 @@ public class IcebergWriterTest {
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
new IcebergWriter(
- catalogOptions, 1, 1, ZoneId.systemDefault(), 0,
jobId, operatorId);
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -179,10 +186,12 @@ public class IcebergWriterTest {
DataChangeEvent dataChangeEvent2 =
DataChangeEvent.insertEvent(tableId, recordData2);
icebergWriter.write(dataChangeEvent2, null);
Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -258,7 +267,10 @@ public class IcebergWriterTest {
writeResults = icebergWriter.prepareCommit();
collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -284,7 +296,15 @@ public class IcebergWriterTest {
String jobId = UUID.randomUUID().toString();
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0,
jobId, operatorId);
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ pipelineZoneId,
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -349,10 +369,12 @@ public class IcebergWriterTest {
DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId,
record1);
icebergWriter.write(dataChangeEvent, null);
Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -393,7 +415,14 @@ public class IcebergWriterTest {
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
new IcebergWriter(
- catalogOptions, 1, 1, ZoneId.systemDefault(), 0,
jobId, operatorId);
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
TableId tableId = TableId.parse("test.iceberg_table");
Map<TableId, List<String>> partitionMaps = new HashMap<>();
@@ -440,10 +469,12 @@ public class IcebergWriterTest {
DataChangeEvent dataChangeEvent2 =
DataChangeEvent.insertEvent(tableId, recordData2);
icebergWriter.write(dataChangeEvent2, null);
Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
Table table =
catalog.loadTable(
@@ -482,7 +513,15 @@ public class IcebergWriterTest {
String jobId = UUID.randomUUID().toString();
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0,
jobId, operatorId);
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ pipelineZoneId,
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new
IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
TableIdentifier tableIdentifier =
@@ -514,10 +553,12 @@ public class IcebergWriterTest {
DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId,
record1);
icebergWriter.write(dataChangeEvent, null);
Collection<WriteResultWrapper> writeResults =
icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new
IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result.size()).isEqualTo(1);
Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1");
@@ -538,8 +579,11 @@ public class IcebergWriterTest {
writeResults = icebergWriter.prepareCommit();
collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ icebergCommitter.commit(collection);
+ }
summary =
catalog.loadTable(tableIdentifier).currentSnapshot().summary();
Assertions.assertThat(summary.get("total-data-files")).isEqualTo("2");
Assertions.assertThat(summary.get("added-records")).isEqualTo("1");