This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kaf in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9c9dbb638760c0cb98748bbb8adfcf51b62f0e59 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Feb 1 16:45:43 2025 +0100 camel-kafka: polished --- .../camel/component/kafka/KafkaProducer.java | 42 --------------- .../kafka/KafkaTransactionSynchronization.java | 63 ++++++++++++++++++++++ .../batching/KafkaRecordBatchingProcessor.java | 48 ++++++----------- .../KafkaRecordBatchingProcessorFacade.java | 10 ++-- 4 files changed, 82 insertions(+), 81 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 807e550e716..572c8d1b751 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -41,18 +41,15 @@ import org.apache.camel.health.HealthCheckHelper; import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; -import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.URISupport; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.slf4j.Logger; @@ -529,42 +526,3 @@ public class KafkaProducer extends DefaultAsyncProducer { exchange.getUnitOfWork().addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer)); } } - -class KafkaTransactionSynchronization extends SynchronizationAdapter { - private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class); - private final String transactionId; - private final Producer kafkaProducer; - - public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer) { - this.transactionId = transactionId; - this.kafkaProducer = kafkaProducer; - } - - @Override - public void onDone(Exchange exchange) { - try { - if (exchange.getException() != null || exchange.isRollbackOnly()) { - if (exchange.getException() instanceof KafkaException) { - LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(), - transactionId); - kafkaProducer.close(); - } else { - LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId()); - kafkaProducer.abortTransaction(); - } - } else { - LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId()); - kafkaProducer.commitTransaction(); - } - } catch (KafkaException e) { - exchange.setException(e); - } catch (Exception e) { - exchange.setException(e); - LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(), - e.getMessage(), e); - kafkaProducer.abortTransaction(); - } finally { - exchange.getUnitOfWork().endTransactedBy(transactionId); - } - } -} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java new file mode 100644 index 00000000000..2525211f787 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import org.apache.camel.Exchange; +import org.apache.camel.support.SynchronizationAdapter; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.KafkaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KafkaTransactionSynchronization extends SynchronizationAdapter { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class); + private final String transactionId; + private final Producer kafkaProducer; + + public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer) { + this.transactionId = transactionId; + this.kafkaProducer = kafkaProducer; + } + + @Override + public void onDone(Exchange exchange) { + try { + if (exchange.getException() != null || exchange.isRollbackOnly()) { + if (exchange.getException() instanceof KafkaException) { + LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(), + transactionId); + kafkaProducer.close(); + } else { + LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId()); + kafkaProducer.abortTransaction(); + } + } else { + LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId()); + kafkaProducer.commitTransaction(); + } + } catch (KafkaException e) { + exchange.setException(e); + } catch (Exception e) { + exchange.setException(e); + LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(), + e.getMessage(), e); + kafkaProducer.abortTransaction(); + } finally { + exchange.getUnitOfWork().endTransactedBy(transactionId); + } + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 3e10a65e0ea..13f754f52ba 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class); private final KafkaConfiguration configuration; @@ -50,7 +51,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { private final class CommitSynchronization implements Synchronization { private final ExceptionHandler exceptionHandler; - private ProcessingResult result; public CommitSynchronization(ExceptionHandler exceptionHandler) { this.exceptionHandler = exceptionHandler; @@ -58,6 +58,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { @Override public void onComplete(Exchange exchange) { + // TODO: Make it possible to not keep List<Exchange> in body as result final List<?> exchanges = exchange.getMessage().getBody(List.class); // Ensure we are actually receiving what we are asked for @@ -68,7 +69,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName()); commitManager.commit(); - result = new ProcessingResult(false, false); } @Override @@ -81,8 +81,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { LOG.warn( "Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled"); } - - result = new ProcessingResult(false, true); } } @@ -90,8 +88,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { this.configuration = configuration; this.processor = processor; this.commitManager = commitManager; - - this.exchangeList = new ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords()); + this.exchangeList = new ArrayBlockingQueue<>(configuration.getMaxPollRecords()); } public Exchange toExchange( @@ -105,7 +102,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { if (configuration.isAllowManualCommit()) { KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord); - message.setHeader(KafkaConstants.MANUAL_COMMIT, manual); } @@ -115,6 +111,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) { LOG.debug("There's {} records to process ... max poll is set to {}", consumerRecords.count(), configuration.getMaxPollRecords()); + // Aggregate all consumer records in a single exchange if (exchangeList.isEmpty()) { watch.takenAndRestart(); @@ -153,17 +150,16 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { return !exchangeList.isEmpty() && consumerRecords.isEmpty() && watch.taken() >= configuration.getPollTimeoutMs(); } - private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) { + private void processBatch(KafkaConsumer camelKafkaConsumer) { // Create the bundle exchange - final Exchange exchange = camelKafkaConsumer.createExchange(false); - final Message message = exchange.getMessage(); + Exchange exchange = camelKafkaConsumer.createExchange(false); + Message message = exchange.getMessage(); message.setBody(exchangeList.stream().toList()); - try { if (configuration.isAllowManualCommit()) { - return manualCommitResultProcessing(camelKafkaConsumer, exchange); + manualCommitResultProcessing(camelKafkaConsumer, exchange); } else { - return autoCommitResultProcessing(camelKafkaConsumer, exchange); + autoCommitResultProcessing(camelKafkaConsumer, exchange); } } finally { // Release the exchange @@ -174,46 +170,34 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { /* * The flow to execute when using auto-commit */ - private ProcessingResult autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) { - final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler(); - final CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler); + private void autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) { + ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler(); + CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler); exchange.getExchangeExtension().addOnCompletion(commitSynchronization); - try { processor.process(exchange); } catch (Exception e) { exchange.setException(e); } - - return commitSynchronization.result; } /* * The flow to execute when the integrations perform manual commit on their own */ - private ProcessingResult manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) { + private void manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) { try { processor.process(exchange); } catch (Exception e) { exchange.setException(e); } - - ProcessingResult result; if (exchange.getException() != null) { - LOG.debug("An exception was thrown for batch records"); - final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler(); - boolean handled = processException(exchange, exceptionHandler); - result = new ProcessingResult(false, handled); - } else { - result = new ProcessingResult(false, false); + ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler(); + processException(exchange, exceptionHandler); } - - return result; } - private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) { + private void processException(Exchange exchange, ExceptionHandler exceptionHandler) { // will handle/log the exception and then continue to next exceptionHandler.handleException("Error during processing", exchange, exchange.getException()); - return true; } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java index 0347249ebe0..06745b3506f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.kafka.consumer.support.batching; import java.util.Set; @@ -30,16 +29,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaRecordBatchingProcessorFacade extends AbstractKafkaRecordProcessorFacade { + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessorFacade.class); + private final KafkaRecordBatchingProcessor kafkaRecordProcessor; - public KafkaRecordBatchingProcessorFacade( - KafkaConsumer camelKafkaConsumer, String threadId, + public KafkaRecordBatchingProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId, CommitManager commitManager, KafkaConsumerListener consumerListener) { super(camelKafkaConsumer, threadId, commitManager, consumerListener); - kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager); - } private KafkaRecordBatchingProcessor buildKafkaRecordProcessor(CommitManager commitManager) { @@ -52,10 +50,8 @@ public class KafkaRecordBatchingProcessorFacade extends AbstractKafkaRecordProce @Override public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) { logRecords(allRecords); - Set<TopicPartition> partitions = allRecords.partitions(); LOG.debug("Poll received records on {} partitions", partitions.size()); - return kafkaRecordProcessor.processExchange(camelKafkaConsumer, allRecords); }
