This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 03fc1ae330 Kafka Connect: Add config for transactional ID prefix (#11780)
03fc1ae330 is described below

commit 03fc1ae33096716e9afdc7c4f1dce91de48f32bc
Author: Thomas Jaeckle <[email protected]>
AuthorDate: Fri Mar 14 16:21:41 2025 +0100

    Kafka Connect: Add config for transactional ID prefix (#11780)
---
 docs/docs/kafka-connect.md                              |  1 +
 .../org/apache/iceberg/connect/IcebergSinkConfig.java   | 17 +++++++++++++++++
 .../org/apache/iceberg/connect/channel/Channel.java     |  2 +-
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md
index 929c43220c..92bcb8c708 100644
--- a/docs/docs/kafka-connect.md
+++ b/docs/docs/kafka-connect.md
@@ -81,6 +81,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
 | iceberg.control.commit.interval-ms         | Commit interval in msec, default is 300,000 (5 min)                                                              |
 | iceberg.control.commit.timeout-ms          | Commit timeout interval in msec, default is 30,000 (30 sec)                                                      |
 | iceberg.control.commit.threads             | Number of threads to use for commits, default is (cores * 2)                                                     |
+| iceberg.coordinator.transactional.prefix   | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix           |
 | iceberg.catalog                            | Name of the catalog, default is `iceberg`                                                                        |
 | iceberg.catalog.*                          | Properties passed through to Iceberg catalog initialization                                                      |
 | iceberg.hadoop-conf-dir                    | If specified, Hadoop config files in this directory will be loaded                                               |
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index 8e59d73923..a4e15932f1 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -87,6 +87,8 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000;
   private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads";
   private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id";
+  private static final String TRANSACTIONAL_PREFIX_PROP =
+      "iceberg.coordinator.transactional.prefix";
   private static final String HADOOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";
 
   private static final String NAME_PROP = "name";
@@ -211,6 +213,12 @@ public class IcebergSinkConfig extends AbstractConfig {
         Runtime.getRuntime().availableProcessors() * 2,
         Importance.MEDIUM,
         "Coordinator threads to use for table commits, default is (cores * 2)");
+    configDef.define(
+        TRANSACTIONAL_PREFIX_PROP,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.LOW,
+        "Optional prefix of the transactional id for the coordinator");
     configDef.define(
         HADOOP_CONF_DIR_PROP,
         ConfigDef.Type.STRING,
@@ -393,6 +401,15 @@ public class IcebergSinkConfig extends AbstractConfig {
     return getInt(COMMIT_THREADS_PROP);
   }
 
+  public String transactionalPrefix() {
+    String result = getString(TRANSACTIONAL_PREFIX_PROP);
+    if (result != null) {
+      return result;
+    }
+
+    return "";
+  }
+
   public String hadoopConfDir() {
     return getString(HADOOP_CONF_DIR_PROP);
   }
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
index 993fcf67c9..01cf165de6 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Channel.java
@@ -64,7 +64,7 @@ abstract class Channel {
     this.connectGroupId = config.connectGroupId();
     this.context = context;
 
-    String transactionalId = name + config.transactionalSuffix();
+    String transactionalId = config.transactionalPrefix() + name + config.transactionalSuffix();
     this.producer = clientFactory.createProducer(transactionalId);
     this.consumer = clientFactory.createConsumer(consumerGroupId);
     this.admin = clientFactory.createAdmin();

Reply via email to