Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 489a7b85c -> 8f3f9b1c2


QPID-8193: [Broker-J] Update queue entries expirations on change of queue 
attributes for maximum/minimum TTL


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8f3f9b1c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8f3f9b1c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8f3f9b1c

Branch: refs/heads/master
Commit: 8f3f9b1c2a9586bc44889c5bb155ac16141ec093
Parents: 489a7b8
Author: Alex Rudyy <oru...@apache.org>
Authored: Thu Dec 6 18:16:30 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Thu Dec 6 18:16:30 2018 +0000

----------------------------------------------------------------------
 .../apache/qpid/server/queue/AbstractQueue.java |  49 +++++++-
 .../http/endtoend/message/TimeToLiveTest.java   | 118 +++++++++++++++++++
 2 files changed, 165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3f9b1c/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 00c1e30..ba98f1a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -236,9 +236,9 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     private String _messageGroupDefaultGroup;
     @ManagedAttributeField
     private int _maximumDistinctGroups;
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet = "queueMessageTtlChanged")
     private long _minimumMessageTtl;
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet = "queueMessageTtlChanged")
     private long _maximumMessageTtl;
     @ManagedAttributeField
     private boolean _ensureNondestructiveConsumers;
@@ -3718,4 +3718,49 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
             _transactions.remove(localTransaction);
         }
     }
+
+    @SuppressWarnings("unused")
+    private void queueMessageTtlChanged()
+    {
+        if (getState() == State.ACTIVE)
+        {
+            String taskName = String.format("Queue Housekeeping : %s : TTL 
Update", getName());
+            getVirtualHost().executeTask(taskName,
+                                         this::updateQueueEntryExpiration,
+                                         
getSystemTaskControllerContext(taskName, _virtualHost.getPrincipal()));
+        }
+    }
+
+    private void updateQueueEntryExpiration()
+    {
+        final QueueEntryList entries = getEntries();
+        if (entries != null)
+        {
+            final QueueEntryIterator queueListIterator = entries.iterator();
+            while (!_stopped.get() && queueListIterator.advance())
+            {
+                final QueueEntry node = queueListIterator.getNode();
+                if (!node.isDeleted())
+                {
+                    ServerMessage msg = node.getMessage();
+                    if (msg != null)
+                    {
+                        try (MessageReference messageReference = 
msg.newReference())
+                        {
+                            updateExpiration(node);
+                        }
+                        catch (MessageDeletedException e)
+                        {
+                            // Ignore
+                        }
+                    }
+                    if (node.expired())
+                    {
+                        expireEntry(node);
+                    }
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3f9b1c/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java
 
b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java
new file mode 100644
index 0000000..0d07643
--- /dev/null
+++ 
b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.http.endtoend.message;
+
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.qpid.tests.http.HttpRequestConfig;
+import org.apache.qpid.tests.http.HttpTestBase;
+
+@HttpRequestConfig
+public class TimeToLiveTest extends HttpTestBase
+{
+    private static final String QUEUE_NAME = "testQueue";
+    private static final long HOUSE_KEEPING_CHECK_PERIOD = 100;
+    private static final long ONE_DAY_MILLISECONDS = 24 * 60 * 60 * 1000L;
+
+    @Before
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(QUEUE_NAME);
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        System.setProperty("virtualhost.housekeepingCheckPeriod", 
String.valueOf(HOUSE_KEEPING_CHECK_PERIOD));
+    }
+
+    @AfterClass
+    public static void tearDownClass()
+    {
+        System.clearProperty("virtualhost.housekeepingCheckPeriod");
+    }
+
+    @Test
+    public void queueTimeToLiveUpdateIsAppliedToEnqueuedMessages() throws 
Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+            producer.send(session.createTextMessage("A"), 
DeliveryMode.NON_PERSISTENT, 4, ONE_DAY_MILLISECONDS);
+        }
+        finally
+        {
+            connection.close();
+        }
+
+        getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME),
+                                  "POST",
+                                  
Collections.singletonMap("maximumMessageTtl", 1),
+                                  SC_OK);
+
+        Thread.sleep(HOUSE_KEEPING_CHECK_PERIOD * 2);
+
+        getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME),
+                                  "POST",
+                                  
Collections.singletonMap("maximumMessageTtl", 0),
+                                  SC_OK);
+
+        Connection connection2 = getConnection();
+        try
+        {
+            connection2.start();
+            Session session = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+            MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+            producer.send(session.createTextMessage("B") , 
DeliveryMode.NON_PERSISTENT, 4, 0);
+            Message message = consumer.receive(getReceiveTimeout());
+            assertThat(message, is(notNullValue()));
+            assertThat(message, is(instanceOf(TextMessage.class)));
+            assertThat(((TextMessage)message).getText(), is(equalTo("B")));
+        }
+        finally
+        {
+            connection2.close();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to