This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bf1193380 ARTEMIS-4003 Fixing credit starve on Large Message over the 
bridge or clustering
7bf1193380 is described below

commit 7bf1193380c8b15d8f5d0f14bd028e2380b104d6
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Sep 19 09:50:16 2022 -0400

    ARTEMIS-4003 Fixing credit starve on Large Message over the bridge or 
clustering
---
 .../client/impl/AbstractProducerCreditsImpl.java   |  36 ++++++--
 .../impl/AsynchronousProducerCreditsImpl.java      |  22 +++++
 tests/smoke-tests/pom.xml                          |  57 ++++++++++++
 .../ClusteredLargeMessageTest.java                 | 100 +++++++++++++++++++++
 4 files changed, 209 insertions(+), 6 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index 25b2d97c23..8b6b91f3dc 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -21,12 +21,15 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+import org.jboss.logging.Logger;
 
 public abstract class AbstractProducerCreditsImpl implements 
ClientProducerCredits {
 
+   private static final Logger logger = 
Logger.getLogger(AbstractProducerCreditsImpl.class);
+
    protected int pendingCredits;
 
-   private final int windowSize;
+   protected final int windowSize;
 
    protected volatile boolean closed;
 
@@ -81,7 +84,9 @@ public abstract class AbstractProducerCreditsImpl implements 
ClientProducerCredi
    }
 
    protected void afterAcquired(int credits) throws 
