ARTEMIS-252 retryMessages retrying to topic subscriptions + some ammends to #193


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a81a5f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a81a5f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a81a5f1

Branch: refs/heads/master
Commit: 2a81a5f146d6bb8b9dcef352fc68978464db85aa
Parents: 8848c96
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Mon Oct 12 16:34:23 2015 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Mon Oct 12 17:03:53 2015 -0400

----------------------------------------------------------------------
 .../core/management/impl/QueueControlImpl.java  | 40 ++++-----
 .../activemq/artemis/core/server/Queue.java     |  2 +
 .../artemis/core/server/impl/QueueImpl.java     | 56 +++++++++++-
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 ++
 .../server/management/JMSQueueControlTest.java  | 93 +++++++++++++++++---
 .../unit/core/postoffice/impl/FakeQueue.java    |  5 ++
 6 files changed, 161 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 1260169..cb362ec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -37,11 +36,12 @@ import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -558,16 +558,19 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       clearIO();
 
       try {
-         MessageReference message = queue.getReference(messageID);
-         if ( message == null ) {
-            return false;
-         }
-         else {
-            final String originalAddress = 
message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
-            if (originalAddress != null) {
-               return queue.moveReference(messageID, new 
SimpleString(originalAddress));
+         Filter singleMessageFilter = new Filter() {
+            @Override
+            public boolean match(ServerMessage message) {
+               return message.getMessageID() == messageID;
             }
-         }
+
+            @Override
+            public SimpleString getFilterString() {
+               return new SimpleString("custom filter for MESSAGEID= 
messageID");
+            }
+         };
+
+         queue.retryMessages(singleMessageFilter);
       }
       finally {
          blockOnIO();
@@ -580,25 +583,12 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       checkStarted();
       clearIO();
 
-      int retriedMessages = 0;
       try {
-         Iterator<MessageReference> messageIterator = queue.totalIterator();
-         while (messageIterator.hasNext()) {
-            MessageReference message = messageIterator.next();
-            // Will only try messages with Message.HDR_ORIGINAL_ADDRESS set.
-            final String originalAddress = 
message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
-            final long messageID = message.getMessage().getMessageID();
-            if ( originalAddress != null) {
-               if ( queue.moveReference(messageID, new 
SimpleString(originalAddress))) {
-                  retriedMessages++;
-               }
-            }
-         }
+         return queue.retryMessages(null);
       }
       finally {
          blockOnIO();
       }
-      return retriedMessages;
    }
 
    public boolean moveMessage(final long messageID, final String 
otherQueueName) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ba12a55..f5a19a8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -164,6 +164,8 @@ public interface Queue extends Bindable {
                       SimpleString toAddress,
                       boolean rejectDuplicates) throws Exception;
 
+   int retryMessages(Filter filter) throws Exception;
+
    void addRedistributor(long delay);
 
    void cancelRedistributor() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2ac5c8a..95a7ba6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -1525,6 +1526,50 @@ public class QueueImpl implements Queue {
       });
    }
 
