This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 944e65b0ae245e157aa5df2405edaf57c918d63a Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Apr 28 16:23:20 2022 +0200 (chores) camel-kafka: ensures all writes to the last error happen through the setter --- .../apache/camel/component/kafka/KafkaFetchRecords.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 0f090a34d41..b1befdfa5c3 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -154,7 +154,7 @@ public class KafkaFetchRecords implements Runnable { setConnected(true); } - lastError = null; + setLastError(null); startPolling(); } while ((pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable()); @@ -172,7 +172,7 @@ public class KafkaFetchRecords implements Runnable { String msg = "Gave up subscribing org.apache.kafka.clients.consumer.KafkaConsumer " + threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ")."; LOG.warn(msg); - lastError = new KafkaConsumerFatalException(msg, lastError); + setLastError(new KafkaConsumerFatalException(msg, lastError)); } private void setupCreateConsumerException(ForegroundTask task, int max) { @@ -180,7 +180,8 @@ public class KafkaFetchRecords implements Runnable { String topic = getPrintableTopic(); String msg = "Gave up creating org.apache.kafka.clients.consumer.KafkaConsumer " + threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ")."; - lastError = new KafkaConsumerFatalException(msg, lastError); + + setLastError(new KafkaConsumerFatalException(msg, lastError)); } private boolean initializeConsumerTask() { @@ -191,7 +192,7 @@ public class KafkaFetchRecords implements Runnable { // ensure this is logged so users can see the problem LOG.warn("Error subscribing org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(), e); - lastError = e; + setLastError(e); return false; } @@ -219,7 +220,7 @@ public class KafkaFetchRecords implements Runnable { // ensure this is logged so users can see the problem LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(), e); - lastError = e; + setLastError(e); return false; } @@ -551,4 +552,7 @@ public class KafkaFetchRecords implements Runnable { state = State.RESUME_REQUESTED; } + private synchronized void setLastError(Exception lastError) { + this.lastError = lastError; + } }