ActiveMQAddressFullException {
-      // check to see if the blocking mode is FAIL on the server
+      if (logger.isDebugEnabled()) {
+         logger.debugf("AfterAcquire %s credits on address %s", credits, 
address);
+      }
       synchronized (this) {
          pendingCredits -= credits;
       }
@@ -112,6 +117,7 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
 
    @Override
    public synchronized void reset() {
+      logger.debugf("reset credits on address %s", address);
       // Any pendingCredits credits from before failover won't arrive, so we 
re-initialise
 
       int beforeFailure = pendingCredits;
@@ -144,6 +150,9 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
 
    protected void checkCredits(final int credits) {
       int needed = Math.max(credits, windowSize);
+      if (logger.isTraceEnabled()) {
+         logger.tracef("CheckCredits %s on address %s, needed=%s, credits=%s, 
window=%s", credits, address, needed, credits, windowSize);
+      }
 
       int toRequest = -1;
 
@@ -151,17 +160,32 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
          if (getBalance() + arriving < needed) {
             toRequest = needed - arriving;
 
-            pendingCredits += toRequest;
-            arriving += toRequest;
+            if (logger.isTraceEnabled()) {
+               logger.tracef("CheckCredits on Address %s, requesting=%s, 
arriving=%s, balance=%s", address, toRequest, arriving, getBalance());
+            }
+         } else {
+            if (logger.isTraceEnabled()) {
+               logger.tracef("CheckCredits did not need it, balance=%s, 
arriving=%s,  needed=%s, getbalance + arriving < needed=%s", getBalance(), 
arriving, needed, (boolean)(getBalance() + arriving < needed));
+            }
          }
       }
 
-      if (toRequest != -1) {
+      if (toRequest > 0) {
+         if (logger.isDebugEnabled()) {
+            logger.debugf("Requesting %s credits on address %s, needed = %s, 
arriving = %s", toRequest, address, needed, arriving);
+         }
          requestCredits(toRequest);
+      } else {
+         logger.debugf("not asking for %s credits on %s", toRequest, address);
       }
    }
 
-   private void requestCredits(final int credits) {
+   protected void requestCredits(final int credits) {
+      logger.debugf("Request %s credits on address %s", credits, address);
+      synchronized (this) {
+         pendingCredits += credits;
+         arriving += credits;
+      }
       session.sendProducerCreditsMessage(credits, address);
    }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
index a49122f1e7..7cdeb41e0d 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -18,9 +18,12 @@
 package org.apache.activemq.artemis.core.client.impl;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.jboss.logging.Logger;
 
 public class AsynchronousProducerCreditsImpl extends 
AbstractProducerCreditsImpl {
 
+   private static final Logger logger = 
Logger.getLogger(AsynchronousProducerCreditsImpl.class);
+
    int balance;
 
    final ClientProducerFlowCallback callback;
@@ -36,6 +39,9 @@ public class AsynchronousProducerCreditsImpl extends 
AbstractProducerCreditsImpl
    protected synchronized void actualAcquire(int credits) {
       synchronized (this) {
          balance -= credits;
+         if (logger.isDebugEnabled()) {
+            logger.debugf("actualAcquire on address %s with credits=%s, 
balance=%s, callbackType=%s", address, credits, balance, callback.getClass());
+         }
          if (balance <= 0) {
             callback.onCreditsFlow(true, this);
          }
@@ -53,7 +59,17 @@ public class AsynchronousProducerCreditsImpl extends 
AbstractProducerCreditsImpl
       synchronized (this) {
          super.receiveCredits(credits);
          balance += credits;
+         if (logger.isDebugEnabled()) {
+            logger.debugf("receiveCredits with credits=%s, balance=%s, 
arriving=%s, callbackType=%s", credits, balance, arriving, callback.getClass());
+         }
          callback.onCreditsFlow(balance <= 0, this);
+
+         if (balance < 0 && arriving == 0) {
+            // No more credits are arriving and the balance is still negative: an 
async large message send asked for too much, so we need to counterbalance by requesting more credits
+            logger.debugf("Starve credits counter balance");
+            int request = -balance + windowSize * 2;
+            requestCredits(request);
+         }
       }
 
    }
@@ -62,6 +78,9 @@ public class AsynchronousProducerCreditsImpl extends 
AbstractProducerCreditsImpl
    @Override
    public void receiveFailCredits(final int credits) {
       super.receiveFailCredits(credits);
+      if (logger.isDebugEnabled()) {
+         logger.debugf("creditsFail %s, callback=%s", credits, 
callback.getClass());
+      }
       callback.onCreditsFail(this);
    }
 
@@ -70,6 +89,9 @@ public class AsynchronousProducerCreditsImpl extends 
AbstractProducerCreditsImpl
       synchronized (this) {
          balance = 0;
          callback.onCreditsFlow(true, this);
+         if (logger.isDebugEnabled()) {
+            logger.debugf("releaseOutstanding credits, balance=%s, 
callback=%s", balance, callback.getClass());
+         }
       }
 
    }
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 408822df28..5b9cf150ae 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1132,6 +1132,63 @@
                      </args>
                   </configuration>
                </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-clustered-large-message1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/clusteredLargeMessage/cluster1</instance>
+                     
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster1</configuration>
+                     <args>
+                        <arg>--name</arg>
+                        <arg>cluster1</arg>
+                        <arg>--clustered</arg>
+                        <arg>--staticCluster</arg>
+                        <arg>tcp://localhost:61716</arg>
+                        <arg>--max-hops</arg>
+                        <arg>1</arg>
+                        <arg>--queues</arg>
+                        <arg>testQueue</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-cluster-large-message2</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/clusteredLargeMessage/cluster2</instance>
+                     
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster2</configuration>
+                     <args>
+                        <arg>--name</arg>
+                        <arg>cluster2</arg>
+                        <arg>--clustered</arg>
+                        <arg>--staticCluster</arg>
+                        <arg>tcp://localhost:61616</arg>
+                        <arg>--max-hops</arg>
+                        <arg>1</arg>
+                        <arg>--port-offset</arg>
+                        <arg>100</arg>
+                        <arg>--queues</arg>
+                        <arg>testQueue</arg>
+                     </args>
+                  </configuration>
+               </execution>
+
                <execution>
                   <phase>test-compile</phase>
                   <id>create-create-nettynative</id>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
new file mode 100644
index 0000000000..eb6bece11f
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.clusteredLargeMessage;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClusteredLargeMessageTest extends SmokeTestBase {
+
+   public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
+   public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
+
+   Process server1Process;
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      cleanupData(SERVER_NAME_1);
+      server1Process = startServer(SERVER_NAME_0, 0, 30000);
+      startServer(SERVER_NAME_1, 100, 30000);
+   }
+
+   @Test
+   public void testLargeMessage() throws Exception {
+
+      // All three protocols are exercised from this one test so that they share a single 
server start,
+      // without having to deal with beforeClass and afterClass on this test
+      internalTestLargeMessge("CORE");
+      internalTestLargeMessge("AMQP");
+      internalTestLargeMessge("OPENWIRE");
+   }
+
+   private void internalTestLargeMessge(String protocol) throws Exception {
+
+      ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61716");
+      Connection connection2 = server2CF.createConnection();
+      Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue2 = session2.createQueue("testQueue");
+      MessageConsumer consumer2 = session2.createConsumer(queue2);
+      connection2.start();
+
+      ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      Connection connection1 = server1CF.createConnection();
+      Session session1 = connection1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue1 = session1.createQueue("testQueue");
+      MessageProducer producer1 = session1.createProducer(queue1);
+
+      String largeBody;
+
+      {
+         StringBuffer largeBodyBuffer = new StringBuffer();
+         while (largeBodyBuffer.length() < 2_000_000) {
+            largeBodyBuffer.append("This is large ");
+         }
+         largeBody = largeBodyBuffer.toString();
+      }
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session1.createTextMessage(largeBody);
+         message.setStringProperty("i", Integer.toString(i));
+         producer1.send(message);
+      }
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = (TextMessage) consumer2.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(largeBody, message.getText());
+      }
+
+      connection1.close();
+      connection2.close();
+   }
+}
+

Reply via email to