Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 97caecc13 -> c7b3d189f


QPID-7153: [Broker-J] Allow expired messages to be sent to DLQ

This closes #12


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c7b3d189
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c7b3d189
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c7b3d189

Branch: refs/heads/master
Commit: c7b3d189f5d4c1f5a9dcd8f1170480eb3cce75d9
Parents: 97caecc1
Author: Alex Rudyy <oru...@apache.org>
Authored: Tue Aug 7 14:39:30 2018 +0100
Committer: Robert Godfrey <rgodf...@apache.org>
Committed: Wed Aug 8 11:37:21 2018 +0200

----------------------------------------------------------------------
 .../server/consumer/AbstractConsumerTarget.java |   2 +-
 .../qpid/server/message/MessageInstance.java    |   7 +-
 .../qpid/server/message/RoutingResult.java      |  16 +++
 .../org/apache/qpid/server/model/Queue.java     |  22 ++++
 .../apache/qpid/server/queue/AbstractQueue.java | 129 +++++++++++++++----
 .../qpid/server/queue/QueueEntryImpl.java       |  22 +++-
 .../AbstractSystemMessageSource.java            |   5 +-
 .../consumer/AbstractConsumerTargetTest.java    |  10 +-
 .../server/queue/AbstractQueueTestBase.java     |  30 +++++
 .../qpid/server/queue/MockMessageInstance.java  |   5 +-
 .../server/queue/QueueEntryImplTestBase.java    |   2 +-
 .../protocol/v0_10/ConsumerTarget_0_10.java     |   4 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java   |   2 +-
 .../protocol/v1_0/ConsumerTarget_1_0.java       |   2 +-
 .../server/management/amqp/ManagementNode.java  |   4 +-
 .../management/amqp/ManagementResponse.java     |   6 +-
 16 files changed, 218 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 
b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 76d4652..c590eca 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -280,7 +280,7 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
                         case ROUTE_TO_ALTERNATE:
                             if (consumer.acquires())
                             {
-                                int enqueues = entry.routeToAlternate(null, 
null);
+                                int enqueues = entry.routeToAlternate(null, 
null, null);
                                 if (enqueues == 0)
                                 {
                                     LOGGER.info("Failed to convert message {} 
for this consumer because '{}'."

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java 
b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index d077d0c..e92adbc 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -21,7 +21,10 @@
 package org.apache.qpid.server.message;
 
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -73,7 +76,9 @@ public interface MessageInstance
 
     int getMaximumDeliveryCount();
 
-    int routeToAlternate(Action<? super MessageInstance> action, 
ServerTransaction txn);
+    int routeToAlternate(Action<? super MessageInstance> action,
+                         ServerTransaction txn,
+                         Predicate<BaseQueue> predicate);
 
     Filterable asFilterable();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java 
b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index 5231679..9175465 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -25,8 +25,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +98,20 @@ public class RoutingResult<M extends ServerMessage<? extends 
StorableMessageMeta
         }
     }
 
+    public void filter(Predicate<BaseQueue> predicate)
+    {
+        Iterator<BaseQueue> iter = _queues.iterator();
+        while(iter.hasNext())
+        {
+            BaseQueue queue = iter.next();
+            if(!predicate.test(queue))
+            {
+                iter.remove();
+                _rejectingRoutableQueues.remove(queue);
+            }
+        }
+    }
+
     public int send(ServerTransaction txn,
                     final Action<? super MessageInstance> postEnqueueAction)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index c7ea2d8..7b37382 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -87,6 +87,7 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
     String OVERFLOW_POLICY = "overflowPolicy";
     String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
     String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
+    String EXPIRY_POLICY = "expiryPolicy";
 
     String QUEUE_SCAVANGE_COUNT = "qpid.queue.scavenge_count";
     @SuppressWarnings("unused")
@@ -320,6 +321,21 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
             mandatory = true)
     OverflowPolicy getOverflowPolicy();
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = "queue.defaultExpiryPolicy",
+            description = "Specifies the default value for queue expiry 
policy. ")
+    ExpiryPolicy DEFAULT_EXPIRY_POLICY = ExpiryPolicy.DELETE;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultExpiryPolicy}",
+            description = "Queue expiry policy."
+                          + " Options are Delete, and RouteToAlternate."
+                          + " The policy comes into effect where a message on 
the queue has exceeded its time to live."
+                          + " Delete - the expired message is deleted from the 
queue."
+                          + " RouteToAlternate - new expired message is routed 
to the alternate destination for the"
+                          + " queue, if present, or deleted if there is no 
alternate destination.",
+            mandatory = true)
+    ExpiryPolicy getExpiryPolicy();
+
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = 
false, skipAclCheck = true)
     Collection<PublishingLink> getPublishingLinks();
 
