This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit dad947fe4aca288256733bbb1bc442e593f26196
Author: Matt Pavlovich <mattr...@apache.org>
AuthorDate: Tue Apr 23 11:43:09 2024 -0500

    [AMQ-9484] Support exporting kahadb messages from a queue with an offset
    
    (cherry picked from commit 1a1b42f0c9bd784b8c8aee4f42deab140d7480e3)
---
 .../org/apache/activemq/store/MessageStore.java    |   2 +
 .../apache/activemq/store/ProxyMessageStore.java   |   5 +
 .../activemq/store/memory/MemoryMessageStore.java  |  26 +++
 .../activemq/store/jdbc/JDBCMessageStore.java      |  13 ++
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  41 ++++
 .../activemq/store/kahadb/TempKahaDBStore.java     |  31 +++
 .../region/cursors/StoreQueueCursorOrderTest.java  |   5 +
 .../kahadb/KahaDBOffsetRecoveryListenerTest.java   | 221 +++++++++++++++++++++
 8 files changed, 344 insertions(+)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index aee619a27a..e35327f848 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -178,6 +178,8 @@ public interface MessageStore extends Service {
 
     void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception;
 
+    /**
+     * Recovers messages from this store, skipping a number of leading
+     * entries before messages are handed to the listener.
+     *
+     * @param offset number of leading entries to skip; the implementations
+     *        added alongside this declaration skip nothing when the value
+     *        is zero or negative
+     * @param maxReturned requested upper bound on the number of messages
+     *        handed to the listener
+     * @param listener callback that receives each recovered message
+     * @throws Exception if recovery fails
+     */
+    void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception;
+
     void dispose(ConnectionContext context);
 
     /**
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index cd319a65d3..a4fb4be5b8 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -110,6 +110,11 @@ public class ProxyMessageStore implements MessageStore {
         delegate.recoverNextMessages(maxReturned, listener);
     }
 
+    /**
+     * Forwards offset-based recovery to the wrapped store, passing all
+     * three arguments through unchanged.
+     */
+    @Override
+    public void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception {
+        delegate.recoverNextMessages(offset, maxReturned, listener);
+    }
+
     @Override
     public void resetBatching() {
         delegate.resetBatching();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 953c83e19a..f0857fb960 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -130,6 +130,32 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
         }
     }
 
+    /**
+     * Replays entries of the in-memory message table to the listener,
+     * ignoring the first {@code offset} table entries.
+     *
+     * Entries up to and including the one keyed by {@code lastBatchId} are
+     * not delivered; each delivered entry becomes the new {@code lastBatchId}.
+     * Note that {@code maxReturned} is not consulted by this implementation.
+     *
+     * @param offset number of leading table entries to skip; zero or negative skips nothing
+     * @param maxReturned unused here
+     * @param listener receives each recovered message or message reference
+     * @throws Exception if the listener fails
+     */
+    @Override
+    public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
+        synchronized (messageTable) {
+            boolean reachedLastBatch = (lastBatchId == null);
+            int index = 0;
+            for (Map.Entry<MessageId, Message> slot : messageTable.entrySet()) {
+                // index is never negative, so this test is also false for offset <= 0
+                final boolean withinOffset = index < offset;
+                index++;
+                if (withinOffset) {
+                    continue;
+                }
+                if (!reachedLastBatch) {
+                    // still catching up to the previously recovered batch
+                    reachedLastBatch = slot.getKey().equals(lastBatchId);
+                    continue;
+                }
+                final Object stored = slot.getValue();
+                lastBatchId = slot.getKey();
+                if (stored.getClass() == MessageId.class) {
+                    listener.recoverMessageReference((MessageId) stored);
+                } else {
+                    listener.recoverMessage((Message) stored);
+                }
+            }
+        }
+    }
+
     @Override
     public void resetBatching() {
         lastBatchId = null;
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 8adc2f78ee..70ddb7ab1e 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -398,6 +398,19 @@ public class JDBCMessageStore extends AbstractMessageStore 
{
 
     }
 
+    /**
+     * @param offset
+     * @param maxReturned
+     * @param listener
+     * @throws Exception
+     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
+     *      org.apache.activemq.store.MessageRecoveryListener)
+     */
+    @Override
+    public void recoverNextMessages(int offset, int maxReturned, final 
MessageRecoveryListener listener) throws Exception {
+        throw new 
UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) 
is not supported.");
+    }
+
     public void trackRollbackAck(Message message) {
         synchronized (rolledBackAcks) {
             rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), 
message);
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 50224a5dfa..31a86bcae0 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -732,6 +732,47 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
             }
         }
 
