This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 944e65b0ae245e157aa5df2405edaf57c918d63a
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Thu Apr 28 16:23:20 2022 +0200

    (chores) camel-kafka: ensures all writes to the last error happen through the setter
---
 .../apache/camel/component/kafka/KafkaFetchRecords.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 0f090a34d41..b1befdfa5c3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -154,7 +154,7 @@ public class KafkaFetchRecords implements Runnable {
                 setConnected(true);
             }
 
-            lastError = null;
+            setLastError(null);
             startPolling();
         } while ((pollExceptionStrategy.canContinue() || isReconnect()) && 
isKafkaConsumerRunnable());
 
@@ -172,7 +172,7 @@ public class KafkaFetchRecords implements Runnable {
         String msg = "Gave up subscribing 
org.apache.kafka.clients.consumer.KafkaConsumer " +
                      threadId + " to " + topic + " after " + max + " attempts 
(elapsed: " + time + ").";
         LOG.warn(msg);
-        lastError = new KafkaConsumerFatalException(msg, lastError);
+        setLastError(new KafkaConsumerFatalException(msg, lastError));
     }
 
     private void setupCreateConsumerException(ForegroundTask task, int max) {
@@ -180,7 +180,8 @@ public class KafkaFetchRecords implements Runnable {
         String topic = getPrintableTopic();
         String msg = "Gave up creating 
org.apache.kafka.clients.consumer.KafkaConsumer "
                      + threadId + " to " + topic + " after " + max + " 
attempts (elapsed: " + time + ").";
-        lastError = new KafkaConsumerFatalException(msg, lastError);
+
+        setLastError(new KafkaConsumerFatalException(msg, lastError));
     }
 
     private boolean initializeConsumerTask() {
@@ -191,7 +192,7 @@ public class KafkaFetchRecords implements Runnable {
             // ensure this is logged so users can see the problem
             LOG.warn("Error subscribing 
org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(),
                     e);
-            lastError = e;
+            setLastError(e);
             return false;
         }
 
@@ -219,7 +220,7 @@ public class KafkaFetchRecords implements Runnable {
             // ensure this is logged so users can see the problem
             LOG.warn("Error creating 
org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(),
                     e);
-            lastError = e;
+            setLastError(e);
             return false;
         }
 
@@ -551,4 +552,7 @@ public class KafkaFetchRecords implements Runnable {
         state = State.RESUME_REQUESTED;
     }
 
+    private synchronized void setLastError(Exception lastError) {
+        this.lastError = lastError;
+    }
 }

Reply via email to