Repository: qpid-broker-j Updated Branches: refs/heads/master 489a7b85c -> 8f3f9b1c2
QPID-8193: [Broker-J] Update queue entries expirations on change of queue attributes for maximum/minimum TTL Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8f3f9b1c Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8f3f9b1c Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8f3f9b1c Branch: refs/heads/master Commit: 8f3f9b1c2a9586bc44889c5bb155ac16141ec093 Parents: 489a7b8 Author: Alex Rudyy <oru...@apache.org> Authored: Thu Dec 6 18:16:30 2018 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Thu Dec 6 18:16:30 2018 +0000 ---------------------------------------------------------------------- .../apache/qpid/server/queue/AbstractQueue.java | 49 +++++++- .../http/endtoend/message/TimeToLiveTest.java | 118 +++++++++++++++++++ 2 files changed, 165 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3f9b1c/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 00c1e30..ba98f1a 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -236,9 +236,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private String _messageGroupDefaultGroup; @ManagedAttributeField private int _maximumDistinctGroups; - @ManagedAttributeField + @ManagedAttributeField(afterSet = "queueMessageTtlChanged") private long _minimumMessageTtl; - @ManagedAttributeField + @ManagedAttributeField(afterSet = "queueMessageTtlChanged") private long _maximumMessageTtl; @ManagedAttributeField private boolean _ensureNondestructiveConsumers; @@ -3718,4 +3718,49 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _transactions.remove(localTransaction); } } + + @SuppressWarnings("unused") + private void queueMessageTtlChanged() + { + if (getState() == State.ACTIVE) + { + String taskName = String.format("Queue Housekeeping : %s : TTL Update", getName()); + getVirtualHost().executeTask(taskName, + this::updateQueueEntryExpiration, + getSystemTaskControllerContext(taskName, _virtualHost.getPrincipal())); + } + } + + private void updateQueueEntryExpiration() + { + final QueueEntryList entries = getEntries(); + if (entries != null) + { + final QueueEntryIterator queueListIterator = entries.iterator(); + while (!_stopped.get() && queueListIterator.advance()) + { + final QueueEntry node = queueListIterator.getNode(); + if (!node.isDeleted()) + { + ServerMessage msg = node.getMessage(); + if (msg != null) + { + try (MessageReference messageReference = msg.newReference()) + { + updateExpiration(node); + } + catch (MessageDeletedException e) + { + // Ignore + } + } + if (node.expired()) + { + expireEntry(node); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3f9b1c/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java new file mode 100644 index 0000000..0d07643 --- /dev/null +++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.http.endtoend.message; + +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +import java.util.Collections; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.qpid.tests.http.HttpRequestConfig; +import org.apache.qpid.tests.http.HttpTestBase; + +@HttpRequestConfig +public class TimeToLiveTest extends HttpTestBase +{ + private static final String QUEUE_NAME = "testQueue"; + private static final long HOUSE_KEEPING_CHECK_PERIOD = 100; + private static final long ONE_DAY_MILLISECONDS = 24 * 60 * 60 * 1000L; + + @Before + public void setUp() + { + getBrokerAdmin().createQueue(QUEUE_NAME); + } + + @BeforeClass + public static void setUpClass() throws Exception + { + System.setProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(HOUSE_KEEPING_CHECK_PERIOD)); + } + + @AfterClass + public static void tearDownClass() + { + System.clearProperty("virtualhost.housekeepingCheckPeriod"); + } + + @Test + public void queueTimeToLiveUpdateIsAppliedToEnqueuedMessages() throws Exception + { + Connection connection = getConnection(); + try + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + producer.send(session.createTextMessage("A"), DeliveryMode.NON_PERSISTENT, 4, ONE_DAY_MILLISECONDS); + } + finally + { + connection.close(); + } + + getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME), + "POST", + Collections.singletonMap("maximumMessageTtl", 1), + SC_OK); + + Thread.sleep(HOUSE_KEEPING_CHECK_PERIOD * 2); + + getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME), + "POST", + Collections.singletonMap("maximumMessageTtl", 0), + SC_OK); + + Connection connection2 = getConnection(); + try + { + connection2.start(); + Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + producer.send(session.createTextMessage("B") , DeliveryMode.NON_PERSISTENT, 4, 0); + Message message = consumer.receive(getReceiveTimeout()); + assertThat(message, is(notNullValue())); + assertThat(message, is(instanceOf(TextMessage.class))); + assertThat(((TextMessage)message).getText(), is(equalTo("B"))); + } + finally + { + connection2.close(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org