+        /**
+         * Recovers messages from the destination's order index, skipping the
+         * first {@code offset} eligible entries, while holding the index
+         * write lock and running inside a page-file transaction.
+         *
+         * The delivered-message counter starts at the value returned by
+         * {@code recoverRolledBackAcks}, so whatever that call reports counts
+         * toward {@code maxReturned}.
+         *
+         * @param offset number of eligible entries to skip; zero or negative skips nothing
+         * @param maxReturned stop once this many messages have been counted
+         * @param listener receives each recovered message and may end the loop
+         *        early via {@code canRecoveryNextMessage()}
+         * @throws Exception if the transaction or the listener fails
+         */
+        @Override
+        public void recoverNextMessages(final int offset, final int 
maxReturned, final MessageRecoveryListener listener) throws Exception {
+            indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                    @Override
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Entry<Long, MessageKeys> entry = null;
+                        int position = 0;
+                        int counter = 
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, 
listener);
+                        Set ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
+                            entry = iterator.next();
+
+                            // Entries found in the ackedAndPrepared set are passed over
+                            // without advancing position, so they do not count toward the offset.
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
+
+                            // Skip until 'offset' eligible entries have been seen.
+                            if(offset > 0 && offset > position) {
+                                position++;
+                                continue;
+                            }
+
+                            Message msg = 
loadMessage(entry.getValue().location);
+                            
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
+                            listener.recoverMessage(msg);
+                            counter++;
+                            position++;
+                            // NOTE(review): the limit is checked only after a delivery, so one
+                            // message is delivered even if counter already met maxReturned on entry.
+                            if (counter >= maxReturned || 
!listener.canRecoveryNextMessage()) {
+                                break;
+                            }
+                        }
+                        // NOTE(review): not reached if the loop throws; presumably resets
+                        // iteration state on the order index - confirm.
+                        sd.orderIndex.stoppedIterating();
+                    }
+                });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         protected int recoverRolledBackAcks(String recoveredTxStateMapKey, 
StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener 
listener) throws Exception {
             int counter = 0;
             String id;
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 7048b09a44..7835a1b7b6 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -241,6 +241,37 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
             }
         }
 
+        /**
+         * Recovers messages starting at the current cursor position, skipping
+         * the first {@code offset} entries returned by the order index, and
+         * then moves the cursor just past the last entry that was visited
+         * (skipped or delivered).
+         *
+         * @param offset number of leading entries to skip; zero or negative skips nothing
+         * @param maxReturned stop after this many messages have been delivered
+         * @param listener receives each unmarshalled message
+         * @throws Exception if the transaction, unmarshalling or the listener fails
+         */
+        @Override
+        public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                    @Override
+                    public void execute(Transaction tx) throws Exception {
+                        final StoredDestination stored = getStoredDestination(dest, tx);
+                        final Iterator<Entry<Long, MessageRecord>> cursor = stored.orderIndex.iterator(tx, cursorPos);
+                        Entry<Long, MessageRecord> lastSeen = null;
+                        int delivered = 0;
+                        int index = 0;
+                        while (cursor.hasNext()) {
+                            lastSeen = cursor.next();
+                            // index is never negative, so this is also false for offset <= 0
+                            final boolean withinOffset = index < offset;
+                            index++;
+                            if (withinOffset) {
+                                continue;
+                            }
+                            final Message recovered = (Message) wireFormat.unmarshal(lastSeen.getValue().data);
+                            listener.recoverMessage(recovered);
+                            delivered++;
+                            if (delivered >= maxReturned) {
+                                break;
+                            }
+                        }
+                        if (lastSeen != null) {
+                            cursorPos = lastSeen.getKey() + 1;
+                        }
+                    }
+                });
+            }
+        }
+
         @Override
         public void resetBatching() {
             cursorPos=0;
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 5a1ab90d58..cbb1579b57 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -510,6 +510,11 @@ public class StoreQueueCursorOrderTest {
             }
         }
 
