This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new ec633b3d9b [AMQ-9773] Fix for only one message being recovered from backup
ec633b3d9b is described below
commit ec633b3d9b0398cbe7814cd6bccf23f6c62fab15
Author: Matt Pavlovich <[email protected]>
AuthorDate: Wed Sep 24 18:02:12 2025 -0500
[AMQ-9773] Fix for only one message being recovered from backup
---
.../apache/activemq/store/kahadb/KahaDBStore.java | 21 +++++++++---
.../kahadb/KahaDBOffsetRecoveryListenerTest.java | 37 +++++++++++++++-------
2 files changed, 41 insertions(+), 17 deletions(-)
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index f52c49421e..d0fa3bda84 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -754,6 +754,14 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
+ /*
+ The endSequenceOffset is used when only endMessageId is requested
+ there is a disconnect between iterator offset and a destination's
+ sequence key.
+
+ If a destination has already processed messages, then the sequence key
+ value is the number of total messages processed through the queue all-time.
+ */
Long startSequenceOffset = null;
Long endSequenceOffset = null;
@@ -766,16 +774,15 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
if(messageRecoveryContext.getEndMessageId() != null &&
!messageRecoveryContext.getEndMessageId().isBlank()) {
endSequenceOffset =
Optional.ofNullable(sd.messageIdIndex.get(tx,
messageRecoveryContext.getEndMessageId()))
.orElse(startSequenceOffset +
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned()));
- } else {
- endSequenceOffset = startSequenceOffset +
Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned());
+
messageRecoveryContext.setEndSequenceId(endSequenceOffset);
}
- if(endSequenceOffset < startSequenceOffset) {
+ if(endSequenceOffset != null &&
+ endSequenceOffset < startSequenceOffset) {
LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset);
throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset);
}
-
messageRecoveryContext.setEndSequenceId(endSequenceOffset);
Entry<Long, MessageKeys> entry = null;
recoverRolledBackAcks(destination.getPhysicalName(),
sd, tx, messageRecoveryContext.getMaxMessageCountReturned(),
messageRecoveryContext);
Set<String> ackedAndPrepared =
ackedAndPreparedMap.get(destination.getPhysicalName());
@@ -796,7 +803,11 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
break;
}
}
- sd.orderIndex.stoppedIterating();
+
+ // The sd.orderIndex uses the destination's cursor
+ if(!messageRecoveryContext.isUseDedicatedCursor()) {
+ sd.orderIndex.stoppedIterating();
+ }
}
});
} finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
index 1b3f1e8afa..613b620d72 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
@@ -32,7 +32,9 @@ import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +54,17 @@ public class KahaDBOffsetRecoveryListenerTest {
protected BrokerService brokerService = null;
protected BrokerService restartBrokerService = null;
+ @Rule
+ public TestName testName = new TestName();
+
+ protected final int PRETEST_MSG_COUNT = 17531;
+
@Before
public void beforeEach() throws Exception {
-
+ // Send+Recv a odd number of messages beyond cache sizes
+ // to confirm the queue's sequence number gets pushed off
+ sendMessages(PRETEST_MSG_COUNT, testName.getMethodName());
+ assertEquals(Integer.valueOf(PRETEST_MSG_COUNT),
Integer.valueOf(receiveMessages(testName.getMethodName())));
}
@After
@@ -68,7 +78,7 @@ public class KahaDBOffsetRecoveryListenerTest {
broker.setUseJmx(false);
broker.setPersistenceAdapter(kaha);
broker.start();
- broker.waitUntilStarted(10_000l);
+ broker.waitUntilStarted(10_000L);
return broker;
}
@@ -130,7 +140,7 @@ public class KahaDBOffsetRecoveryListenerTest {
restartBrokerService = createBroker(createStore(false));
restartBrokerService.start();
- restartBrokerService.waitUntilStarted(30_000l);
+ restartBrokerService.waitUntilStarted(30_000L);
assertEquals(sendCount, receiveMessages(queueName));
restartBrokerService.stop();
@@ -139,42 +149,42 @@ public class KahaDBOffsetRecoveryListenerTest {
@Test
public void testOffsetZero() throws Exception {
- runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO");
+ runOffsetTest(1_000, 1_000, 0, 1, 1, 0, testName.getMethodName());
}
@Test
public void testOffsetOne() throws Exception {
- runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE");
+ runOffsetTest(1_000, 1_000, 1, 1, 1, 1, testName.getMethodName());
}
@Test
public void testOffsetLastMinusOne() throws Exception {
- runOffsetTest(1_000, 1_000, 999, 1, 1, 999,
"TEST.OFFSET.LASTMINUSONE");
+ runOffsetTest(1_000, 1_000, 999, 1, 1, 999, testName.getMethodName());
}
@Test
public void testOffsetLast() throws Exception {
- runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST");
+ runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, testName.getMethodName());
}
@Test
public void testOffsetBeyondQueueSizeNoError() throws Exception {
- runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND");
+ runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1,
testName.getMethodName());
}
@Test
public void testOffsetEmptyQueue() throws Exception {
- runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY");
+ runOffsetTest(0, 0, 10_000, 1, 0, -1, testName.getMethodName());
}
@Test
public void testOffsetWalk() throws Exception {
- runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000,
"TEST.OFFSET.WALK", 8, false);
+ runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000,
testName.getMethodName(), 8, false);
}
@Test
public void testOffsetRepeat() throws Exception {
- runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000,
"TEST.OFFSET.REPEAT", 10, true);
+ runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000,
testName.getMethodName(), 10, true);
}
private void sendMessages(int count, String queueName) throws JMSException
{
@@ -198,12 +208,15 @@ public class KahaDBOffsetRecoveryListenerTest {
private int receiveMessages(String queueName) throws JMSException {
int rc=0;
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("vm://localhost");
+ cf.setWatchTopicAdvisories(false);
+
Connection connection = cf.createConnection();
+
try {
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(new
ActiveMQQueue(queueName));
- while ( messageConsumer.receive(1000) !=null ) {
+ while (messageConsumer.receive(1_000L) != null) {
rc++;
}
return rc;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact