This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new d58ca06151 fix(#1731): adapter deadlock (#1735)
d58ca06151 is described below
commit d58ca06151b72c809bd9d013f05e8f5565279b5a
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Mon Mar 9 09:49:22 2026 +0100
fix(#1731): adapter deadlock (#1735)
* fix(#1731): resolve deadlock in rollbackPendingCursorAdditions by
invoking onCompletion outside synchronized block
* test(#1731): attempt to simplify the test code by using only one inner
class to implement both.
---
.../org/apache/activemq/broker/region/Queue.java | 15 +-
.../jdbc/h2/H2JDBCDeadlockOnSendExceptionTest.java | 264 +++++++++++++++++++++
2 files changed, 275 insertions(+), 4 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2e537db360..048512cafb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -790,18 +790,25 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
public void rollbackPendingCursorAdditions(MessageId messageId) {
+ MessageContext toRollback = null;
synchronized (indexOrderedCursorUpdates) {
for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
- MessageContext mc = indexOrderedCursorUpdates.get(i);
+ final MessageContext mc = indexOrderedCursorUpdates.get(i);
if (mc.message.getMessageId().equals(messageId)) {
indexOrderedCursorUpdates.remove(mc);
- if (mc.onCompletion != null) {
- mc.onCompletion.run();
- }
+ toRollback = mc;
break;
}
}
}
+ // Invoke onCompletion outside synchronized(indexOrderedCursorUpdates)
to avoid a
+ // lock-ordering deadlock with JDBCMessageStore.addMessage, which holds
+ // pendingAdditions while calling indexListener.onAdd() (which acquires
+ // indexOrderedCursorUpdates). The onCompletion callback acquires
pendingAdditions,
+ // so calling it inside the lock produces a deadlock cycle.
+ if (toRollback != null && toRollback.onCompletion != null) {
+ toRollback.onCompletion.run();
+ }
}
private void doPendingCursorAdditions() throws Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/h2/H2JDBCDeadlockOnSendExceptionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/h2/H2JDBCDeadlockOnSendExceptionTest.java
new file mode 100644
index 0000000000..75e6fdd615
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/h2/H2JDBCDeadlockOnSendExceptionTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc.h2;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.IndexListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.store.jdbc.adapter.H2JDBCAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Reproduces the JDBC store deadlock described in GitHub issue #1731.
+ *
+ * <p>Lock-ordering inversion between:
+ * <ul>
+ * <li>{@code pendingAdditions} (JDBCMessageStore) — held by {@code
addMessage} while calling
+ * {@code indexListener.onAdd()}, and needed later by the
cursor-completion callback</li>
+ * <li>{@code indexOrderedCursorUpdates} (Queue) — held by {@code
rollbackPendingCursorAdditions}
+ * while calling that same cursor-completion callback</li>
+ * </ul>
+ *
+ * <p>Fix: move {@code mc.onCompletion.run()} outside {@code
synchronized(indexOrderedCursorUpdates)}
+ * in {@code Queue.rollbackPendingCursorAdditions}.
+ *
+ * @see <a href="https://github.com/apache/activemq/issues/1731">GitHub
#1731</a>
+ */
+public class H2JDBCDeadlockOnSendExceptionTest {
+
+ private static final String QUEUE_NAME = "test.deadlock.queue";
+
+ private BrokerService broker;
+ private ActiveMQConnectionFactory connectionFactory;
+ private final DeadlockCoordinator coordinator = new DeadlockCoordinator();
+
+ /** Fail the test if it hangs beyond 30 seconds — that indicates a
deadlock. */
+ @Rule
+ public final Timeout testTimeout = Timeout.seconds(30);
+
+ @Before
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.setPersistenceAdapter(createJDBCAdapter());
+ broker.addConnector("tcp://0.0.0.0:0");
+ broker.start();
+ broker.waitUntilStarted();
+ connectionFactory = new ActiveMQConnectionFactory(
+
broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null && broker.isStarted()) {
+ broker.stop();
+ }
+ }
+
+ private JDBCPersistenceAdapter createJDBCAdapter() throws IOException {
+ final JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() {
+ @Override
+ public MessageStore createQueueMessageStore(final ActiveMQQueue
destination) throws IOException {
+ final MessageStore base =
super.createQueueMessageStore(destination);
+ // Intercept registerIndexListener to wrap Queue with the
coordinator.
+ return new ProxyMessageStore(base) {
+ @Override
+ public void registerIndexListener(final IndexListener
indexListener) {
+ coordinator.setQueueListener(indexListener);
+ super.registerIndexListener(coordinator);
+ }
+ };
+ }
+ };
+ jdbc.setDataSource(H2DB.createDataSource("H2JDBCDeadlockTest"));
+ jdbc.setAdapter(coordinator.newAdapter());
+ jdbc.deleteAllMessages();
+ jdbc.setUseLock(false);
+ return jdbc;
+ }
+
+ /**
+ * Verifies that no deadlock occurs when a JDBC exception fires during
{@code addMessage}
+ * while another {@code addMessage} executes concurrently.
+ *
+ * <p>Before the fix: test hangs → {@code @Rule Timeout} fails it.
+ * After the fix: both threads complete normally.
+ */
+ @Test
+ public void testNoDeadlockOnJDBCException() throws Exception {
+ final CountDownLatch rollbackHoldsIndexLock = new CountDownLatch(1);
+ final CountDownLatch threadBHoldsPendingLock = new CountDownLatch(1);
+ coordinator.arm(rollbackHoldsIndexLock, threadBHoldsPendingLock);
+
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
+ final CountDownLatch allDone = new CountDownLatch(2);
+
+ // Thread A: sends the message whose DB write will be injected to fail.
+ // The failure triggers Queue.rollbackPendingCursorAdditions, which
acquires
+ // indexOrderedCursorUpdates and calls mc.onCompletion (inside that
lock before fix).
+ executor.execute(() -> {
+ try (final Connection conn = connectionFactory.createConnection();
+ final Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
+ conn.start();
+ session.createProducer(session.createQueue(QUEUE_NAME))
+ .send(session.createTextMessage("will-fail"));
+ } catch (final Exception ignored) {
+ // Expected: the broker propagates the injected IOException as
JMSException.
+ } finally {
+ allDone.countDown();
+ }
+ });
+
+ // Wait until Thread A's rollback has acquired
indexOrderedCursorUpdates.
+ assertTrue("Thread A rollback should start",
rollbackHoldsIndexLock.await(10, TimeUnit.SECONDS));
+
+ // Thread B: sends a normal message while Thread A's rollback holds
indexOrderedCursorUpdates.
+ // Thread B enters synchronized(pendingAdditions) in addMessage, then
calls
+ // indexListener.onAdd() which needs indexOrderedCursorUpdates →
BLOCKED (before fix).
+ // Meanwhile Thread A needs pendingAdditions (held by Thread B) →
DEADLOCK.
+ executor.execute(() -> {
+ try (final Connection conn = connectionFactory.createConnection();
+ final Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
+ conn.start();
+ session.createProducer(session.createQueue(QUEUE_NAME))
+ .send(session.createTextMessage("should-succeed"));
+ } catch (final Exception ignored) {
+ } finally {
+ allDone.countDown();
+ }
+ });
+
+ assertTrue("Both threads should complete without deadlock",
+ allDone.await(15, TimeUnit.SECONDS));
+ executor.shutdown();
+ }
+
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Coordinates the precise timing needed to expose the ABBA lock cycle.
+ *
+ * <p>Implements {@link IndexListener} to intercept {@code onAdd()} calls:
+ * <ul>
+ * <li>For the <em>first</em> message (Thread A's fail message): wraps
{@code mc.onCompletion}
+ * so that when it is called from inside {@code
synchronized(indexOrderedCursorUpdates)}
+ * it signals Thread B and waits for Thread B to hold {@code
pendingAdditions} before
+ * trying to acquire it — completing the deadlock cycle.</li>
+ * <li>For the <em>second</em> message (Thread B): signals Thread A that
+ * {@code pendingAdditions} is held, then calls the real {@code
onAdd} which tries
+ * to acquire {@code indexOrderedCursorUpdates} (held by Thread A) —
BLOCKED.</li>
+ * </ul>
+ *
+ * <p>Also provides a {@link H2JDBCAdapter} that fails the first {@code
doAddMessage}
+ * call, which is what pushes Thread A into the rollback path.
+ */
+ static class DeadlockCoordinator implements IndexListener {
+
+ private IndexListener queueListener;
+ private CountDownLatch rollbackHoldsIndexLock;
+ private CountDownLatch threadBHoldsPendingLock;
+
+ /** Set to true (on Thread A) in onAdd so the adapter throws on the
same thread. */
+ private final AtomicBoolean failNextWrite = new AtomicBoolean(false);
+ /** Flips to false after Thread A's onCompletion wrapper is
registered. */
+ private final AtomicBoolean firstMessage = new AtomicBoolean(true);
+
+ void setQueueListener(final IndexListener queueListener) {
+ this.queueListener = queueListener;
+ }
+
+ void arm(final CountDownLatch rollbackHoldsIndexLock,
+ final CountDownLatch threadBHoldsPendingLock) {
+ this.rollbackHoldsIndexLock = rollbackHoldsIndexLock;
+ this.threadBHoldsPendingLock = threadBHoldsPendingLock;
+ }
+
+ H2JDBCAdapter newAdapter() {
+ return new H2JDBCAdapter() {
+ @Override
+ public void doAddMessage(final TransactionContext c, final
long sequence,
+ final MessageId messageID,
+ final ActiveMQDestination
destination,
+ final byte[] data, final long
expiration,
+ final byte priority, final
XATransactionId xid)
+ throws SQLException, IOException {
+ if (failNextWrite.compareAndSet(true, false)) {
+ throw new SQLException(
+ "Simulated DB failure to reproduce deadlock
(GitHub #1731)", "S1000");
+ }
+ super.doAddMessage(c, sequence, messageID, destination,
+ data, expiration, priority, xid);
+ }
+ };
+ }
+
+ @Override
+ public void onAdd(final IndexListener.MessageContext mc) {
+ if (firstMessage.compareAndSet(true, false)) {
+ // Thread A — inside synchronized(pendingAdditions) in
JDBCMessageStore.addMessage.
+ // Mark the write to fail, then wrap onCompletion with
deadlock-triggering logic.
+ failNextWrite.set(true);
+ final Runnable original = mc.onCompletion;
+ final IndexListener.MessageContext wrapped = new
IndexListener.MessageContext(
+ mc.context, mc.message, () -> {
+ // Called from Queue.rollbackPendingCursorAdditions
while holding
+ // synchronized(indexOrderedCursorUpdates).
+ rollbackHoldsIndexLock.countDown(); //
signal Thread B to start
+ try {
+ threadBHoldsPendingLock.await(10,
TimeUnit.SECONDS); // wait for Thread B
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ original.run(); // needs pendingAdditions → DEADLOCK
if Thread B holds it
+ });
+ queueListener.onAdd(wrapped);
+ } else {
+ // Thread B — also inside synchronized(pendingAdditions).
+ // Signal Thread A that pendingAdditions is now held.
+ threadBHoldsPendingLock.countDown();
+ // Needs indexOrderedCursorUpdates (held by Thread A's
rollback) → BLOCKED.
+ queueListener.onAdd(mc);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact