This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d8bb561dad ARTEMIS-5895 OpenWire TX Commit shouldn't send Success when
in doubt
d8bb561dad is described below
commit d8bb561dad4f2197daf2e84216f158787dbdd6e9
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Feb 23 14:47:37 2026 -0500
ARTEMIS-5895 OpenWire TX Commit shouldn't send Success when in doubt
---
.../artemis/cli/commands/helper/HelperCreate.java | 15 +-
.../core/protocol/openwire/OpenWireConnection.java | 15 +-
.../OpenWireSharedStoreFailoverSmokeTest.java | 271 +++++++++++++++++++++
3 files changed, 289 insertions(+), 12 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
index 3bd744fb06..d0caf13bde 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
@@ -78,6 +78,8 @@ public class HelperCreate extends HelperBase {
String dataFolder = "./data";
+ private boolean useAIO = "Linux".equals(System.getProperty("os.name"));
+
private boolean failoverOnShutdown = false;
private boolean noAutoTune = true;
@@ -93,6 +95,15 @@ public class HelperCreate extends HelperBase {
return this;
}
+ public boolean isUseAIO() {
+ return useAIO;
+ }
+
+ public HelperCreate setUseAIO(boolean useAIO) {
+ this.useAIO = useAIO;
+ return this;
+ }
+
public boolean isNoWeb() {
return noWeb;
}
@@ -339,8 +350,10 @@ public class HelperCreate extends HelperBase {
add(listCommands, "--verbose");
- if ("Linux".equals(System.getProperty("os.name"))) {
+ if (useAIO) {
add(listCommands, "--aio");
+ } else {
+ add(listCommands, "--nio");
}
for (String str : args) {
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3d4ab9757e..15415c68b3 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -736,17 +736,10 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
destroyed = true;
- //before closing transport, sendCommand the last response if any
- Command command = context.getLastCommand();
- if (command != null && command.isResponseRequired()) {
- Response lastResponse = new Response();
- lastResponse.setCorrelationId(command.getCommandId());
- try {
- dispatchSync(lastResponse);
- } catch (Throwable e) {
- logger.warn(e.getMessage(), e);
- }
- }
+ // In a previous version we used to send the last command's response
here,
+ // however if that command is a TX commit, we would confirm something we
can't be sure has completed,
+ // so it's better to leave it in doubt and let the client fail than to
send a false success response.
+
if (fail) {
shutdown(fail);
}
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/failover/OpenWireSharedStoreFailoverSmokeTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/failover/OpenWireSharedStoreFailoverSmokeTest.java
new file mode 100644
index 0000000000..3a8d79960d
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/failover/OpenWireSharedStoreFailoverSmokeTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.failover;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenWireSharedStoreFailoverSmokeTest extends SmokeTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String SERVER_NAME_LIVE = "openwire-failover-live";
+ public static final String SERVER_NAME_BACKUP = "openwire-failover-backup";
+
+ private static final String QUEUE_NAME = "FailoverTestQueue";
+ private static final int NUMBER_OF_MESSAGES = 10000;
+ private static final int PRODUCERS = 50;
+ private static final int FAILOVER_AT_MESSAGE = 1000;
+
+ private static String sharedDataPath;
+ private Process liveServer;
+ private Process backupServer;
+
+ @BeforeAll
+ public static void createServers() throws Exception {
+ // Set up shared storage path
+ File sharedStorage = new File(getFileServerLocation(SERVER_NAME_LIVE),
"shared-storage");
+ sharedDataPath = sharedStorage.getAbsolutePath();
+
+ createLiveServer();
+ createBackupServer();
+ sharedStorage.mkdirs();
+ }
+
+ private static void createLiveServer() throws Exception {
+ File serverLocation = getFileServerLocation(SERVER_NAME_LIVE);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = helperCreate();
+
cliCreateServer.setUseAIO(false).setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation).setSharedStore(true).setClustered(true).setStaticCluster("tcp://localhost:61617").setDataFolder(sharedDataPath).setFailoverOnShutdown(true).setMessageLoadBalancing("OFF");
+
+ cliCreateServer.createServer();
+ }
+
+ private static void createBackupServer() throws Exception {
+ File serverLocation = getFileServerLocation(SERVER_NAME_BACKUP);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = helperCreate();
+
cliCreateServer.setUseAIO(false).setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation).setSharedStore(true).setBackup(true).setClustered(true).setStaticCluster("tcp://localhost:61616").setPortOffset(1).setDataFolder(sharedDataPath).setMessageLoadBalancing("OFF");
+ cliCreateServer.createServer();
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_LIVE);
+ cleanupData(SERVER_NAME_BACKUP);
+
+ // Clean shared storage
+ File sharedStorage = new File(sharedDataPath);
+ deleteDirectory(sharedStorage);
+ sharedStorage.mkdirs();
+
+ disableCheckThread();
+
+ liveServer = startServer(SERVER_NAME_LIVE, 0, 0);
+ assertTrue(ServerUtil.waitForServerToStartOnPort(61616, null, null,
30000));
+
+ backupServer = startServer(SERVER_NAME_BACKUP, 0, 0);
+ }
+
+ @AfterEach
+ @Override
+ public void after() throws Exception {
+ super.after();
+ }
+
+ @Test
+ public void testOpenWire() throws Exception {
+ String failoverURL =
"failover:(tcp://localhost:61616,tcp://localhost:61617)";
+ ConnectionFactory factory = CFUtil.createConnectionFactory("OPENWIRE",
failoverURL);
+
+ AtomicInteger errors = new AtomicInteger(0);
+ CountDownLatch producersLatch = new CountDownLatch(PRODUCERS);
+ CountDownLatch failoverLatch = new CountDownLatch(FAILOVER_AT_MESSAGE);
+
+ ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS);
+ runAfter(executor::shutdownNow);
+
+ CyclicBarrier startFlag = new CyclicBarrier(PRODUCERS);
+
+ ConcurrentHashSet<String> duplicateIDs = new ConcurrentHashSet<>();
+
+ for (int producerID = 0; producerID < PRODUCERS; producerID++) {
+ final int theProducerID = producerID;
+ executor.execute(() -> {
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ javax.jms.Queue queue = session.createQueue(QUEUE_NAME);
+ javax.jms.MessageProducer producer =
session.createProducer(queue);
+
+ int messagesPerProducer = NUMBER_OF_MESSAGES / PRODUCERS;
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ int totalSent = 0;
+
+ for (int i = 0; i < messagesPerProducer; i++) {
+ boolean messageCommitted = false;
+ int retryCount = 0;
+ final int maxRetries = 100;
+
+ String duplicateId = generateDuplicateID(theProducerID, i);
+
+ while (!messageCommitted && retryCount < maxRetries) {
+ try {
+ TextMessage message =
session.createTextMessage("Message from producer " + theProducerID + ",
sequence " + i);
+ message.setIntProperty("producerID", theProducerID);
+ message.setIntProperty("sequence", i);
+ // Set duplicate detection ID
+ message.setStringProperty("_AMQ_DUPL_ID", duplicateId);
+ producer.send(message);
+ session.commit();
+ duplicateIDs.add(duplicateId);
+ messageCommitted = true;
+
+ if (totalSent++ % 100 == 0) {
+ logger.info("Producer {} committed message {}.
Total confirmed: {}", theProducerID, i, totalSent);
+ }
+ failoverLatch.countDown();
+ } catch (Exception commitException) {
+
+ if (commitException.getMessage().contains("Duplicate
message detected")) {
+ logger.info("Duplicate, it's okay");
+ duplicateIDs.add(duplicateId);
+ session.rollback();
+ messageCommitted = true;
+ } else {
+ logger.warn("Producer {} commit failed (retry {}):
{}", theProducerID, retryCount, commitException.getMessage());
+ try {
+ session.rollback();
+ logger.info("Producer {} rolled back message
{}", theProducerID, i);
+ } catch (Exception rollbackException) {
+ logger.warn("Producer {} rollback failed: {}",
theProducerID, rollbackException.getMessage());
+ }
+ retryCount++;
+ Thread.sleep(100); // Brief pause before retry
+ }
+ }
+ }
+
+ if (!messageCommitted) {
+ logger.error("Producer {} failed to commit message {}
after {} retries", theProducerID, i, maxRetries);
+ errors.incrementAndGet();
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ logger.warn("Producer {} encountered fatal error: {}",
theProducerID, e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ producersLatch.countDown();
+ }
+ });
+ }
+
+ assertTrue(failoverLatch.await(10, TimeUnit.SECONDS));
+ logger.info("Failover signal received, killing live server");
+ if (liveServer != null && liveServer.isAlive()) {
+ stopServerWithFile(getServerLocation(SERVER_NAME_LIVE));
+ liveServer.waitFor(1, TimeUnit.MINUTES);
+ }
+
+ // Wait for all producers to complete
+ assertTrue(producersLatch.await(10, TimeUnit.MINUTES), "Producers did
not complete in time");
+ assertEquals(0, errors.get(), "Errors occurred during sending");
+
+ int duplicateSize = duplicateIDs.size();
+ logger.info("Total confirmed messages sent: {}", duplicateSize);
+ assertEquals(NUMBER_OF_MESSAGES, duplicateSize, "Should have confirmed
all messages (with retries during failover)");
+
+ SimpleManagement management = new
SimpleManagement("tcp://localhost:61617", null, null);
+ Wait.waitFor(() -> management.getMessageCountOnQueue(QUEUE_NAME) >=
duplicateIDs.size(), 5000, 100);
+
+ long numberOfMessages = management.getMessageCountOnQueue(QUEUE_NAME);
+
+ // Consume and verify all messages
+ try (Connection consumerConnection = factory.createConnection()) {
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME));
+
+ AtomicInteger receivedCount = new AtomicInteger(0);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(10000);
+ assertNotNull(message, "Should receive message " + i);
+
+ int producerID = message.getIntProperty("producerID");
+ int sequence = message.getIntProperty("sequence");
+ String duplicateID = generateDuplicateID(producerID, sequence);
+
+ duplicateIDs.remove(duplicateID);
+ receivedCount.incrementAndGet();
+
+ if ((i + 1) % 100 == 0) {
+ logger.info("Received {} messages so far", i + 1);
+ }
+ }
+ assertNull(consumer.receiveNoWait());
+
+ duplicateIDs.forEach(s -> logger.warn("DuplicateID not received {}",
duplicateIDs));
+
+ logger.info("Total messages received: {}", receivedCount);
+ assertTrue(receivedCount.get() >= duplicateSize, () -> "Should
receive exactly the confirmed count without duplicates " + receivedCount.get()
+ ", confirmed = " + duplicateSize);
+
+ assertTrue(duplicateIDs.isEmpty());
+ }
+
+ logger.info("Test completed successfully. Confirmed: {}, Verified in
queue", duplicateSize);
+ }
+
+ private static String generateDuplicateID(int producerID, int sequence) {
+ return "DUP:" + producerID + ":" + sequence;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]