[ 
https://issues.apache.org/jira/browse/ARTEMIS-4314?focusedWorklogId=865879&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865879
 ]

ASF GitHub Bot logged work on ARTEMIS-4314:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/23 20:51
            Start Date: 15/Jun/23 20:51
    Worklog Time Spent: 10m 
      Work Description: tabish121 commented on code in PR #4509:
URL: https://github.com/apache/activemq-artemis/pull/4509#discussion_r1231516909


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -155,6 +167,67 @@ private synchronized void connect() throws Exception {
       }
    }
 
+   interface QueueHandle {
+      long getMessageCount();
+      int getCreditWindow();
+   }
+
+   private QueueHandle createQueueHandle(ActiveMQServer server, ClientSession.QueueQuery queryResult) {
+      final Queue queue = server.locateQueue(queryResult.getName());
+      int creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+
+      final Integer defaultConsumerWindowSize = queryResult.getDefaultConsumerWindowSize();
+      if (defaultConsumerWindowSize != null) {
+         creditWindow = defaultConsumerWindowSize.intValue();
+         if (creditWindow <= 0) {
+            creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+            logger.trace("{} override non positive queue consumerWindowSize with {}.", this, creditWindow);
+         }
+      }
+
+      final int finalCreditWindow = creditWindow;
+      return new QueueHandle() {
+         @Override
+         public long getMessageCount() {
+            return queue.getMessageCountForRing();
+         }
+
+         @Override
+         public int getCreditWindow() {
+            return finalCreditWindow;
+         }
+      };
+   }
+
+   private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
+      if (handle != null) {

Review Comment:
   Personally, I'd treat a null handle passed here as a terminal error and 
throw, since asking to schedule for credit without actually scheduling anything 
to replenish that credit is likely to break something.
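
A minimal sketch of the fail-fast behaviour suggested above, not the code from the PR. The `QueueHandle` interface mirrors the one in the diff; the `CreditScheduler` class, its counter, and the exception message are hypothetical names used only for illustration, and the real scheduling logic is replaced by a simple counter.

```java
import java.util.Objects;

// Stand-in for the QueueHandle interface shown in the diff.
interface QueueHandle {
   long getMessageCount();
   int getCreditWindow();
}

// Hypothetical holder class; the real method lives in FederatedQueueConsumerImpl.
final class CreditScheduler {
   // Counts accepted scheduling requests (illustration only, not real scheduling).
   private int scheduledCount;

   void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
      // Fail fast: with a null handle nothing would ever replenish credit,
      // so treat it as a programming error instead of silently returning.
      Objects.requireNonNull(handle, "handle must not be null when scheduling credit");
      scheduledCount++;
   }

   int getScheduledCount() {
      return scheduledCount;
   }
}
```

With this shape, a caller that passes `null` gets a `NullPointerException` at the call site rather than a federation consumer that quietly stops receiving credit.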





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865879)
    Time Spent: 40m  (was: 0.5h)

> Federation, support consumerWindowSize zero and federate in batches only when 
> the local queue has excess capacity
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4314
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4314
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: Federation
>    Affects Versions: 2.28.0
>            Reporter: Gary Tully
>            Assignee: Gary Tully
>            Priority: Major
>             Fix For: 2.29.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Dual queue federation, where clusters federate in both directions, can suffer 
> from message flip-flopping once the priority adjustment kicks in.
> If there is a large backlog, the lower-priority federation consumer comes into 
> play once all of the local consumer credit is exhausted, and the backlog can 
> drain to the other cluster.
> If demand is low there, the process can repeat. Limiting the rate of the 
> federation consumer can help, but it is not ideal because when there is no 
> local demand, we want a high rate of migration.
>  
> A possible solution is to have the federation consumer manage its own credit 
> and only flow messages when the local queue has capacity: flow a batch of 
> messages, then wait until the local queue has capacity again. In this way, 
> there is no thundering herd effect, but messages still migrate quickly once 
> there is demand.
> consumerWindowSize=0 is already in play for consumer.receive calls, and 
> there is already a defaultConsumerWindowSize for an address. These can be 
> combined to realise batchFederationOnCapacity semantics.
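
The batch-on-capacity idea described above can be sketched roughly as follows. This is a simplified in-memory model under stated assumptions, not the Artemis implementation: the class `BatchOnCapacitySketch` and all of its methods are hypothetical, "capacity" is taken to mean that the local queue is empty, and the credit window is counted in messages to keep the example small.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model: one remote backlog, one local queue, one credit window.
final class BatchOnCapacitySketch {
   private final Queue<String> remote = new ArrayDeque<>();
   private final Queue<String> local = new ArrayDeque<>();
   private final int creditWindow; // batch size, in messages (assumption)

   BatchOnCapacitySketch(final int creditWindow) {
      if (creditWindow <= 0) {
         throw new IllegalArgumentException("creditWindow must be positive");
      }
      this.creditWindow = creditWindow;
   }

   void addRemote(final String message) {
      remote.add(message);
   }

   // Simulates a local consumer taking one message; returns null when empty.
   String consumeLocal() {
      return local.poll();
   }

   int remoteDepth() {
      return remote.size();
   }

   // One check of the local queue: grant a single batch of credit only when
   // the local queue has drained, otherwise move nothing and wait.
   int pollOnce() {
      if (!local.isEmpty()) {
         return 0; // no capacity yet, so no credit is granted
      }
      int moved = 0;
      while (moved < creditWindow && !remote.isEmpty()) {
         local.add(remote.poll());
         moved++;
      }
      return moved;
   }
}
```

In this model the backlog moves at most one window at a time and only after local consumers have drained the previous batch, which is the property intended to avoid flip-flopping while still allowing fast migration when there is local demand.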



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