+   public int retryMessages(Filter filter) throws Exception {
+
+      final HashMap<SimpleString, Long> queues = new HashMap<>();
+
+      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
+         @Override
+         public void actMessage(Transaction tx, MessageReference ref) throws 
Exception {
+
+            SimpleString originalMessageAddress = 
ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS);
+            SimpleString originalMessageQueue = 
ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE);
+
+            if (originalMessageAddress != null) {
+
+               incDelivering();
+
+               Long targetQueue = null;
+               if (originalMessageQueue != null && 
!originalMessageQueue.equals(originalMessageAddress)) {
+                  targetQueue = queues.get(originalMessageQueue);
+                  if (targetQueue == null) {
+                     Binding binding = 
postOffice.getBinding(originalMessageQueue);
+
+                     if (binding != null && binding instanceof 
LocalQueueBinding) {
+                        targetQueue = ((LocalQueueBinding)binding).getID();
+                        queues.put(originalMessageQueue, targetQueue);
+                     }
+                  }
+               }
+
+               if (targetQueue != null) {
+                  move(originalMessageAddress, tx, ref, false, false, 
targetQueue.longValue());
+               }
+               else {
+                  move(originalMessageAddress, tx, ref, false, false);
+
+               }
+
+
+            }
+         }
+      });
+
+
+   }
+
    public synchronized boolean changeReferencePriority(final long messageID, 
final byte newPriority) throws Exception {
       LinkedListIterator<MessageReference> iter = iterator();
 
@@ -2057,11 +2102,20 @@ public class QueueImpl implements Queue {
                      final Transaction tx,
                      final MessageReference ref,
                      final boolean expiry,
-                     final boolean rejectDuplicate) throws Exception {
+                     final boolean rejectDuplicate,
+                     final long ... queueIDs) throws Exception {
       ServerMessage copyMessage = makeCopy(ref, expiry);
 
       copyMessage.setAddress(toAddress);
 
+      if (queueIDs != null && queueIDs.length > 0) {
+         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
+         for (long id : queueIDs) {
+            buffer.putLong(id);
+         }
+         copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, 
buffer.array());
+      }
+
       postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index d42c89d..b18be4a 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -908,6 +908,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public int retryMessages(Filter filter) throws Exception {
+         return 0;
+      }
+
+      @Override
       public int getConsumerCount() {
          return 0;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
index b5183ca..ac9b28e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
@@ -16,6 +16,20 @@
  */
 package org.apache.activemq.artemis.tests.integration.jms.server.management;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.Notification;
+import javax.naming.Context;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -39,6 +53,7 @@ import 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
 import 
org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
@@ -51,20 +66,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.Notification;
-import javax.naming.Context;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * A QueueControlTest
  * <br>
@@ -808,6 +809,16 @@ public class JMSQueueControlTest extends 
ManagementTestBase {
       return testQueue;
    }
 
+   protected ActiveMQTopic createTestTopicWithDLQ(final String queueName, 
final ActiveMQQueue dlq) throws Exception {
+      serverManager.createTopic(false, queueName);
+      ActiveMQTopic testQueue = (ActiveMQTopic) 
ActiveMQJMSClient.createTopic(queueName);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
+      addressSettings.setMaxDeliveryAttempts(1);
+      server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), 
addressSettings);
+      return testQueue;
+   }
+
    /**
     * Test retrying all messages put on DLQ - i.e. they should appear on the 
original queue.
     * @throws Exception
@@ -834,10 +845,64 @@ public class JMSQueueControlTest extends 
ManagementTestBase {
       Assert.assertEquals(0, getMessageCount(testQueueControl));
       Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
 
+      Assert.assertEquals(10,getMessageCount(dlqQueueControl));
+
       dlqQueueControl.retryMessages();
 
       Assert.assertEquals(numMessagesToTest, 
getMessageCount(testQueueControl));
       Assert.assertEquals(0,getMessageCount(dlqQueueControl));
+
+      connection.close();
+   }
+
+   /**
+    * Test retrying all messages put on DLQ - i.e. they should appear on the 
original queue.
+    * @throws Exception
+    */
+   @Test
+   public void testRetryMessagesOnTopic() throws Exception {
+      ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
+      ActiveMQTopic testTopic = 
createTestTopicWithDLQ(RandomUtil.randomString(), dlq);
+
+      Connection connectionConsume = createConnection();
+      connectionConsume.setClientID("ID");
+      Session sessionConsume = connectionConsume.createSession(true, 
Session.SESSION_TRANSACTED);
+      MessageConsumer cons1 = 
sessionConsume.createDurableSubscriber(testTopic, "sub1");
+      MessageConsumer cons2 = 
sessionConsume.createDurableSubscriber(testTopic, "sub2");
+
+
+      final int numMessagesToTest = 10;
+      JMSUtil.sendMessages(testTopic, numMessagesToTest);
+
+
+      connectionConsume.start();
+      for (int i = 0; i < numMessagesToTest; i++) {
+         Assert.assertNotNull(cons1.receive(500));
+      }
+      sessionConsume.commit();
+
+      Assert.assertNull(cons1.receiveNoWait());
+
+      connectionConsume.start();
+      for (int i = 0; i < numMessagesToTest; i++) {
+         cons2.receive(500);
+      }
+      sessionConsume.rollback();
+      Assert.assertNull(cons2.receiveNoWait());
+
+      JMSQueueControl dlqQueueControl = createManagementControl(dlq);
+      dlqQueueControl.retryMessages();
+
+      Assert.assertNull("Retry is sending back to cons1 even though it 
succeeded", cons1.receiveNoWait());
+
+      for (int i = 0; i < numMessagesToTest; i++) {
+         Assert.assertNotNull(cons2.receive(500));
+      }
+      sessionConsume.commit();
+      Assert.assertNull(cons1.receiveNoWait());
+
+      connectionConsume.close();
+
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a81a5f1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 970e89d..4aefc9a 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -50,6 +50,11 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public int retryMessages(Filter filter) throws Exception {
+      return 0;
+   }
+
+   @Override
    public void setConsumersRefCount(ReferenceCounter referenceCounter) {
 
    }

Reply via email to