@@ -553,4 +569,10 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
     QueueEntry getLeastSignificantOldestEntry();
 
     QueueEntryIterator queueEntryIterator();
+
+    enum ExpiryPolicy
+    {
+        DELETE,
+        ROUTE_TO_ALTERNATE
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c10f8e7..8194236 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -248,6 +249,9 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     @ManagedAttributeField
     private CreatingLinkInfo _creatingLinkInfo;
 
+    @ManagedAttributeField
+    private ExpiryPolicy _expiryPolicy;
+
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
     private static final int RECOVERED = 3;
@@ -755,6 +759,12 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     }
 
     @Override
+    public ExpiryPolicy getExpiryPolicy()
+    {
+        return _expiryPolicy;
+    }
+
+    @Override
     public Collection<String> getAvailableAttributes()
     {
         return new ArrayList<String>(_arguments.keySet());
@@ -1284,36 +1294,51 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
 
     private void updateExpiration(final QueueEntry entry)
     {
-        long expiration = entry.getMessage().getExpiration();
-        long arrivalTime = entry.getMessage().getArrivalTime();
-        if(_minimumMessageTtl != 0l)
+        long expiration = calculateExpiration(entry.getMessage());
+        if (expiration > 0)
         {
-            if(arrivalTime == 0)
-            {
-                arrivalTime = System.currentTimeMillis();
-            }
-            if(expiration != 0L)
+            entry.setExpiration(expiration);
+        }
+    }
+
+    private long calculateExpiration(final ServerMessage message)
+    {
+        long expiration = message.getExpiration();
+        long arrivalTime = message.getArrivalTime();
+        if (_minimumMessageTtl != 0L)
+        {
+            if (expiration != 0L)
             {
-                long calculatedExpiration = arrivalTime+_minimumMessageTtl;
-                if(calculatedExpiration > expiration)
+                long calculatedExpiration = calculateExpiration(arrivalTime, 
_minimumMessageTtl);
+                if (calculatedExpiration > expiration)
                 {
-                    entry.setExpiration(calculatedExpiration);
                     expiration = calculatedExpiration;
                 }
             }
         }
-        if(_maximumMessageTtl != 0L)
+        if (_maximumMessageTtl != 0L)
         {
-            if(arrivalTime == 0)
+            long calculatedExpiration = calculateExpiration(arrivalTime, 
_maximumMessageTtl);
+            if (expiration == 0L || expiration > calculatedExpiration)
             {
-                arrivalTime = System.currentTimeMillis();
-            }
-            long calculatedExpiration = arrivalTime+_maximumMessageTtl;
-            if(expiration == 0L || expiration > calculatedExpiration)
-            {
-                entry.setExpiration(calculatedExpiration);
+                expiration = calculatedExpiration;
             }
         }
+        return expiration;
+    }
+
+    private long calculateExpiration(final long arrivalTime, final long ttl)
+    {
+        long sum;
+        try
+        {
+            sum = Math.addExact(arrivalTime == 0 ? System.currentTimeMillis() 
: arrivalTime, ttl);
+        }
+        catch (ArithmeticException e)
+        {
+            sum = Long.MAX_VALUE;
+        }
+        return sum;
     }
 
     private boolean assign(final QueueConsumer<?,?> sub, final QueueEntry 
entry)
@@ -1533,6 +1558,12 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         return _deleted.get();
     }
 
