This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7bf1193380 ARTEMIS-4003 Fixing credit starve on Large Message over the
bridge or clustering
7bf1193380 is described below
commit 7bf1193380c8b15d8f5d0f14bd028e2380b104d6
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Sep 19 09:50:16 2022 -0400
ARTEMIS-4003 Fixing credit starve on Large Message over the bridge or
clustering
---
.../client/impl/AbstractProducerCreditsImpl.java | 36 ++++++--
.../impl/AsynchronousProducerCreditsImpl.java | 22 +++++
tests/smoke-tests/pom.xml | 57 ++++++++++++
.../ClusteredLargeMessageTest.java | 100 +++++++++++++++++++++
4 files changed, 209 insertions(+), 6 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index 25b2d97c23..8b6b91f3dc 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -21,12 +21,15 @@ import
org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+import org.jboss.logging.Logger;
public abstract class AbstractProducerCreditsImpl implements
ClientProducerCredits {
+ private static final Logger logger =
Logger.getLogger(AbstractProducerCreditsImpl.class);
+
protected int pendingCredits;
- private final int windowSize;
+ protected final int windowSize;
protected volatile boolean closed;
@@ -81,7 +84,9 @@ public abstract class AbstractProducerCreditsImpl implements
ClientProducerCredi
}
protected void afterAcquired(int credits) throws
ActiveMQAddressFullException {
- // check to see if the blocking mode is FAIL on the server
+ if (logger.isDebugEnabled()) {
+ logger.debugf("AfterAcquire %s credits on address %s", credits,
address);
+ }
synchronized (this) {
pendingCredits -= credits;
}
@@ -112,6 +117,7 @@ public abstract class AbstractProducerCreditsImpl
implements ClientProducerCredi
@Override
public synchronized void reset() {
+ logger.debugf("reset credits on address %s", address);
// Any pendingCredits credits from before failover won't arrive, so we
re-initialise
int beforeFailure = pendingCredits;
@@ -144,6 +150,9 @@ public abstract class AbstractProducerCreditsImpl
implements ClientProducerCredi
protected void checkCredits(final int credits) {
int needed = Math.max(credits, windowSize);
+ if (logger.isTraceEnabled()) {
+ logger.tracef("CheckCredits %s on address %s, needed=%s, credits=%s,
window=%s", credits, address, needed, credits, windowSize);
+ }
int toRequest = -1;
@@ -151,17 +160,32 @@ public abstract class AbstractProducerCreditsImpl
implements ClientProducerCredi
if (getBalance() + arriving < needed) {
toRequest = needed - arriving;
- pendingCredits += toRequest;
- arriving += toRequest;
+ if (logger.isTraceEnabled()) {
+ logger.tracef("CheckCredits on Address %s, requesting=%s,
arriving=%s, balance=%s", address, toRequest, arriving, getBalance());
+ }
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("CheckCredits did not need it, balance=%s,
arriving=%s, needed=%s, getbalance + arriving < needed=%s", getBalance(),
arriving, needed, (boolean)(getBalance() + arriving < needed));
+ }
}
}
- if (toRequest != -1) {
+ if (toRequest > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debugf("Requesting %s credits on address %s, needed = %s,
arriving = %s", toRequest, address, needed, arriving);
+ }
requestCredits(toRequest);
+ } else {
+ logger.debugf("not asking for %s credits on %s", toRequest, address);
}
}
- private void requestCredits(final int credits) {
+ protected void requestCredits(final int credits) {
+ logger.debugf("Request %s credits on address %s", credits, address);
+ synchronized (this) {
+ pendingCredits += credits;
+ arriving += credits;
+ }
session.sendProducerCreditsMessage(credits, address);
}
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
index a49122f1e7..7cdeb41e0d 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -18,9 +18,12 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.jboss.logging.Logger;
public class AsynchronousProducerCreditsImpl extends
AbstractProducerCreditsImpl {
+ private static final Logger logger =
Logger.getLogger(AsynchronousProducerCreditsImpl.class);
+
int balance;
final ClientProducerFlowCallback callback;
@@ -36,6 +39,9 @@ public class AsynchronousProducerCreditsImpl extends
AbstractProducerCreditsImpl
protected synchronized void actualAcquire(int credits) {
synchronized (this) {
balance -= credits;
+ if (logger.isDebugEnabled()) {
+ logger.debugf("actualAcquire on address %s with credits=%s,
balance=%s, callbackType=%s", address, credits, balance, callback.getClass());
+ }
if (balance <= 0) {
callback.onCreditsFlow(true, this);
}
@@ -53,7 +59,17 @@ public class AsynchronousProducerCreditsImpl extends
AbstractProducerCreditsImpl
synchronized (this) {
super.receiveCredits(credits);
balance += credits;
+ if (logger.isDebugEnabled()) {
+ logger.debugf("receiveCredits with credits=%s, balance=%s,
arriving=%s, callbackType=%s", credits, balance, arriving, callback.getClass());
+ }
callback.onCreditsFlow(balance <= 0, this);
+
+ if (balance < 0 && arriving == 0) {
+ // there are no more credits arriving and we are still negative,
async large message send asked too much and we need to counter balance
+ logger.debugf("Starve credits counter balance");
+ int request = -balance + windowSize * 2;
+ requestCredits(request);
+ }
}
}
@@ -62,6 +78,9 @@ public class AsynchronousProducerCreditsImpl extends
AbstractProducerCreditsImpl
@Override
public void receiveFailCredits(final int credits) {
super.receiveFailCredits(credits);
+ if (logger.isDebugEnabled()) {
+ logger.debugf("creditsFail %s, callback=%s", credits,
callback.getClass());
+ }
callback.onCreditsFail(this);
}
@@ -70,6 +89,9 @@ public class AsynchronousProducerCreditsImpl extends
AbstractProducerCreditsImpl
synchronized (this) {
balance = 0;
callback.onCreditsFlow(true, this);
+ if (logger.isDebugEnabled()) {
+ logger.debugf("releaseOutstanding credits, balance=%s,
callback=%s", balance, callback.getClass());
+ }
}
}
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 408822df28..5b9cf150ae 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1132,6 +1132,63 @@
</args>
</configuration>
</execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-clustered-large-message1</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/clusteredLargeMessage/cluster1</instance>
+
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster1</configuration>
+ <args>
+ <arg>--name</arg>
+ <arg>cluster1</arg>
+ <arg>--clustered</arg>
+ <arg>--staticCluster</arg>
+ <arg>tcp://localhost:61716</arg>
+ <arg>--max-hops</arg>
+ <arg>1</arg>
+ <arg>--queues</arg>
+ <arg>testQueue</arg>
+ </args>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-cluster-large-message2</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/clusteredLargeMessage/cluster2</instance>
+
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster2</configuration>
+ <args>
+ <arg>--name</arg>
+ <arg>cluster2</arg>
+ <arg>--clustered</arg>
+ <arg>--staticCluster</arg>
+ <arg>tcp://localhost:61616</arg>
+ <arg>--max-hops</arg>
+ <arg>1</arg>
+ <arg>--port-offset</arg>
+ <arg>100</arg>
+ <arg>--queues</arg>
+ <arg>testQueue</arg>
+ </args>
+ </configuration>
+ </execution>
+
<execution>
<phase>test-compile</phase>
<id>create-create-nettynative</id>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
new file mode 100644
index 0000000000..eb6bece11f
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.clusteredLargeMessage;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClusteredLargeMessageTest extends SmokeTestBase {
+
+ public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
+ public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
+
+ Process server1Process;
+
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ cleanupData(SERVER_NAME_1);
+ server1Process = startServer(SERVER_NAME_0, 0, 30000);
+ startServer(SERVER_NAME_1, 100, 30000);
+ }
+
+ @Test
+ public void testLargeMessage() throws Exception {
+
+ // I'm calling all 3 here as I want to run all of these with a single
server start
+ // without having to deal with beforeClass and afterClass on this test
+ internalTestLargeMessge("CORE");
+ internalTestLargeMessge("AMQP");
+ internalTestLargeMessge("OPENWIRE");
+ }
+
+ private void internalTestLargeMessge(String protocol) throws Exception {
+
+ ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61716");
+ Connection connection2 = server2CF.createConnection();
+ Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue2 = session2.createQueue("testQueue");
+ MessageConsumer consumer2 = session2.createConsumer(queue2);
+ connection2.start();
+
+ ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+ Connection connection1 = server1CF.createConnection();
+ Session session1 = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue1 = session1.createQueue("testQueue");
+ MessageProducer producer1 = session1.createProducer(queue1);
+
+ String largeBody;
+
+ {
+ StringBuffer largeBodyBuffer = new StringBuffer();
+ while (largeBodyBuffer.length() < 2_000_000) {
+ largeBodyBuffer.append("This is large ");
+ }
+ largeBody = largeBodyBuffer.toString();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = session1.createTextMessage(largeBody);
+ message.setStringProperty("i", Integer.toString(i));
+ producer1.send(message);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = (TextMessage) consumer2.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(largeBody, message.getText());
+ }
+
+ connection1.close();
+ connection2.close();
+ }
+}
+