This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new f4e9f0b2993 CAMEL-21788: DelegatingCallback invokes completion in wrong order. (#17260)
f4e9f0b2993 is described below

commit f4e9f0b2993122d5cc5faf243e853340ddc9e7a2
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 25 09:33:52 2025 +0000

    CAMEL-21788: DelegatingCallback invokes completion in wrong order. (#17260)
---
 .../apache/camel/component/kafka/KafkaProducer.java    |  3 ++-
 .../kafka/producer/support/DelegatingCallback.java     | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 807e550e716..8eda02c1fe0 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -515,7 +515,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
             KafkaProducerMetadataCallBack metadataCallBack = new KafkaProducerMetadataCallBack(
                     key, configuration.isRecordMetadata());
 
-            DelegatingCallback delegatingCallback = new DelegatingCallback(cb, metadataCallBack);
+            // make sure cb is last in the order here
+            DelegatingCallback delegatingCallback = new DelegatingCallback(metadataCallBack, cb);
 
             kafkaProducer.send(record, delegatingCallback);
         } else {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
index f1a830ef3e1..c316cf6fd67 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -18,9 +18,12 @@ package org.apache.camel.component.kafka.producer.support;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class DelegatingCallback implements Callback {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DelegatingCallback.class);
     private final Callback callback1;
     private final Callback callback2;
 
@@ -31,8 +34,17 @@ public final class DelegatingCallback implements Callback {
 
     @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        callback1.onCompletion(metadata, exception);
-        callback2.onCompletion(metadata, exception);
-
+        try {
+            callback1.onCompletion(metadata, exception);
+        } catch (Exception e) {
+            // ensure every callback is invoked
+            LOG.warn("Error invoking 1st onCompletion. This exception is ignored.", e);
+        }
+        try {
+            callback2.onCompletion(metadata, exception);
+        } catch (Exception e) {
+            // ensure every callback is invoked
+            LOG.warn("Error invoking 2nd onCompletion. This exception is ignored.", e);
+        }
     }
 }

Reply via email to