Copilot commented on code in PR #4351:
URL: https://github.com/apache/flink-cdc/pull/4351#discussion_r3007673317
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java:
##########
@@ -72,13 +73,15 @@ public IcebergSink(
Map<String, String> tableOptions,
ZoneId zoneId,
CompactionOptions compactionOptions,
- String jobIdPrefix) {
+ String jobIdPrefix,
+ Map<String, String> hadoopConfOptions) {
Review Comment:
Changing this public constructor signature forces all external callers to
pass an extra `hadoopConfOptions` argument. To avoid a source/binary
compatibility break for users instantiating `IcebergSink` directly, consider
keeping the previous constructor (delegating to this one with an empty map) and
deprecating it.
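A minimal sketch of the suggested pattern. It uses a hypothetical, trimmed-down `SinkSketch` class in place of `IcebergSink`, models only three of the real constructor parameters, and the deprecation text is illustrative. The old signature stays and delegates with an empty map, so existing callers compile and link unchanged:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for IcebergSink; most real constructor parameters are omitted.
class SinkSketch {
    final Map<String, String> catalogOptions;
    final String jobIdPrefix;
    final Map<String, String> hadoopConfOptions;

    /**
     * Previous signature, kept so existing callers stay source- and binary-compatible.
     *
     * @deprecated use {@link #SinkSketch(Map, String, Map)} and pass Hadoop options explicitly.
     */
    @Deprecated
    SinkSketch(Map<String, String> catalogOptions, String jobIdPrefix) {
        // Delegate with an empty map: no extra Hadoop configuration, same behavior as before.
        this(catalogOptions, jobIdPrefix, Collections.emptyMap());
    }

    SinkSketch(
            Map<String, String> catalogOptions,
            String jobIdPrefix,
            Map<String, String> hadoopConfOptions) {
        this.catalogOptions = catalogOptions;
        this.jobIdPrefix = jobIdPrefix;
        // Normalize null to an empty map so downstream code never needs a null check.
        this.hadoopConfOptions =
                hadoopConfOptions == null ? Collections.emptyMap() : hadoopConfOptions;
    }
}
```

Binary compatibility holds because the old constructor's descriptor still exists in the compiled class; `@Deprecated` only steers new callers to the new overload. The `null` normalization is an optional extra: the `IcebergMetadataApplier` convenience constructors in this PR pass `null` for `hadoopConfOptions`, which is only safe if `HadoopConfUtils.createConfiguration` tolerates `null` (its body is not shown here).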
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java:
##########
@@ -92,29 +93,41 @@ public class IcebergMetadataApplier implements MetadataApplier {
private final Map<TableId, List<String>> partitionMaps;
+ private final Map<String, String> hadoopConfOptions;
+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
- this(catalogOptions, new HashMap<>(), new HashMap<>());
+ this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
}
public IcebergMetadataApplier(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
Map<TableId, List<String>> partitionMaps) {
+ this(catalogOptions, tableOptions, partitionMaps, null);
+ }
+
+ public IcebergMetadataApplier(
+ Map<String, String> catalogOptions,
+ Map<String, String> tableOptions,
+ Map<TableId, List<String>> partitionMaps,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
+ this.hadoopConfOptions = hadoopConfOptions;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
throws SchemaEvolveException {
if (catalog == null) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
Review Comment:
`IcebergMetadataApplier` creates an Iceberg `Catalog` here, but its
`close()` method currently only nulls the reference. For catalogs that manage
external resources (e.g., Hive metastore connections), consider closing the
catalog instance in `close()` to prevent resource leaks in long-running jobs or
during redeployments.
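One way to act on this without depending on a specific catalog type is to close the instance only when it implements `AutoCloseable`, since the Iceberg `Catalog` interface itself is not assumed to be closeable here. `CatalogCloser`, `closeIfCloseable`, and `MetadataApplierSketch` are hypothetical names; in the real class the field would be an `org.apache.iceberg.catalog.Catalog`:

```java
// Hypothetical helper: closes the catalog only if its concrete class holds
// closeable resources; the Catalog interface itself is not assumed to be closeable.
final class CatalogCloser {
    private CatalogCloser() {}

    static void closeIfCloseable(Object catalog) {
        if (catalog instanceof AutoCloseable) {
            try {
                ((AutoCloseable) catalog).close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close Iceberg catalog", e);
            }
        }
    }
}

// Hypothetical stand-in for IcebergMetadataApplier with a lazily built catalog.
class MetadataApplierSketch implements AutoCloseable {
    Object catalog; // org.apache.iceberg.catalog.Catalog in the real class

    @Override
    public void close() {
        try {
            CatalogCloser.closeIfCloseable(catalog);
        } finally {
            // Keep the existing behavior: drop the reference so a later
            // applySchemaChange() would rebuild the catalog.
            catalog = null;
        }
    }
}
```

The `instanceof` check keeps non-closeable catalog implementations working unchanged: they are simply dropped, as today.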
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java:
##########
@@ -67,15 +68,19 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
private final Map<TableId, TableMetric> tableIdMetricMap;
- public IcebergCommitter(Map<String, String> catalogOptions) {
- this(catalogOptions, null);
+ public IcebergCommitter(
+ Map<String, String> catalogOptions, Map<String, String> hadoopConfOptions) {
+ this(catalogOptions, null, hadoopConfOptions);
}
public IcebergCommitter(
- Map<String, String> catalogOptions, SinkCommitterMetricGroup metricGroup) {
+ Map<String, String> catalogOptions,
+ SinkCommitterMetricGroup metricGroup,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
this.catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
Review Comment:
`IcebergCommitter` builds an Iceberg `Catalog` here, but its `close()`
implementation is currently a no-op. If the chosen catalog implementation holds
resources (e.g., Hive metastore client threads / pooled connections), not
closing it can leak resources across job lifecycles; consider closing the
catalog instance in `close()` (or otherwise documenting why it's safe not to).
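Because the committer builds its catalog eagerly into a `final` field, a sketch can close it directly in `close()`. The guard flag is an optional defensive choice that makes repeated `close()` calls harmless; `CommitterSketch` is hypothetical and stands in for `IcebergCommitter`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for IcebergCommitter: the catalog is created in the
// constructor and never reassigned.
class CommitterSketch implements AutoCloseable {
    private final Object catalog; // org.apache.iceberg.catalog.Catalog in the real class
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CommitterSketch(Object catalog) {
        this.catalog = catalog;
    }

    @Override
    public void close() throws Exception {
        // compareAndSet lets only the first call through, so the catalog is
        // never closed twice even if close() is invoked again.
        if (closed.compareAndSet(false, true) && catalog instanceof AutoCloseable) {
            ((AutoCloseable) catalog).close();
        }
    }
}
```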
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java:
##########
@@ -111,9 +117,10 @@ public void processElement(StreamRecord<CommittableMessage<WriteResultWrapper>>
private void compact(TableId tableId) {
if (catalog == null) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
Review Comment:
`compact()` lazily creates an Iceberg `Catalog` but `close()` currently only
sets `catalog = null`. If the catalog implementation holds resources (notably
Hive metastore clients), consider closing it during operator `close()` to avoid
resource leaks on job cancellation/restart.
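Since the catalog here is created lazily, `close()` may run before any compaction has happened. A hedged sketch that only closes what was actually built; `CompactionSketch` and the `Supplier` standing in for `CatalogUtil.buildIcebergCatalog(...)` are hypothetical, and in the real operator the inherited `close()` would presumably still need to be called as well:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for CompactionOperator: the catalog is only built on
// the first compaction, so it may never exist.
class CompactionSketch {
    private final Supplier<Object> catalogFactory; // stands in for CatalogUtil.buildIcebergCatalog(...)
    private Object catalog;

    CompactionSketch(Supplier<Object> catalogFactory) {
        this.catalogFactory = catalogFactory;
    }

    void compact() {
        if (catalog == null) {
            catalog = catalogFactory.get();
        }
        // ... table compaction would use `catalog` here ...
    }

    void close() throws Exception {
        Object toClose = catalog;
        // Clear the field first so a failing close() cannot leave a stale,
        // half-closed catalog behind for a later compact() call.
        catalog = null;
        if (toClose instanceof AutoCloseable) {
            ((AutoCloseable) toClose).close();
        }
    }
}
```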
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java:
##########
@@ -92,10 +93,12 @@ public IcebergWriter(
ZoneId zoneId,
long lastCheckpointId,
String jobId,
- String operatorId) {
+ String operatorId,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
Review Comment:
`IcebergWriter.close()` currently just nulls out `catalog` without closing
it. Since the catalog is created here (potentially with Hive/Glue
implementations that hold resources), consider closing the catalog instance in
`close()` to avoid leaking threads/connections between restarts or task
failures.
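The writer probably has other resources to release in `close()` too, so one option is to release everything, still close the catalog when an earlier step fails, and rethrow the first failure with later ones attached as suppressed exceptions. `WriterSketch` and its `fileWriters` list are hypothetical; the real `IcebergWriter` fields are not shown in this excerpt:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IcebergWriter.
class WriterSketch implements AutoCloseable {
    final List<AutoCloseable> fileWriters = new ArrayList<>(); // hypothetical per-table writers
    Object catalog; // org.apache.iceberg.catalog.Catalog in the real class

    @Override
    public void close() throws Exception {
        Exception failure = null;
        // Close every writer even if one of them throws.
        for (AutoCloseable writer : fileWriters) {
            try {
                writer.close();
            } catch (Exception e) {
                failure = addFailure(failure, e);
            }
        }
        fileWriters.clear();

        // Always attempt to release the catalog's threads/connections.
        Object toClose = catalog;
        catalog = null;
        if (toClose instanceof AutoCloseable) {
            try {
                ((AutoCloseable) toClose).close();
            } catch (Exception e) {
                failure = addFailure(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    // Keeps the first failure and attaches later ones as suppressed exceptions.
    private static Exception addFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}
```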
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java:
##########
@@ -93,6 +93,31 @@ void testUnsupportedOption() {
+ "unsupported_key");
}
+ @Test
+ void testHadoopConfOptionsAreAllowed() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(
+ "hadoop.conf.hive.metastore.kerberos.keytab.file",
+ "/etc/security/keytabs/hive.service.keytab")
+ .put(
+ "hadoop.conf.hive.metastore.kerberos.principal",
+ "hive/[email protected]")
+ .put("hadoop.conf.hive.metastore.sasl.enabled", "true")
+ .build());
+
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ }
Review Comment:
This test only asserts that the factory accepts `hadoop.conf.*` options, but
it doesn't verify that the prefix is stripped and the resulting Hadoop
configuration options are actually carried into the created sink (and later
used when building the Iceberg catalog). Consider asserting the produced
`hadoopConfOptions` contents (e.g., via reflection or a package-private
accessor) to ensure the new feature works end-to-end.
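A sketch of the kind of check this suggests, written against a hypothetical `HadoopConfOptionsSketch.extract` helper that strips the `hadoop.conf.` prefix used in the test above. The PR's actual extraction code and any accessor on `IcebergDataSink` are not part of this excerpt, so a real test would assert on whatever the sink actually exposes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the prefix is taken from the keys used in the test above.
final class HadoopConfOptionsSketch {
    static final String PREFIX = "hadoop.conf.";

    private HadoopConfOptionsSketch() {}

    /** Returns only hadoop.conf.* entries, with the prefix stripped from each key. */
    static Map<String, String> extract(Map<String, String> sinkOptions) {
        Map<String, String> hadoopOptions = new HashMap<>();
        for (Map.Entry<String, String> entry : sinkOptions.entrySet()) {
            String key = entry.getKey();
            // Skip a key that is exactly the prefix, which would map to an empty name.
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                hadoopOptions.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return hadoopOptions;
    }
}
```

Given the three `hadoop.conf.*` entries from the test above, the extracted map would hold `hive.metastore.kerberos.keytab.file`, `hive.metastore.kerberos.principal`, and `hive.metastore.sasl.enabled`, with no prefixed keys left; asserting exactly that (plus the absence of unrelated sink options) is what would make the test cover the feature end-to-end.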
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.