This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cf84a59a0ead8a58a9ab4670ff37dfb942c963c1
Author: Mike Barlotta <codesm...@users.noreply.github.com>
AuthorDate: Tue Jan 9 09:11:48 2024 -0500

    remove lastResult from Kafka Camel component (#12002)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 46 ++++-----------------
 .../consumer/support/KafkaRecordProcessor.java     | 48 +++++++++-------------
 .../support/KafkaRecordProcessorFacade.java        | 37 ++++++-----------
 .../kafka/consumer/support/ProcessingResult.java   | 20 +--------
 4 files changed, 43 insertions(+), 108 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2cdd516d0c4..9a5004cc2de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -305,8 +305,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -315,7 +314,8 @@
             lock.lock();
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
@@ -323,10 +323,6 @@
             KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
-            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
-            ProcessingResult lastResult = null;
-
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -335,43 +331,15 @@
                     }
                 }
 
-                if (lastResult != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult of null");
-                    }
-                }
-
-                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
-                if (result != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}",
-                                result.getPartition(), result.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned as null");
-                    }
-                }
-
+                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
                 updateTaskState();
+
+                // when breakOnFirstError we want to unsubscribe from Kafka
                 if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
-                } else {
-                    lastResult = result;
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Setting lastresult to partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
                 }
             }
 
@@ -405,6 +373,8 @@
                         e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
             }
 
+            // why do we set this to -1
+            long partitionLastOffset = -1;
             pollExceptionStrategy.handle(partitionLastOffset, e);
         } finally {
             // only close if not retry
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 2954836f5f6..b06e53493e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -86,8 +85,7 @@
 
     public ProcessingResult processExchange(
             Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
 
         Message message = exchange.getMessage();
 
@@ -114,33 +112,31 @@
         } catch (Exception e) {
             exchange.setException(e);
         }
+
+        ProcessingResult result = ProcessingResult.newUnprocessed();
 
         if (exchange.getException() != null) {
             LOG.debug("An exception was thrown for record at partition {} and offset {}",
                     record.partition(), record.offset());
 
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
-                    exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
+            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+            result = new ProcessingResult(breakOnErrorExit, true);
         } else {
-            return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
+            result = new ProcessingResult(false, exchange.getException() != null);
+        }
+
+        if (!result.isBreakOnErrorHit()) {
+            commitManager.recordOffset(topicPartition, record.offset());
         }
+
+        return result;
     }
 
     private boolean processException(
             Exchange exchange, TopicPartition topicPartition,
-            ConsumerRecord<Object, Object> consumerRecord, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
-            if (lastResult.getPartition() != -1 &&
-                    lastResult.getPartition() != consumerRecord.partition()) {
-                LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " +
-                          " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
-                        topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
-                        consumerRecord.partition(), consumerRecord.offset());
-            }
-
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
                 Exception exc = exchange.getException();
@@ -150,17 +146,13 @@
                         consumerRecord.offset(), consumerRecord.partition());
             }
 
-            // force commit, so we resume on next poll where we failed
-            // except when the failure happened at the first message in a poll
-            if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-                // we should just do a commit (vs the original forceCommit)
-                // when route uses NOOP Commit Manager it will rely
-                // on the route implementation to explicitly commit offset
-                // when route uses Synch/Asynch Commit Manager it will
-                // ALWAYS commit the offset for the failing record
-                // and will ALWAYS retry it
-                commitManager.commit(topicPartition);
-            }
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
 
             // continue to next partition
             return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index f6317510fd9..470e2ea01e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,18 +54,16 @@
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(
-            ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
 
+        ProcessingResult result = ProcessingResult.newUnprocessed();
+
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
         LOG.debug("Poll received records on {} partitions", partitions.size());
 
-        ProcessingResult lastResult
-                = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
-
         while (partitionIterator.hasNext() && !isStopping()) {
             TopicPartition partition = partitionIterator.next();
 
@@ -76,36 +74,33 @@
 
             logRecordsInPartition(partitionRecords, partition);
 
-            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
                 ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
 
                 LOG.debug("Processing record on partition {} with offset {}",
                         consumerRecord.partition(), consumerRecord.offset());
 
-                lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, consumerRecord);
 
-                LOG.debug(
-                        "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}",
-                        consumerRecord.partition(), consumerRecord.offset(), lastResult.getPartition(),
-                        lastResult.getPartitionLastOffset());
+                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(), consumerRecord.offset());
 
                 if (consumerListener != null) {
-                    if (!consumerListener.afterProcess(lastResult)) {
+                    if (!consumerListener.afterProcess(result)) {
                         commitManager.commit(partition);
-                        return lastResult;
+                        return result;
                     }
                 }
             }
 
-            if (!lastResult.isBreakOnErrorHit()) {
+            if (!result.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
                 commitManager.commit(partition);
             }
         }
 
-        return lastResult;
+        return result;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
@@ -125,7 +120,6 @@
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            final ProcessingResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> consumerRecord) {
 
@@ -133,18 +127,13 @@
 
         Exchange exchange = camelKafkaConsumer.createExchange(false);
 
-        ProcessingResult currentResult
-                = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                        recordHasNext, consumerRecord, lastResult, camelKafkaConsumer.getExceptionHandler());
-
-        if (!currentResult.isBreakOnErrorHit()) {
-            commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
-        }
+        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
 
         // success so release the exchange
         camelKafkaConsumer.releaseExchange(exchange, false);
 
-        return currentResult;
+        return result;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index fe3afd6ee8d..87f88c6e23d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,24 +17,16 @@
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
-            = new ProcessingResult(
-                    false,
-                    AbstractCommitManager.NON_PARTITION,
-                    AbstractCommitManager.START_OFFSET, false);
+            = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
-    private final long lastPartition;
-    private final long partitionLastOffset;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) {
+    ProcessingResult(boolean breakOnErrorHit, boolean failed) {
        this.breakOnErrorHit = breakOnErrorHit;
-        this.lastPartition = lastPartition;
-        this.partitionLastOffset = partitionLastOffset;
         this.failed = failed;
     }
 
@@ -42,14 +34,6 @@ public final class ProcessingResult {
         return breakOnErrorHit;
     }
 
-    public long getPartitionLastOffset() {
-        return partitionLastOffset;
-    }
-
-    public long getPartition() {
-        return lastPartition;
-    }
-
     public boolean isFailed() {
         return failed;
     }
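
The comment block in processException() above captures the new breakOnFirstError
contract: the Sync/Async commit managers now ALWAYS commit the offset of the
failing record and retry it, while the NOOP commit manager leaves committing to
the route. As an illustration only (not part of this commit), a minimal route
exercising the NOOP case might look like the sketch below; the topic name, group
id, and processor bodies are placeholder assumptions.

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    import org.apache.camel.component.kafka.consumer.KafkaManualCommit;

    public class BreakOnFirstErrorRouteSketch extends RouteBuilder {
        @Override
        public void configure() {
            // allowManualCommit=true + autoCommitEnable=false selects the NOOP
            // commit manager, so offsets advance only when the route commits
            from("kafka:my-topic?groupId=my-group"
                 + "&breakOnFirstError=true"
                 + "&autoCommitEnable=false"
                 + "&allowManualCommit=true")
                    .process(exchange -> {
                        // placeholder business logic; an exception thrown here
                        // takes the breakOnFirstError path: KafkaFetchRecords
                        // sets the reconnect flags and re-polls from the last
                        // committed offset
                    })
                    .process(exchange -> {
                        // explicit commit; with the NOOP commit manager this is
                        // the only point where the consumed offset moves forward
                        KafkaManualCommit manual = exchange.getMessage()
                                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                        manual.commit();
                    });
        }
    }

With this setup a failure before the manual commit leaves the offset untouched,
so after the forced reconnect the same record should be polled and retried.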