This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new f83fbc67234 CAMEL-20705: fix non-atomic operations on volatile fields on the ScheduledPollConsumer f83fbc67234 is described below commit f83fbc67234976285b40e4fe01eda35c96113421 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Apr 23 13:27:10 2024 +0200 CAMEL-20705: fix non-atomic operations on volatile fields on the ScheduledPollConsumer --- .../impl/ScheduledPollConsumerBackoffTest.java | 10 ++-- .../camel/support/ScheduledPollConsumer.java | 70 ++++++++++++---------- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java index 90c455d655d..6e4eb2d4b49 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerBackoffTest.java @@ -45,7 +45,7 @@ public class ScheduledPollConsumerBackoffTest extends ContextTestSupport { consumer.run(); consumer.run(); consumer.run(); - assertEquals(2, commits); + assertEquals(3, commits); // and now we poll again consumer.run(); consumer.run(); @@ -55,9 +55,9 @@ public class ScheduledPollConsumerBackoffTest extends ContextTestSupport { consumer.run(); consumer.run(); consumer.run(); - assertEquals(4, commits); + assertEquals(6, commits); consumer.run(); - assertEquals(5, commits); + assertEquals(6, commits); consumer.stop(); } @@ -78,7 +78,7 @@ public class ScheduledPollConsumerBackoffTest extends ContextTestSupport { consumer.run(); consumer.run(); consumer.run(); - assertEquals(3, errors); + assertEquals(4, errors); // and now we poll again consumer.run(); consumer.run(); @@ -89,7 +89,7 @@ public class ScheduledPollConsumerBackoffTest extends ContextTestSupport { consumer.run(); consumer.run(); consumer.run(); - assertEquals(6, errors); + assertEquals(8, errors); consumer.stop(); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java index 7081fa6c7f5..98e26fb6b26 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.Component; @@ -72,10 +73,10 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer // state during running private volatile boolean polling; - private volatile int backoffCounter; - private volatile long idleCounter; - private volatile long errorCounter; - private volatile long successCounter; + private final AtomicInteger backoffCounter = new AtomicInteger(); + private final AtomicLong idleCounter = new AtomicLong(); + private final AtomicLong errorCounter = new AtomicLong(); + private final AtomicLong successCounter = new AtomicLong(); private volatile Throwable lastError; private volatile Map<String, Object> lastErrorDetails; private final AtomicLong counter = new AtomicLong(); @@ -147,24 +148,25 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer // should we backoff if its enabled, and either the idle or error counter is > the threshold if (backoffMultiplier > 0 // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use - && idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE) - || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { - if (backoffCounter++ < backoffMultiplier) { + && idleCounter.longValue() >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE) + || errorCounter.longValue() >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { + final int currentBackoffCounter = backoffCounter.incrementAndGet(); + if (currentBackoffCounter < backoffMultiplier) { // yes we should backoff - if (idleCounter > 0) { - LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", idleCounter, backoffCounter, - backoffMultiplier); + if (idleCounter.intValue() > 0) { + LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", idleCounter.longValue(), + backoffCounter.intValue(), backoffMultiplier); } else { - LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", errorCounter, backoffCounter, - backoffMultiplier); + LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", errorCounter.intValue(), + backoffCounter.intValue(), backoffMultiplier); } return; } else { // we are finished with backoff so reset counters - idleCounter = 0; - errorCounter = 0; - backoffCounter = 0; - successCounter = 0; + idleCounter.set(0); + errorCounter.set(0); + backoffCounter.set(0); + successCounter.set(0); LOG.trace("doRun() backoff finished, resetting counters."); } } @@ -218,7 +220,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer // clear any error that might be since we have successfully polled, otherwise readiness checks might believe the // consumer to be unhealthy - errorCounter = 0; + errorCounter.set(0); lastError = null; lastErrorDetails = null; @@ -264,9 +266,9 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer } if (cause != null) { - idleCounter = 0; - successCounter = 0; - errorCounter++; + idleCounter.set(0); + successCounter.set(0); + errorCounter.incrementAndGet(); lastError = cause; // enrich last error with http response code if possible if (cause instanceof HttpResponseAware) { @@ -276,9 +278,13 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer } } } else { - idleCounter = polledMessages == 0 ? ++idleCounter : 0; - successCounter++; - errorCounter = 0; + if (polledMessages == 0) { + idleCounter.incrementAndGet(); + } else { + idleCounter.set(0); + } + successCounter.incrementAndGet(); + errorCounter.set(0); lastError = null; lastErrorDetails = null; } @@ -286,8 +292,8 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer // now first pool is done after the poll is complete firstPollDone = true; - LOG.trace("doRun() done with idleCounter={}, successCounter={}, errorCounter={}", idleCounter, successCounter, - errorCounter); + LOG.trace("doRun() done with idleCounter={}, successCounter={}, errorCounter={}", idleCounter.longValue(), + successCounter.longValue(), errorCounter.longValue()); // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread } @@ -406,7 +412,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer } public int getBackoffCounter() { - return backoffCounter; + return backoffCounter.intValue(); } public int getBackoffMultiplier() { @@ -460,7 +466,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer * @see #getSuccessCounter() */ public long getErrorCounter() { - return errorCounter; + return errorCounter.longValue(); } /** @@ -470,7 +476,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer * @see #getErrorCounter() */ public long getSuccessCounter() { - return successCounter; + return successCounter.longValue(); } /** @@ -693,10 +699,10 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer } // clear counters - backoffCounter = 0; - idleCounter = 0; - errorCounter = 0; - successCounter = 0; + backoffCounter.set(0); + idleCounter.set(0); + errorCounter.set(0); + successCounter.set(0); counter.set(0); // clear ready state firstPollDone = false;