This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fe206c4c52 ARTEMIS-5741 ensure scaledown exits and stops external components
fe206c4c52 is described below
commit fe206c4c5229cdbfab6242bbaf5bf6e1c4a7adf6
Author: Gary Tully <[email protected]>
AuthorDate: Tue Nov 4 17:44:05 2025 +0000
ARTEMIS-5741 ensure scaledown exits and stops external components
---
.../management/impl/ActiveMQServerControlImpl.java | 2 +-
.../tests/integration/server/ScaleDownTest.java | 47 +++++++++++++++++++---
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 22a877be5c..c5cde06c83 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -4206,7 +4206,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
primaryOnlyPolicy.getScaleDownPolicy().getConnectors().add(0,
connector);
}
- server.fail(true);
+ server.stop(true, true);
}
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 5b1df39646..bdfb88546a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -39,13 +40,16 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import
org.apache.activemq.artemis.core.config.ha.PrimaryOnlyPolicyConfiguration;
+import
org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -61,6 +65,7 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -77,6 +82,7 @@ public class ScaleDownTest extends ClusterTestBase {
private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672";
private boolean useScaleDownGroupName;
+ private final AtomicBoolean shutdownTrueCalledOnExternalComponent = new
AtomicBoolean();
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@@ -113,6 +119,34 @@ public class ScaleDownTest extends ClusterTestBase {
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
+
+ // verify external service component like web gets shutdown on stop
+ shutdownTrueCalledOnExternalComponent.set(false);
+ servers[0].addExternalComponent(new ServiceComponent() {
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void stop() throws Exception {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return false;
+ }
+
+ @Override
+ public void stop(boolean shutdown) throws Exception {
+ shutdownTrueCalledOnExternalComponent.set(shutdown);
+ }
+
+ }, false);
+ }
+
+ @AfterEach
+ public void checkExternalComponentStoppedOnZero() {
+ assertEquals(true, shutdownTrueCalledOnExternalComponent.get());
}
protected boolean isNetty() {
@@ -153,7 +187,8 @@ public class ScaleDownTest extends ClusterTestBase {
assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding)
servers[0].getPostOffice().getBinding(SimpleString.of(queueName2))).getQueue()));
// trigger scaleDown from node 0 to node 1
- servers[0].stop();
+ ActiveMQServerControlImpl serverControl = (ActiveMQServerControlImpl)
servers[0].getManagementService().getResource(ResourceNames.BROKER);
+ serverControl.scaleDown(null);
// get the 2 messages from queue 1
addConsumer(0, 1, queueName1, null);
@@ -204,8 +239,8 @@ public class ScaleDownTest extends ClusterTestBase {
//In normal case server1 will check if the reconnection's scaleDown
//server has been scaled down before granting the connection.
//but if the scaleDown is server1 itself, it should grant
- //the connection without checking scaledown state against it.
- //Otherwise the connection will never be estabilished, and more,
+ //the connection without checking scaledown state against it,
+ //Otherwise the connection will never be established, and more,
//the repetitive reconnect attempts will cause
//ClientSessionFactory's closeExecutor to be filled with
//tasks that keep growing until OOM.
@@ -279,7 +314,7 @@ public class ScaleDownTest extends ClusterTestBase {
send(0, addressName1, TEST_SIZE, false, null);
send(0, addressName2, TEST_SIZE, false, null);
- // add consumers to node 1 to force messages messages to redistribute to
node 2 through the paused sf queue
+ // add consumers to node 1 to force messages to redistribute to node 2
through the paused sf queue
addConsumer(0, 1, queueName1, null);
addConsumer(1, 1, queueName2, null);
@@ -582,7 +617,7 @@ public class ScaleDownTest extends ClusterTestBase {
buffer[i] = getSamplebyte(i);
}
- for (int nmsg = 0; nmsg < 10; nmsg++) {
+ for (int numMessages = 0; numMessages < 10; numMessages++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(buffer);
producer.send(message);
@@ -596,7 +631,7 @@ public class ScaleDownTest extends ClusterTestBase {
ClientConsumer consumer =
addClientConsumer(session.createConsumer(queueName));
session.start();
- for (int nmsg = 0; nmsg < 10; nmsg++) {
+ for (int numMessages = 0; numMessages < 10; numMessages++) {
ClientMessage msg = consumer.receive(250);
assertNotNull(msg);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact