Copilot commented on code in PR #4351:
URL: https://github.com/apache/flink-cdc/pull/4351#discussion_r3007673317


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java:
##########
@@ -72,13 +73,15 @@ public IcebergSink(
             Map<String, String> tableOptions,
             ZoneId zoneId,
             CompactionOptions compactionOptions,
-            String jobIdPrefix) {
+            String jobIdPrefix,
+            Map<String, String> hadoopConfOptions) {

Review Comment:
   Changing this public constructor signature forces all external callers to 
pass an extra `hadoopConfOptions` argument. To avoid a source/binary 
compatibility break for users instantiating `IcebergSink` directly, consider 
keeping the previous constructor (delegating to this one with an empty map) and 
deprecating it.
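
   A minimal sketch of that delegation pattern follows. `SinkConstructorSketch` 
is a hypothetical stand-in with a reduced parameter list, not the real 
`IcebergSink` signature; only the shape of the two constructors is the point.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for IcebergSink; the parameter list is reduced for brevity. */
class SinkConstructorSketch {
    private final String jobIdPrefix;
    private final Map<String, String> hadoopConfOptions;

    /**
     * Previous signature, kept so existing callers still compile and link.
     *
     * @deprecated use the constructor that also takes hadoopConfOptions.
     */
    @Deprecated
    SinkConstructorSketch(String jobIdPrefix) {
        // Delegate with an empty map so behavior matches the old constructor.
        this(jobIdPrefix, Collections.emptyMap());
    }

    SinkConstructorSketch(String jobIdPrefix, Map<String, String> hadoopConfOptions) {
        this.jobIdPrefix = jobIdPrefix;
        this.hadoopConfOptions = hadoopConfOptions;
    }

    String getJobIdPrefix() {
        return jobIdPrefix;
    }

    Map<String, String> getHadoopConfOptions() {
        return hadoopConfOptions;
    }
}
```

   Delegating with an empty map rather than `null` also means downstream code 
never has to null-check the options.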



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java:
##########
@@ -92,29 +93,41 @@ public class IcebergMetadataApplier implements MetadataApplier {
 
     private final Map<TableId, List<String>> partitionMaps;
 
+    private final Map<String, String> hadoopConfOptions;
+
     private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
 
     public IcebergMetadataApplier(Map<String, String> catalogOptions) {
-        this(catalogOptions, new HashMap<>(), new HashMap<>());
+        this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
     }
 
     public IcebergMetadataApplier(
             Map<String, String> catalogOptions,
             Map<String, String> tableOptions,
             Map<TableId, List<String>> partitionMaps) {
+        this(catalogOptions, tableOptions, partitionMaps, null);
+    }
+
+    public IcebergMetadataApplier(
+            Map<String, String> catalogOptions,
+            Map<String, String> tableOptions,
+            Map<TableId, List<String>> partitionMaps,
+            Map<String, String> hadoopConfOptions) {
         this.catalogOptions = catalogOptions;
         this.tableOptions = tableOptions;
         this.partitionMaps = partitionMaps;
+        this.hadoopConfOptions = hadoopConfOptions;
         this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
     }
 
     @Override
     public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
             throws SchemaEvolveException {
         if (catalog == null) {
+            Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
             catalog =
                     CatalogUtil.buildIcebergCatalog(
-                            this.getClass().getSimpleName(), catalogOptions, new Configuration());
+                            this.getClass().getSimpleName(), catalogOptions, configuration);

Review Comment:
   `IcebergMetadataApplier` creates an Iceberg `Catalog` here, but its 
`close()` method currently only nulls the reference. For catalogs that manage 
external resources (e.g., Hive metastore connections), consider closing the 
catalog instance in `close()` to prevent resource leaks in long-running jobs or 
during redeployments.
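
   One way to do this, sketched under assumptions: the catalog type itself may 
not declare `close()`, so the sketch closes the instance only when it happens 
to implement `java.io.Closeable`. `CatalogSketch`, `ClosableCatalogSketch`, and 
`ApplierSketch` are hypothetical stand-ins, not classes from Iceberg or this PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the catalog type held by the applier. */
interface CatalogSketch {}

/** Hypothetical catalog that owns an external resource. */
class ClosableCatalogSketch implements CatalogSketch, Closeable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical holder mirroring the "catalog field plus close()" shape. */
class ApplierSketch implements AutoCloseable {
    private CatalogSketch catalog;

    ApplierSketch(CatalogSketch catalog) {
        this.catalog = catalog;
    }

    @Override
    public void close() {
        try {
            // Release resources only if this catalog implementation supports it.
            if (catalog instanceof Closeable) {
                ((Closeable) catalog).close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Drop the reference either way so a second close() is a no-op.
            catalog = null;
        }
    }
}
```

   The same shape would apply to the other classes flagged in this review that 
hold a catalog field.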



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java:
##########
@@ -67,15 +68,19 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
 
     private final Map<TableId, TableMetric> tableIdMetricMap;
 
-    public IcebergCommitter(Map<String, String> catalogOptions) {
-        this(catalogOptions, null);
+    public IcebergCommitter(
+            Map<String, String> catalogOptions, Map<String, String> hadoopConfOptions) {
+        this(catalogOptions, null, hadoopConfOptions);
     }
 
     public IcebergCommitter(
-            Map<String, String> catalogOptions, SinkCommitterMetricGroup metricGroup) {
+            Map<String, String> catalogOptions,
+            SinkCommitterMetricGroup metricGroup,
+            Map<String, String> hadoopConfOptions) {
+        Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
         this.catalog =
                 CatalogUtil.buildIcebergCatalog(
-                        this.getClass().getSimpleName(), catalogOptions, new Configuration());
+                        this.getClass().getSimpleName(), catalogOptions, configuration);

Review Comment:
   `IcebergCommitter` builds an Iceberg `Catalog` here, but its `close()` 
implementation is currently a no-op. If the chosen catalog implementation holds 
resources (e.g., Hive metastore client threads / pooled connections), not 
closing it can leak resources across job lifecycles; consider closing the 
catalog instance in `close()` (or otherwise documenting why it's safe not to).



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java:
##########
@@ -111,9 +117,10 @@ public void processElement(StreamRecord<CommittableMessage<WriteResultWrapper>>
 
     private void compact(TableId tableId) {
         if (catalog == null) {
+            Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
             catalog =
                     CatalogUtil.buildIcebergCatalog(
-                            this.getClass().getSimpleName(), catalogOptions, new Configuration());
+                            this.getClass().getSimpleName(), catalogOptions, configuration);

Review Comment:
   `compact()` lazily creates an Iceberg `Catalog` but `close()` currently only 
sets `catalog = null`. If the catalog implementation holds resources (notably 
Hive metastore clients), consider closing it during operator `close()` to avoid 
resource leaks on job cancellation/restart.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java:
##########
@@ -92,10 +93,12 @@ public IcebergWriter(
             ZoneId zoneId,
             long lastCheckpointId,
             String jobId,
-            String operatorId) {
+            String operatorId,
+            Map<String, String> hadoopConfOptions) {
+        Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
         catalog =
                 CatalogUtil.buildIcebergCatalog(
-                        this.getClass().getSimpleName(), catalogOptions, new Configuration());
+                        this.getClass().getSimpleName(), catalogOptions, configuration);

Review Comment:
   `IcebergWriter.close()` currently just nulls out `catalog` without closing 
it. Since the catalog is created here (potentially with Hive/Glue 
implementations that hold resources), consider closing the catalog instance in 
`close()` to avoid leaking threads/connections between restarts or task 
failures.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java:
##########
@@ -93,6 +93,31 @@ void testUnsupportedOption() {
                                 + "unsupported_key");
     }
 
+    @Test
+    void testHadoopConfOptionsAreAllowed() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+        Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put(
+                                        "hadoop.conf.hive.metastore.kerberos.keytab.file",
+                                        "/etc/security/keytabs/hive.service.keytab")
+                                .put(
+                                        "hadoop.conf.hive.metastore.kerberos.principal",
+                                        "hive/[email protected]")
+                                .put("hadoop.conf.hive.metastore.sasl.enabled", "true")
+                                .build());
+
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+    }

Review Comment:
   This test only asserts that the factory accepts `hadoop.conf.*` options, but 
it doesn't verify that the prefix is stripped and the resulting Hadoop 
configuration options are actually carried into the created sink (and later 
used when building the Iceberg catalog). Consider asserting the produced 
`hadoopConfOptions` contents (e.g., via reflection or a package-private 
accessor) to ensure the new feature works end-to-end.
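
   As an illustration of the kind of assertion meant here, the sketch below 
strips the `hadoop.conf.` prefix and checks the resulting keys. 
`HadoopConfPrefixSketch.stripPrefix` is a hypothetical helper written for this 
example; the factory's real extraction logic and any accessor on the sink are 
assumptions and may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing the prefix-stripping behavior a test could verify. */
class HadoopConfPrefixSketch {
    static final String PREFIX = "hadoop.conf.";

    /** Returns only the entries whose key starts with PREFIX, with the prefix removed. */
    static Map<String, String> stripPrefix(Map<String, String> options) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                result.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

   A test could then compare the map held by the created sink against the 
expected stripped keys, for example that `hive.metastore.sasl.enabled` maps to 
`true` and that no key still starts with `hadoop.conf.`.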


