This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f83fbc67234 CAMEL-20705: fix non-atomic operations on volatile fields 
on the ScheduledPollConsumer
f83fbc67234 is described below

commit f83fbc67234976285b40e4fe01eda35c96113421
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Apr 23 13:27:10 2024 +0200

    CAMEL-20705: fix non-atomic operations on volatile fields on the 
ScheduledPollConsumer
---
 .../impl/ScheduledPollConsumerBackoffTest.java     | 10 ++--
 .../camel/support/ScheduledPollConsumer.java       | 70 ++++++++++++----------
 2 files changed, 43 insertions(+), 37 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
index 90c455d655d..6e4eb2d4b49 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java
@@ -45,7 +45,7 @@ public class ScheduledPollConsumerBackoffTest extends 
ContextTestSupport {
         consumer.run();
         consumer.run();
         consumer.run();
-        assertEquals(2, commits);
+        assertEquals(3, commits);
         // and now we poll again
         consumer.run();
         consumer.run();
@@ -55,9 +55,9 @@ public class ScheduledPollConsumerBackoffTest extends 
ContextTestSupport {
         consumer.run();
         consumer.run();
         consumer.run();
-        assertEquals(4, commits);
+        assertEquals(6, commits);
         consumer.run();
-        assertEquals(5, commits);
+        assertEquals(6, commits);
 
         consumer.stop();
     }
@@ -78,7 +78,7 @@ public class ScheduledPollConsumerBackoffTest extends 
ContextTestSupport {
         consumer.run();
         consumer.run();
         consumer.run();
-        assertEquals(3, errors);
+        assertEquals(4, errors);
         // and now we poll again
         consumer.run();
         consumer.run();
@@ -89,7 +89,7 @@ public class ScheduledPollConsumerBackoffTest extends 
ContextTestSupport {
         consumer.run();
         consumer.run();
         consumer.run();
-        assertEquals(6, errors);
+        assertEquals(8, errors);
 
         consumer.stop();
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 7081fa6c7f5..98e26fb6b26 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Component;
@@ -72,10 +73,10 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
 
     // state during running
     private volatile boolean polling;
-    private volatile int backoffCounter;
-    private volatile long idleCounter;
-    private volatile long errorCounter;
-    private volatile long successCounter;
+    private final AtomicInteger backoffCounter = new AtomicInteger();
+    private final AtomicLong idleCounter = new AtomicLong();
+    private final AtomicLong errorCounter = new AtomicLong();
+    private final AtomicLong successCounter = new AtomicLong();
     private volatile Throwable lastError;
     private volatile Map<String, Object> lastErrorDetails;
     private final AtomicLong counter = new AtomicLong();
@@ -147,24 +148,25 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
         // should we backoff if its enabled, and either the idle or error 
counter is > the threshold
         if (backoffMultiplier > 0
                 // either idle or error threshold could be not in use, so 
check for that and use MAX_VALUE if not in use
-                && idleCounter >= (backoffIdleThreshold > 0 ? 
backoffIdleThreshold : Integer.MAX_VALUE)
-                || errorCounter >= (backoffErrorThreshold > 0 ? 
backoffErrorThreshold : Integer.MAX_VALUE)) {
-            if (backoffCounter++ < backoffMultiplier) {
+                && idleCounter.longValue() >= (backoffIdleThreshold > 0 ? 
backoffIdleThreshold : Integer.MAX_VALUE)
+                || errorCounter.longValue() >= (backoffErrorThreshold > 0 ? 
backoffErrorThreshold : Integer.MAX_VALUE)) {
+            final int currentBackoffCounter = backoffCounter.incrementAndGet();
+            if (currentBackoffCounter < backoffMultiplier) {
                 // yes we should backoff
-                if (idleCounter > 0) {
-                    LOG.debug("doRun() backoff due subsequent {} idles 
(backoff at {}/{})", idleCounter, backoffCounter,
-                            backoffMultiplier);
+                if (idleCounter.intValue() > 0) {
+                    LOG.debug("doRun() backoff due subsequent {} idles 
(backoff at {}/{})", idleCounter.longValue(),
+                            backoffCounter.intValue(), backoffMultiplier);
                 } else {
-                    LOG.debug("doRun() backoff due subsequent {} errors 
(backoff at {}/{})", errorCounter, backoffCounter,
-                            backoffMultiplier);
+                    LOG.debug("doRun() backoff due subsequent {} errors 
(backoff at {}/{})", errorCounter.intValue(),
+                            backoffCounter.intValue(), backoffMultiplier);
                 }
                 return;
             } else {
                 // we are finished with backoff so reset counters
-                idleCounter = 0;
-                errorCounter = 0;
-                backoffCounter = 0;
-                successCounter = 0;
+                idleCounter.set(0);
+                errorCounter.set(0);
+                backoffCounter.set(0);
+                successCounter.set(0);
                 LOG.trace("doRun() backoff finished, resetting counters.");
             }
         }
@@ -218,7 +220,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
 
                                 // clear any error that might be since we have 
successfully polled, otherwise readiness checks might believe the
                                 // consumer to be unhealthy
-                                errorCounter = 0;
+                                errorCounter.set(0);
                                 lastError = null;
                                 lastErrorDetails = null;
 
@@ -264,9 +266,9 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
         }
 
         if (cause != null) {
-            idleCounter = 0;
-            successCounter = 0;
-            errorCounter++;
+            idleCounter.set(0);
+            successCounter.set(0);
+            errorCounter.incrementAndGet();
             lastError = cause;
             // enrich last error with http response code if possible
             if (cause instanceof HttpResponseAware) {
@@ -276,9 +278,13 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
                 }
             }
         } else {
-            idleCounter = polledMessages == 0 ? ++idleCounter : 0;
-            successCounter++;
-            errorCounter = 0;
+            if (polledMessages == 0) {
+                idleCounter.incrementAndGet();
+            } else {
+                idleCounter.set(0);
+            }
+            successCounter.incrementAndGet();
+            errorCounter.set(0);
             lastError = null;
             lastErrorDetails = null;
         }
@@ -286,8 +292,8 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
         // now first pool is done after the poll is complete
         firstPollDone = true;
 
-        LOG.trace("doRun() done with idleCounter={}, successCounter={}, 
errorCounter={}", idleCounter, successCounter,
-                errorCounter);
+        LOG.trace("doRun() done with idleCounter={}, successCounter={}, 
errorCounter={}", idleCounter.longValue(),
+                successCounter.longValue(), errorCounter.longValue());
 
         // avoid this thread to throw exceptions because the thread pool wont 
re-schedule a new thread
     }
@@ -406,7 +412,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
     }
 
     public int getBackoffCounter() {
-        return backoffCounter;
+        return backoffCounter.intValue();
     }
 
     public int getBackoffMultiplier() {
@@ -460,7 +466,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
      * @see #getSuccessCounter()
      */
     public long getErrorCounter() {
-        return errorCounter;
+        return errorCounter.longValue();
     }
 
     /**
@@ -470,7 +476,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
      * @see #getErrorCounter()
      */
     public long getSuccessCounter() {
-        return successCounter;
+        return successCounter.longValue();
     }
 
     /**
@@ -693,10 +699,10 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
         }
 
         // clear counters
-        backoffCounter = 0;
-        idleCounter = 0;
-        errorCounter = 0;
-        successCounter = 0;
+        backoffCounter.set(0);
+        idleCounter.set(0);
+        errorCounter.set(0);
+        successCounter.set(0);
         counter.set(0);
         // clear ready state
         firstPollDone = false;

Reply via email to