+    boolean wouldExpire(final ServerMessage message)
+    {
+        long expiration = calculateExpiration(message);
+        return expiration != 0 && expiration <= System.currentTimeMillis();
+    }
+
     @Override
     public List<QueueEntry> getMessagesOnTheQueue()
     {
@@ -1766,6 +1797,32 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         }
     }
 
+    private void routeToAlternate(QueueEntry entry,
+                                  Runnable postRouteTask,
+                                  Predicate<BaseQueue> predicate)
+    {
+        boolean acquiredForDequeueing = entry.acquireOrSteal(() ->
+                                                             {
+                                                                 
LOGGER.debug("routing stolen node {} to alternate", entry);
+                                                                 
entry.routeToAlternate(null, null, predicate);
+                                                                 if 
(postRouteTask != null)
+                                                                 {
+                                                                     
postRouteTask.run();
+                                                                 }
+                                                             });
+
+        if (acquiredForDequeueing)
+        {
+            LOGGER.debug("routing node {} to alternate", entry);
+            entry.routeToAlternate(null, null, predicate);
+            if (postRouteTask != null)
+            {
+                postRouteTask.run();
+            }
+        }
+    }
+
+
     @Override
     public void addDeleteTask(final Action<? super X> task)
     {
@@ -1853,7 +1910,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         for(final QueueEntry entry : entries)
         {
             // TODO log requeues with a post enqueue action
-            int requeues = entry.routeToAlternate(null, txn);
+            int requeues = entry.routeToAlternate(null, txn, null);
 
             if(requeues == 0)
             {
@@ -2089,11 +2146,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
                 if (expired)
                 {
                     expired = false;
-                    if (node.acquire())
-                    {
-                        dequeueEntry(node);
-                        
_queueStatistics.addToExpired(node.getSizeWithHeader());
-                    }
+                    expireEntry(node);
                 }
 
                 if(QueueContext._lastSeenUpdater.compareAndSet(context, 
lastSeen, node))
@@ -2103,8 +2156,9 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
 
                 lastSeen = context.getLastSeenEntry();
                 releasedNode = context.getReleasedEntry();
-                node = (releasedNode != null && 
lastSeen.compareTo(releasedNode)>=0) ? releasedNode : getEntries().next(
-                        lastSeen);
+                node = (releasedNode != null && 
lastSeen.compareTo(releasedNode)>=0)
+                        ? releasedNode
+                        : getEntries().next(lastSeen);
             }
             return node;
         }
@@ -2162,7 +2216,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
                 // If the node has expired then acquire it
                 if (node.expired())
                 {
-                    deleteEntry(node, () -> 
_queueStatistics.addToExpired(node.getSizeWithHeader()));
+                    expireEntry(node);
                 }
                 else
                 {
@@ -2194,7 +2248,26 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         {
             checkForNotification(null, listener, currentTime, thresholdTime, 
check);
         }
+    }
 
+    private void expireEntry(final QueueEntry node)
+    {
+        ExpiryPolicy expiryPolicy = getExpiryPolicy();
+        long sizeWithHeader = node.getSizeWithHeader();
+        switch (expiryPolicy)
+        {
+            case DELETE:
+                deleteEntry(node, () -> 
_queueStatistics.addToExpired(sizeWithHeader) );
+                break;
+            case ROUTE_TO_ALTERNATE:
+                routeToAlternate(node, () -> 
_queueStatistics.addToExpired(sizeWithHeader),
+                                 q -> !((q instanceof AbstractQueue) && 
((AbstractQueue) q).wouldExpire(node.getMessage())));
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown expiry policy: 
"
+                                                       + expiryPolicy
+                                                       + " this is a coding 
error inside Qpid");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..64818d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Predicate;
 
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -452,7 +453,7 @@ public abstract class QueueEntryImpl implements QueueEntry
         }
         else if(acquire())
         {
-            routeToAlternate(null, null);
+            routeToAlternate(null, null, null);
         }
     }
 
@@ -574,7 +575,9 @@ public abstract class QueueEntryImpl implements QueueEntry
     }
 
     @Override
