This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new a76e9f81d2 Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay" a76e9f81d2 is described below commit a76e9f81d2b6f01ec946d9b81aef0a57306451cb Author: Gary Tully <gary.tu...@gmail.com> AuthorDate: Thu Jun 22 15:40:45 2023 +0100 Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay" This reverts commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e. --- .../federation/FederatedQueueConsumerImpl.java | 36 +++++++++------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index ca304719cf..d2470e5a6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -208,29 +208,23 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi } private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) { - - Runnable runnable = () -> { - if (clientConsumer != null) { - if (0L == handle.getMessageCount()) { - flow(handle.getCreditWindow()); - pendingPullCredit.set(handle.getCreditWindow()); - } else { - if (0 == delay) { - clientConsumer.resetIfSlowConsumer(); - pendingPullCredit.set(0); + scheduledExecutorService.schedule(() -> { + // use queue executor to sync on message count metric + handle.getExecutor().execute(() -> { + if (clientConsumer != null) { + if (0L == handle.getMessageCount()) { + flow(handle.getCreditWindow()); + pendingPullCredit.set(handle.getCreditWindow()); + } else { + if (0 == delay) { + clientConsumer.resetIfSlowConsumer(); + pendingPullCredit.set(0); + } + scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle); } - scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle); } - } - }; - - if (delay == 0) { // if delay==0 just use the executor directly - handle.getExecutor().execute(runnable); - } else { - scheduledExecutorService.schedule(() -> { - handle.getExecutor().execute(runnable); - }, delay, TimeUnit.SECONDS); - } + }); + }, delay, TimeUnit.SECONDS); } private void flow(int creditWindow) {