This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f70b7119468 CAMEL-22261 Reuse existing transaction if already transacted
f70b7119468 is described below
commit f70b7119468b3530d0f6a8896475c04b596f5a8a
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jul 23 21:30:02 2025 +0200
CAMEL-22261 Reuse existing transaction if already transacted
---
.../apache/camel/component/kafka/KafkaProducer.java | 21 ++++++++-------------
1 file changed, 8 insertions(+), 13 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1ea7c459a21..c8af6d224a9 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -530,20 +530,15 @@ public class KafkaProducer extends DefaultAsyncProducer implements RouteIdAware
private void startKafkaTransaction(Exchange exchange) {
UnitOfWork uow = exchange.getUnitOfWork();
- if (uow.isTransactedBy(transactionId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not starting kafka transaction {} with exchange {}
(UOW hash code {}) since one is already started.",
- transactionId, exchange.getExchangeId(),
uow.hashCode());
- }
- return;
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Starting kafka transaction {} with exchange {} (UOW
hash code {})", transactionId,
- exchange.getExchangeId(), uow.hashCode());
+ if (!uow.isTransactedBy(transactionId)) {
+ LOG.debug("Starting kafka transaction {} with exchange {}",
transactionId, exchange.getExchangeId());
+ uow.beginTransactedBy(transactionId);
+ kafkaProducer.beginTransaction();
+ uow.addSynchronization(new
KafkaTransactionSynchronization(transactionId, kafkaProducer));
+ } else {
+ LOG.debug("Using existing kafka transaction {} with exchange {}.",
+ transactionId, exchange.getExchangeId());
}
-
- uow.beginTransactedBy(transactionId);
- kafkaProducer.beginTransaction();
- uow.addSynchronization(new
KafkaTransactionSynchronization(transactionId, kafkaProducer));
}
@Override