-    public int routeToAlternate(final Action<? super MessageInstance> action, 
ServerTransaction txn)
+    public int routeToAlternate(final Action<? super MessageInstance> action,
+                                ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         if (!isAcquired())
         {
@@ -590,15 +593,22 @@ public abstract class QueueEntryImpl implements QueueEntry
             txn = new 
LocalTransaction(getQueue().getVirtualHost().getMessageStore());
         }
 
-        RoutingResult result;
+        RoutingResult<?> result;
+        ServerMessage<?> message = getMessage();
         if (alternateBindingDestination != null)
         {
-            result = alternateBindingDestination.route(getMessage(), 
getMessage().getInitialRoutingAddress(),
-                                                           
getInstanceProperties());
+            result = alternateBindingDestination.route(message,
+                                                       
message.getInitialRoutingAddress(),
+                                                       
getInstanceProperties());
         }
         else
         {
-            result = new RoutingResult<>(getMessage());
+            result = new RoutingResult<>(message);
+        }
+
+        if(predicate != null)
+        {
+            result.filter(predicate);
         }
 
         txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index b703ec1..d1a533d 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -28,6 +28,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -365,7 +367,8 @@ public abstract class AbstractSystemMessageSource 
implements MessageSource
 
         @Override
         public int routeToAlternate(final Action<? super MessageInstance> 
action,
-                                    final ServerTransaction txn)
+                                    final ServerTransaction txn,
+                                    final Predicate<BaseQueue> predicate)
         {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index fef48f3..9e0239b 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -147,7 +147,7 @@ public class AbstractConsumerTargetTest extends UnitTestBase
                                             
e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", 
_consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class), any());
     }
 
     @Test
@@ -167,7 +167,7 @@ public class AbstractConsumerTargetTest extends UnitTestBase
                                             
e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", 
_consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class), any());
     }
 
     @Test
@@ -177,7 +177,7 @@ public class AbstractConsumerTargetTest extends UnitTestBase
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", 
_consumerTarget.isCreditRestored());
-        verify(_messageInstance).routeToAlternate(any(Action.class), 
any(ServerTransaction.class));
+        verify(_messageInstance).routeToAlternate(any(Action.class), 
any(ServerTransaction.class), any());
     }
 
     @Test
@@ -187,7 +187,7 @@ public class AbstractConsumerTargetTest extends UnitTestBase
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", 
_consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class), any());
     }
 
     @Test
@@ -231,7 +231,7 @@ public class AbstractConsumerTargetTest extends UnitTestBase
                                             
e.getCause().getClass().getSimpleName()), condition);
         }
         assertTrue("message credit was not restored", 
_consumerTarget.isCreditRestored());
-        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class));
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), 
any(ServerTransaction.class), any());
     }
 
     private void configureBehaviour(final boolean acquires,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index ffe3ae9..58c8f5a 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1120,6 +1120,36 @@ abstract class AbstractQueueTestBase extends UnitTestBase
         doMoveOrCopyMessageTest(false);
     }
 