+        @Override
+        public void recoverNextMessages(int offset, int maxReturned, 
MessageRecoveryListener listener) throws Exception {
+            // No-op: empty implementation of the offset-based MessageStore overload.
+        }
+
         @Override
         public void setBatch(MessageId message) {
             batch.set((Long)message.getFutureOrSequenceLong());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
new file mode 100644
index 0000000000..9ea4b68a84
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import static org.junit.Assert.assertEquals;
+
+public class KahaDBOffsetRecoveryListenerTest {
+
+    protected BrokerService brokerService = null;
+    protected KahaDBStore kaha = null;
+
+    @Before
+    public void beforeEach() throws Exception {
+
+    }
+
+    @After
+    public void afterEach() {
+        brokerService = null;
+        kaha = null;
+    }
+
+    protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(kaha);
+        broker.start();
+        broker.waitUntilStarted(10_000l);
+        return broker;
+    }
+
+    private KahaDBStore createStore(boolean delete) throws IOException {
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new 
File("target/activemq-data/kahadb-recovery-tests"));
+        if( delete ) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    protected void runOffsetTest(int sendCount, int expectedMessageCount, int 
recoverOffset, int recoverCount, int expectedRecoverCount, int 
expectedRecoverIndex, String queueName) throws Exception {
+        kaha = createStore(true);
+        kaha.setJournalMaxFileLength(1024*100);
+        brokerService = createBroker(kaha);
+        sendMessages(sendCount, queueName);
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+
+        TestMessageRecoveryListener testMessageRecoveryListener = new 
TestMessageRecoveryListener();
+        kaha = createStore(false);
+        kaha.start();
+        MessageStore messageStore = kaha.createQueueMessageStore(new 
ActiveMQQueue(queueName));
+        messageStore.start();
+        assertEquals(Integer.valueOf(expectedMessageCount), 
Integer.valueOf(messageStore.getMessageCount()));
+        messageStore.recoverNextMessages(recoverOffset, recoverCount, 
testMessageRecoveryListener);
+        messageStore.stop();
+        kaha.stop();
+
+        assertEquals(Integer.valueOf(expectedRecoverCount), 
Integer.valueOf(testMessageRecoveryListener.getRecoveredMessages().size()));
+
+        if(expectedRecoverIndex >= 0) {
+            assertEquals(Integer.valueOf(expectedRecoverIndex), 
(Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index"));
+        }
+
+        brokerService = createBroker(kaha);
+        assertEquals(sendCount, receiveMessages(queueName));
+    }
+
+    @Test
+    public void testOffsetZero() throws Exception {
+        runOffsetTest(1_000, 1_000, 0, 1, 1, 0, "TEST.OFFSET.ZERO");
+    }
+
+    @Test
+    public void testOffsetOne() throws Exception {
+        runOffsetTest(1_000, 1_000, 1, 1, 1, 1, "TEST.OFFSET.ONE");
+    }
+
+    @Test
+    public void testOffsetLastMinusOne() throws Exception {
+        runOffsetTest(1_000, 1_000, 999, 1, 1, 999, 
"TEST.OFFSET.LASTMINUSONE");
+    }
+
+    @Test
+    public void testOffsetLast() throws Exception {
+        runOffsetTest(1_000, 1_000, 1_000, 1, 0, -1, "TEST.OFFSET.LAST");
+    }
+
+    @Test
+    public void testOffsetBeyondQueueSizeNoError() throws Exception {
+        runOffsetTest(1_000, 1_000, 10_000, 1, 0, -1, "TEST.OFFSET.BEYOND");
+    }
+
+    @Test
+    public void testOffsetEmptyQueue() throws Exception {
+        runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY");
+    }
+
+    private void sendMessages(int count, String queueName) throws JMSException 
{
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        cf.setUseAsyncSend(true);
+        cf.setProducerWindowSize(1024);
+        cf.setWatchTopicAdvisories(false);
+
+        Connection connection = cf.createConnection();
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new 
ActiveMQQueue(queueName));
+            for (int i = 0; i < count; i++) {
+                TextMessage textMessage = 
session.createTextMessage(createContent(i));
+                textMessage.setIntProperty("index", i);
+                producer.send(textMessage);
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    private int receiveMessages(String queueName) throws JMSException {
+        int rc=0;
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(new 
ActiveMQQueue(queueName));
+            while ( messageConsumer.receive(1000) !=null ) {
+                rc++;
+            }
+            return rc;
+        } finally {
+            connection.close();
+        }
+    }
+
+    private String createContent(int i) {
+        StringBuilder sb = new StringBuilder(i+":");
+        while( sb.length() < 1024 ) {
+            sb.append("*");
+        }
+        return sb.toString();
+    }
+
+    static class TestMessageRecoveryListener implements 
MessageRecoveryListener {
+
+        List<MessageId> recoveredMessageIds = new LinkedList<>();
+        List<Message> recoveredMessages = new LinkedList<>();
+
+        @Override
+        public boolean hasSpace() {
+            return true;
+        }
+
+        @Override
+        public boolean isDuplicate(MessageId messageId) {
+            return recoveredMessageIds.contains(messageId);
+        }
+
+        @Override
+        public boolean recoverMessage(Message message) throws Exception {
+            if(recoveredMessages.contains(message)) {
+                return false;
+            }
+            return recoveredMessages.add(message);
+        }
+
+        @Override
+        public boolean recoverMessageReference(MessageId messageId) throws 
Exception {
+            if(recoveredMessageIds.contains(messageId)) {
+                return false;
+            }
+            return recoveredMessageIds.add(messageId);
+        }
+
+        public List<MessageId> getRecoveredMessageIds() {
+            return recoveredMessageIds;
+        }
+
+        public List<Message> getRecoveredMessages() {
+            return recoveredMessages;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to