This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push:
new ce40ac07744 CAMEL-21788: DelegatingCallback invokes completion in
wrong order. (#17260)
ce40ac07744 is described below
commit ce40ac07744217f58acf0ca513810eb2706c976b
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 25 09:33:52 2025 +0000
CAMEL-21788: DelegatingCallback invokes completion in wrong order. (#17260)
---
.../apache/camel/component/kafka/KafkaProducer.java | 3 ++-
.../kafka/producer/support/DelegatingCallback.java | 18 +++++++++++++++---
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index d3deb114633..06ecf4510c5 100755
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -510,7 +510,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
KafkaProducerMetadataCallBack metadataCallBack = new
KafkaProducerMetadataCallBack(
key, configuration.isRecordMetadata());
- DelegatingCallback delegatingCallback = new DelegatingCallback(cb,
metadataCallBack);
+ // make sure to cb is last in the order here
+ DelegatingCallback delegatingCallback = new
DelegatingCallback(metadataCallBack, cb);
kafkaProducer.send(record, delegatingCallback);
} else {
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
index f1a830ef3e1..c316cf6fd67 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -18,9 +18,12 @@ package org.apache.camel.component.kafka.producer.support;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class DelegatingCallback implements Callback {
+ private static final Logger LOG =
LoggerFactory.getLogger(DelegatingCallback.class);
private final Callback callback1;
private final Callback callback2;
@@ -31,8 +34,17 @@ public final class DelegatingCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
- callback1.onCompletion(metadata, exception);
- callback2.onCompletion(metadata, exception);
-
+ try {
+ callback1.onCompletion(metadata, exception);
+ } catch (Exception e) {
+ // ensure every callback is invoked
+ LOG.warn("Error invoking 1st onCompletion. This exception is
ignored.", e);
+ }
+ try {
+ callback2.onCompletion(metadata, exception);
+ } catch (Exception e) {
+ // ensure every callback is invoked
+ LOG.warn("Error invoking 2nd onCompletion. This exception is
ignored.", e);
+ }
}
}