+    @Test
+    public void testExpiryPolicyRouteToAlternate()
+    {
+        Map<String, Object> dlqAttributes = new HashMap<>();
+        dlqAttributes.put(Queue.NAME, getTestName() + "_dlq");
+        dlqAttributes.put(Queue.MINIMUM_MESSAGE_TTL, Long.MAX_VALUE);
+        Queue<?> dlq = _virtualHost.createChild(Queue.class, dlqAttributes);
+
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME, getTestName());
+        attributes.put(Queue.ALTERNATE_BINDING, 
Collections.singletonMap("destination", dlq.getName()));
+        attributes.put(Queue.EXPIRY_POLICY, 
Queue.ExpiryPolicy.ROUTE_TO_ALTERNATE);
+
+        Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
+
+        ServerMessage message = createMessage(1L);
+        long arrivalTime = 50000L;
+        when(message.getArrivalTime()).thenReturn(arrivalTime);
+        when(message.getExpiration()).thenReturn(arrivalTime + 5000L);
+        when(message.isResourceAcceptable(any())).thenReturn(true);
+        queue.enqueue(message,null, null);
+
+        assertEquals("Unexpected queue depth", 1, 
queue.getQueueDepthMessages());
+
+        queue.checkMessageStatus();
+
+        assertEquals("Unexpected queue depth after checking message status", 
0, queue.getQueueDepthMessages());
+        assertEquals("Unexpected DLQ depth", 1, dlq.getQueueDepthMessages());
+    }
+
     private void doMoveOrCopyMessageTest(final boolean move)
     {
         Queue target = _virtualHost.createChild(Queue.class, 
Collections.singletonMap(Queue.NAME, getTestName() + "_target"));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 4b8f537..bb77467 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -51,7 +53,8 @@ public class MockMessageInstance implements MessageInstance
 
     @Override
     public int routeToAlternate(final Action<? super MessageInstance> action,
-                                final ServerTransaction txn)
+                                final ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 1bfafea..bbd3dad 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -378,7 +378,7 @@ public abstract class QueueEntryImplTestBase extends 
UnitTestBase
         final Action<? super MessageInstance> action = mock(Action.class);
         
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
         _queueEntry.acquire();
-        int enqueues = _queueEntry.routeToAlternate(action, null);
+        int enqueues = _queueEntry.routeToAlternate(action, null, null);
 
         assertEquals("Unexpected number of enqueues", 1, enqueues);
         verify(action).performAction(any());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..7c4cc7f 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -406,7 +406,7 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
     {
         if (entry.makeAcquisitionUnstealable(consumer))
         {
-            entry.routeToAlternate(null, null);
+            entry.routeToAlternate(null, null, null);
         }
     }
 
@@ -438,7 +438,7 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
                                                                            
requeueEntry.getOwningResource()
                                                                                
    .getName()));
                 }
-            }, null);
+            }, null, null);
         }
         if (requeues == 0)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index ac9ea95..c618ccf 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1573,7 +1573,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                                                                          
requeueEntry.getOwningResource()
                                                                                
.getName()));
                     }
-                }, null);
+                }, null, null);
             }
 
             if(requeues == 0)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index ebcaa60..b1232ed 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -608,7 +608,7 @@ class ConsumerTarget_1_0 extends 
AbstractConsumerTarget<ConsumerTarget_1_0>
                                             
ChannelMessages.DEADLETTERMSG(message.getMessageNumber(),
                                                                           
requeueEntry.getOwningResource().getName()));
                     }
-                }, null);
+                }, null, null);
             }
 
             if (requeues == 0)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index c9c067a..c167f46 100644
--- 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
@@ -1722,7 +1723,8 @@ class ManagementNode implements MessageSource, 
MessageDestination, BaseQueue
 
         @Override
         public int routeToAlternate(final Action<? super MessageInstance> 
action,
-                                    final ServerTransaction txn)
+                                    final ServerTransaction txn,
+                                    final Predicate<BaseQueue> predicate)
         {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c7b3d189/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 8e9e5c8..bab5fc4 100644
--- 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.management.amqp;
 
+import java.util.function.Predicate;
+
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
@@ -27,6 +29,7 @@ import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -174,7 +177,8 @@ class ManagementResponse implements MessageInstance
 
     @Override
     public int routeToAlternate(final Action<? super MessageInstance> action,
-                                final ServerTransaction txn)
+                                final ServerTransaction txn,
+                                final Predicate<BaseQueue> predicate)
     {
         return 0;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to