This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a76e9f81d2 Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay"
a76e9f81d2 is described below

commit a76e9f81d2b6f01ec946d9b81aef0a57306451cb
Author: Gary Tully <gary.tu...@gmail.com>
AuthorDate: Thu Jun 22 15:40:45 2023 +0100

    Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay"
    
    This reverts commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e.
---
 .../federation/FederatedQueueConsumerImpl.java     | 36 +++++++++-------------
 1 file changed, 15 insertions(+), 21 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index ca304719cf..d2470e5a6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -208,29 +208,23 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    }
 
    private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
-
-      Runnable runnable = () -> {
-         if (clientConsumer != null) {
-            if (0L == handle.getMessageCount()) {
-               flow(handle.getCreditWindow());
-               pendingPullCredit.set(handle.getCreditWindow());
-            } else {
-               if (0 == delay) {
-                  clientConsumer.resetIfSlowConsumer();
-                  pendingPullCredit.set(0);
+      scheduledExecutorService.schedule(() -> {
+         // use queue executor to sync on message count metric
+         handle.getExecutor().execute(() -> {
+            if (clientConsumer != null) {
+               if (0L == handle.getMessageCount()) {
+                  flow(handle.getCreditWindow());
+                  pendingPullCredit.set(handle.getCreditWindow());
+               } else {
+                  if (0 == delay) {
+                     clientConsumer.resetIfSlowConsumer();
+                     pendingPullCredit.set(0);
+                  }
+                  scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
                }
-               scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
             }
-         }
-      };
-
-      if (delay == 0) { // if delay==0 just use the executor directly
-         handle.getExecutor().execute(runnable);
-      } else {
-         scheduledExecutorService.schedule(() -> {
-            handle.getExecutor().execute(runnable);
-         }, delay, TimeUnit.SECONDS);
-      }
+         });
+      }, delay, TimeUnit.SECONDS);
    }
 
    private void flow(int creditWindow) {

Reply via email to