Repository: activemq-artemis Updated Branches: refs/heads/master 9a885f142 -> 3f3046c5e
ARTEMIS-1856 support delays before deleting addresses & queues Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/97612c48 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/97612c48 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/97612c48 Branch: refs/heads/master Commit: 97612c48d31c6b9f770a904a9e692e3bb42a1138 Parents: 9a885f1 Author: Justin Bertram <jbert...@apache.org> Authored: Mon Sep 24 16:37:28 2018 -0500 Committer: Michael Andre Pearce <michael.andre.pea...@me.com> Committed: Fri Oct 26 19:30:06 2018 +0100 ---------------------------------------------------------------------- .../config/ActiveMQDefaultConfiguration.java | 10 ++ .../artemis/core/config/Configuration.java | 13 ++ .../core/config/impl/ConfigurationImpl.java | 13 ++ .../deployers/impl/FileConfigurationParser.java | 14 ++ .../artemis/core/postoffice/PostOffice.java | 2 + .../core/postoffice/impl/PostOfficeImpl.java | 130 ++++++++++++--- .../core/server/ActiveMQMessageBundle.java | 5 +- .../artemis/core/server/ActiveMQServer.java | 19 ++- .../core/server/ActiveMQServerLogger.java | 4 + .../activemq/artemis/core/server/Queue.java | 2 + .../core/server/impl/ActiveMQServerImpl.java | 32 +++- .../artemis/core/server/impl/AddressInfo.java | 10 ++ .../artemis/core/server/impl/QueueImpl.java | 9 + .../core/server/impl/QueueManagerImpl.java | 10 +- .../core/settings/impl/AddressSettings.java | 66 +++++++- .../resources/schema/artemis-configuration.xsd | 27 ++- .../impl/DefaultsFileConfigurationTest.java | 2 + .../core/config/impl/FileConfigurationTest.java | 5 + .../impl/ScheduledDeliveryHandlerTest.java | 5 + .../resources/ConfigurationTest-full-config.xml | 3 + ...ionTest-xinclude-config-address-settings.xml | 2 + .../ConfigurationTest-xinclude-config.xml | 1 + docs/user-manual/en/address-model.md | 14 ++ docs/user-manual/en/configuration-index.md | 7 +- 
.../server/AddressQueueDeleteDelayTest.java | 167 +++++++++++++++++++ .../unit/core/postoffice/impl/FakeQueue.java | 5 + .../core/server/impl/fakes/FakePostOffice.java | 4 + 27 files changed, 538 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index e05a88e..e7cb4cb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -214,6 +214,9 @@ public final class ActiveMQDefaultConfiguration { // how often (in ms) to scan for expired messages private static long DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD = 30000; + // how often (in ms) to scan for addresses and queues which should be deleted + private static long DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD = 30000; + // the priority of the thread expiring messages private static int DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY = 3; @@ -727,6 +730,13 @@ public final class ActiveMQDefaultConfiguration { } /** + * how often (in ms) to scan for addresses and queues which should be deleted + */ + public static long getDefaultAddressQueueScanPeriod() { + return DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD; + } + + /** * the size of the cache for pre-creating message ID's */ public static int getDefaultIdCacheSize() { 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index b1c49c3..eb18d73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -951,6 +951,19 @@ public interface Configuration { Configuration setMessageExpiryThreadPriority(int messageExpiryThreadPriority); /** + * Returns the frequency (in milliseconds) to scan addresses and queues to detect which + * ones should be deleted. <br> + * Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ADDRESS_QUEUE_SCAN_PERIOD}. + */ + long getAddressQueueScanPeriod(); + + /** + * Sets the frequency (in milliseconds) to scan addresses and queues to detect which + * ones should be deleted. 
+ */ + Configuration setAddressQueueScanPeriod(long addressQueueScanPeriod); + + /** * @return A list of AddressSettings per matching to be deployed to the address settings repository */ Map<String, AddressSettings> getAddressesSettings(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 3e51d63..7fd1294 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -136,6 +136,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private int messageExpiryThreadPriority = ActiveMQDefaultConfiguration.getDefaultMessageExpiryThreadPriority(); + private long addressQueueScanPeriod = ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod(); + protected int idCacheSize = ActiveMQDefaultConfiguration.getDefaultIdCacheSize(); private boolean persistIDCache = ActiveMQDefaultConfiguration.isDefaultPersistIdCache(); @@ -1001,6 +1003,17 @@ public class ConfigurationImpl implements Configuration, Serializable { } @Override + public long getAddressQueueScanPeriod() { + return addressQueueScanPeriod; + } + + @Override + public ConfigurationImpl setAddressQueueScanPeriod(final long addressQueueScanPeriod) { + this.addressQueueScanPeriod = addressQueueScanPeriod; + return this; + } + + @Override public boolean isSecurityEnabled() { return securityEnabled; } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 5b0a3a4..255cce6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -213,12 +213,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String AUTO_DELETE_QUEUES = "auto-delete-queues"; + private static final String AUTO_DELETE_QUEUES_DELAY = "auto-delete-queues-delay"; + private static final String CONFIG_DELETE_QUEUES = "config-delete-queues"; private static final String AUTO_CREATE_ADDRESSES = "auto-create-addresses"; private static final String AUTO_DELETE_ADDRESSES = "auto-delete-addresses"; + private static final String AUTO_DELETE_ADDRESSES_DELAY = "auto-delete-addresses-delay"; + private static final String CONFIG_DELETE_ADDRESSES = "config-delete-addresses"; private static final String DEFAULT_PURGE_ON_NO_CONSUMERS = "default-purge-on-no-consumers"; @@ -344,6 +348,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setMessageExpiryThreadPriority(getInteger(e, "message-expiry-thread-priority", config.getMessageExpiryThreadPriority(), Validators.THREAD_PRIORITY_RANGE)); + config.setAddressQueueScanPeriod(getLong(e, "address-queue-scan-period", config.getAddressQueueScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO)); + config.setIDCacheSize(getInteger(e, "id-cache-size", config.getIDCacheSize(), Validators.GT_ZERO)); 
config.setPersistIDCache(getBoolean(e, "persist-id-cache", config.isPersistIDCache())); @@ -1043,6 +1049,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setAutoCreateQueues(XMLUtil.parseBoolean(child)); } else if (AUTO_DELETE_QUEUES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteQueues(XMLUtil.parseBoolean(child)); + } else if (AUTO_DELETE_QUEUES_DELAY.equalsIgnoreCase(name)) { + long autoDeleteQueuesDelay = XMLUtil.parseLong(child); + Validators.GE_ZERO.validate(AUTO_DELETE_QUEUES_DELAY, autoDeleteQueuesDelay); + addressSettings.setAutoDeleteQueuesDelay(autoDeleteQueuesDelay); } else if (CONFIG_DELETE_QUEUES.equalsIgnoreCase(name)) { String value = getTrimmedTextContent(child); Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_QUEUES, value); @@ -1052,6 +1062,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setAutoCreateAddresses(XMLUtil.parseBoolean(child)); } else if (AUTO_DELETE_ADDRESSES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteAddresses(XMLUtil.parseBoolean(child)); + } else if (AUTO_DELETE_ADDRESSES_DELAY.equalsIgnoreCase(name)) { + long autoDeleteAddressesDelay = XMLUtil.parseLong(child); + Validators.GE_ZERO.validate(AUTO_DELETE_ADDRESSES_DELAY, autoDeleteAddressesDelay); + addressSettings.setAutoDeleteAddressesDelay(autoDeleteAddressesDelay); } else if (CONFIG_DELETE_ADDRESSES.equalsIgnoreCase(name)) { String value = getTrimmedTextContent(child); Validators.DELETION_POLICY_TYPE.validate(CONFIG_DELETE_ADDRESSES, value); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java 
index b77e341..fb1e96c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -157,6 +157,8 @@ public interface PostOffice extends ActiveMQComponent { // we can't start expiry scanner until the system is load otherwise we may get weird races - https://issues.jboss.org/browse/HORNETQ-1142 void startExpiryScanner(); + void startAddressQueueScanner(); + boolean isAddressBound(SimpleString address) throws Exception; Set<SimpleString> getAddresses(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 35995eb..e573f85 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -113,11 +113,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private final ManagementService managementService; - private Reaper reaperRunnable; + private ExpiryReaper expiryReaperRunnable; - private final long reaperPeriod; + private final long expiryReaperPeriod; - private final int reaperPriority; + private AddressQueueReaper addressQueueReaperRunnable; + + private final long addressQueueReaperPeriod; private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<>(); @@ -140,8 +142,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding final PagingManager pagingManager, final QueueFactory 
bindableFactory, final ManagementService managementService, - final long reaperPeriod, - final int reaperPriority, + final long expiryReaperPeriod, + final long addressQueueReaperPeriod, final WildcardConfiguration wildcardConfiguration, final int idCacheSize, final boolean persistIDCache, @@ -154,9 +156,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding this.pagingManager = pagingManager; - this.reaperPeriod = reaperPeriod; + this.expiryReaperPeriod = expiryReaperPeriod; - this.reaperPriority = reaperPriority; + this.addressQueueReaperPeriod = addressQueueReaperPeriod; if (wildcardConfiguration.isRoutingEnabled()) { addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager); @@ -197,8 +199,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding managementService.removeNotificationListener(this); - if (reaperRunnable != null) - reaperRunnable.stop(); + if (expiryReaperRunnable != null) + expiryReaperRunnable.stop(); + + if (addressQueueReaperRunnable != null) + addressQueueReaperRunnable.stop(); addressManager.clear(); @@ -713,6 +718,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding managementService.unregisterDivert(uniqueName, binding.getAddress()); } + AddressInfo addressInfo = getAddressInfo(binding.getAddress()); + if (addressInfo != null) { + addressInfo.setBindingRemovedTimestamp(System.currentTimeMillis()); + } + if (binding.getType() != BindingType.DIVERT) { TypedProperties props = new TypedProperties(); @@ -1480,12 +1490,23 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding */ @Override public synchronized void startExpiryScanner() { - if (reaperPeriod > 0) { - if (reaperRunnable != null) - reaperRunnable.stop(); - reaperRunnable = new Reaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), reaperPeriod, TimeUnit.MILLISECONDS, false); + if (expiryReaperPeriod > 0) { + if 
(expiryReaperRunnable != null) + expiryReaperRunnable.stop(); + expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), expiryReaperPeriod, TimeUnit.MILLISECONDS, false); - reaperRunnable.start(); + expiryReaperRunnable.start(); + } + } + + @Override + public synchronized void startAddressQueueScanner() { + if (addressQueueReaperPeriod > 0) { + if (addressQueueReaperRunnable != null) + addressQueueReaperRunnable.stop(); + addressQueueReaperRunnable = new AddressQueueReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), addressQueueReaperPeriod, TimeUnit.MILLISECONDS, false); + + addressQueueReaperRunnable.start(); } } @@ -1504,13 +1525,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return message; } - private final class Reaper extends ActiveMQScheduledComponent { + private final class ExpiryReaper extends ActiveMQScheduledComponent { - Reaper(ScheduledExecutorService scheduledExecutorService, - Executor executor, - long checkPeriod, - TimeUnit timeUnit, - boolean onDemand) { + ExpiryReaper(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + boolean onDemand) { super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); } @@ -1540,6 +1561,75 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + private final class AddressQueueReaper extends ActiveMQScheduledComponent { + + AddressQueueReaper(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + boolean onDemand) { + super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); + } + + @Override + public void run() { + Map<SimpleString, Binding> nameMap = addressManager.getBindings(); + + List<Queue> queues = new ArrayList<>(); + + for (Binding binding : nameMap.values()) { + if (binding.getType() == 
BindingType.LOCAL_QUEUE) { + Queue queue = (Queue) binding.getBindable(); + + queues.add(queue); + } + } + + for (Queue queue : queues) { + int consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); + boolean autoCreated = queue.isAutoCreated(); + long consumerRemovedTimestamp = queue.getConsumerRemovedTimestamp(); + + if (!queue.isInternalQueue() && autoCreated && messageCount == 0 && consumerCount == 0 && consumerRemovedTimestamp != -1) { + SimpleString queueName = queue.getName(); + AddressSettings settings = addressSettingsRepository.getMatch(queue.getAddress().toString()); + if (settings.isAutoDeleteQueues() && (System.currentTimeMillis() - consumerRemovedTimestamp >= settings.getAutoDeleteQueuesDelay())) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); + } + + try { + server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); + } + } + } + } + + Set<SimpleString> addresses = addressManager.getAddresses(); + + for (SimpleString address : addresses) { + AddressInfo addressInfo = getAddressInfo(address); + AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); + + try { + if (addressInfo != null && !isAddressBound(address) && addressInfo.getBindingRemovedTimestamp() != -1 && (System.currentTimeMillis() - addressInfo.getBindingRemovedTimestamp() >= settings.getAutoDeleteAddressesDelay())) { + + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.info("deleting auto-created address \"" + address + ".\""); + } + + server.removeAddressInfo(address, null); + } + } catch (Exception e) { + 
ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, address); + } + } + } + } + public static final class AddOperation implements TransactionOperation { private final List<MessageReference> refs; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index c797710..bc86af7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -140,7 +140,7 @@ public interface ActiveMQMessageBundle { ActiveMQIllegalStateException alreadyHaveReplicationServer(); @Message(id = 229025, value = "Cannot delete queue {0} on binding {1} - it has consumers = {2}", format = Message.Format.MESSAGE_FORMAT) - ActiveMQIllegalStateException cannotDeleteQueue(SimpleString name, SimpleString queueName, String s); + ActiveMQIllegalStateException cannotDeleteQueueWithConsumers(SimpleString name, SimpleString queueName, String s); @Message(id = 229026, value = "Backup Server was not yet in sync with live") ActiveMQIllegalStateException backupServerNotInSync(); @@ -438,4 +438,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229214, value = "{0} must be a valid percentage value between 0 and 100 or -1 (actual value: {1})", format = Message.Format.MESSAGE_FORMAT) IllegalArgumentException notPercentOrMinusOne(String name, Number val); + + @Message(id = 229215, value = "Cannot delete queue {0} on binding {1} - it has {2} messages", format = Message.Format.MESSAGE_FORMAT) + ActiveMQIllegalStateException cannotDeleteQueueWithMessages(SimpleString 
name, SimpleString queueName, long messageCount); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index c50315c..1791f1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -173,12 +173,6 @@ public interface ActiveMQServer extends ServiceComponent { */ ActiveMQServerControlImpl getActiveMQServerControl(); - void destroyQueue(SimpleString queueName, - SecurityAuth session, - boolean checkConsumerCount, - boolean removeConsumers, - boolean autoDeleteAddress) throws Exception; - void registerActivateCallback(ActivateCallback callback); void unregisterActivateCallback(ActivateCallback callback); @@ -477,6 +471,19 @@ public interface ActiveMQServer extends ServiceComponent { boolean checkConsumerCount, boolean removeConsumers) throws Exception; + void destroyQueue(SimpleString queueName, + SecurityAuth session, + boolean checkConsumerCount, + boolean removeConsumers, + boolean autoDeleteAddress) throws Exception; + + void destroyQueue(SimpleString queueName, + SecurityAuth session, + boolean checkConsumerCount, + boolean removeConsumers, + boolean autoDeleteAddress, + boolean checkMessageCount) throws Exception; + String destroyConnectionWithSessionMetadata(String metaKey, String metaValue) throws Exception; ScheduledExecutorService getScheduledPool(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java 
---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 2117508..cad835a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1875,6 +1875,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224069, value = "Change detected in broker configuration file, but reload failed", format = Message.Format.MESSAGE_FORMAT) void configurationReloadFailed(@Cause Throwable t); + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224070, value = "Failed to remove auto-created address {0}", format = Message.Format.MESSAGE_FORMAT) + void errorRemovingAutoCreatedAddress(@Cause Exception e, SimpleString addressName); + @LogMessage(level = Logger.Level.WARN) @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT) void invalidMessageCounterPeriod(long value); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index f2fd8f9..8a120ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -110,6 +110,8 @@ public interface Queue extends Bindable,CriticalComponent { int getConsumerCount(); + long getConsumerRemovedTimestamp(); + /** * This will set a 
reference counter for every consumer present on the queue. * The ReferenceCounter will know what to do when the counter became zeroed. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 01553d2..f14c1d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1934,6 +1934,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean checkConsumerCount, final boolean removeConsumers, final boolean autoDeleteAddress) throws Exception { + destroyQueue(queueName, session, checkConsumerCount, removeConsumers, autoDeleteAddress, false); + } + + @Override + public void destroyQueue(final SimpleString queueName, + final SecurityAuth session, + final boolean checkConsumerCount, + final boolean removeConsumers, + final boolean autoDeleteAddress, + final boolean checkMessageCount) throws Exception { if (postOffice == null) { return; } @@ -1955,11 +1965,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { Queue queue = (Queue) binding.getBindable(); - // This check is only valid if checkConsumerCount == true - if (checkConsumerCount && queue.getConsumerCount() != 0) { - throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueue(queue.getName(), queueName, binding.getClass().getName()); - } - if (session != null) { if (queue.isDurable()) { @@ -1970,6 +1975,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + // This check is only valid if checkConsumerCount == true + if 
(checkConsumerCount && queue.getConsumerCount() != 0) { + throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithConsumers(queue.getName(), queueName, binding.getClass().getName()); + } + + // This check is only valid if checkMessageCount == true + long messageCount = queue.getMessageCount(); + if (checkMessageCount && messageCount != 0) { + throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithMessages(queue.getName(), queueName, messageCount); + } + queue.deleteQueue(removeConsumers); if (hasBrokerQueuePlugins()) { @@ -1978,7 +1994,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } AddressInfo addressInfo = getAddressInfo(address); - if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) { + if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) { try { removeAddressInfo(address, session); } catch (ActiveMQDeleteAddressException e) { @@ -2539,7 +2555,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { pagingManager = createPagingManager(); resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool); - postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getMessageExpiryThreadPriority(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository); + postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), 
addressSettingsRepository); // This can't be created until node id is set clusterManager = new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, nodeManager, haPolicy.isBackup()); @@ -2680,6 +2696,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { // We can only do this after everything is started otherwise we may get nasty races with expired messages postOffice.startExpiryScanner(); + + postOffice.startAddressQueueScanner(); } if (configuration.getMaxDiskUsage() != -1) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index db2c67a..ef55896 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -46,6 +46,8 @@ public class AddressInfo { private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount"); + private long bindingRemovedTimestamp = -1; + public AddressInfo(SimpleString name) { this(name, EnumSet.noneOf(RoutingType.class)); } @@ -121,6 +123,14 @@ public class AddressInfo { return firstSeen; } + public long getBindingRemovedTimestamp() { + return bindingRemovedTimestamp; + } + + public void setBindingRemovedTimestamp(long bindingRemovedTimestamp) { + this.bindingRemovedTimestamp = bindingRemovedTimestamp; + } + @Override public String toString() { StringBuffer buff = new StringBuffer(); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index bfa925a..dfd070f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -119,6 +119,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private static final Logger logger = Logger.getLogger(QueueImpl.class); private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching"); private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime"); + private static final AtomicLongFieldUpdater consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp"); private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter"); public static final int REDISTRIBUTOR_BATCH_SIZE = 100; @@ -233,6 +234,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final AtomicInteger consumersCount = new AtomicInteger(); + private volatile long consumerRemovedTimestamp = -1; + private final Set<Consumer> consumerSet = new HashSet<>(); private final Map<SimpleString, Consumer> groups = new HashMap<>(); @@ -1082,6 +1085,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumerSet.remove(consumer)) { int currentConsumerCount = 
consumersCount.decrementAndGet(); + consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0)); if (stopped) { dispatchStartTimeUpdater.set(this, -1); @@ -1184,6 +1188,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override + public long getConsumerRemovedTimestamp() { + return consumerRemovedTimestampUpdater.get(this); + } + + @Override public synchronized Set<Consumer> getConsumers() { return new HashSet<>(consumerSet); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index 23b0e5d..dd34bdc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -35,22 +35,22 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag //the queue may already have been deleted and this is a result of that if (queue == null) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\""); + ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + ".\""); } return; } - SimpleString address = queue.getAddress(); - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + + AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString()); long consumerCount = 
queue.getConsumerCount(); long messageCount = queue.getMessageCount(); - if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) { + if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0 && settings.getAutoDeleteQueuesDelay() == 0) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); } try { - server.destroyQueue(queueName, null, true, false); + server.destroyQueue(queueName, null, true, false, settings.isAutoDeleteAddresses(), true); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 70397ca..6d59ae4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -72,12 +72,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable public static final boolean DEFAULT_AUTO_DELETE_QUEUES = true; + public static final long DEFAULT_AUTO_DELETE_QUEUES_DELAY = 0; + public static final DeletionPolicy DEFAULT_CONFIG_DELETE_QUEUES = DeletionPolicy.OFF; public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = 
true; public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true; + public static final long DEFAULT_AUTO_DELETE_ADDRESSES_DELAY = 0; + public static final DeletionPolicy DEFAULT_CONFIG_DELETE_ADDRESSES = DeletionPolicy.OFF; public static final long DEFAULT_REDISTRIBUTION_DELAY = -1; @@ -159,12 +163,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable private Boolean autoDeleteQueues = null; + private Long autoDeleteQueuesDelay = null; + private DeletionPolicy configDeleteQueues = null; private Boolean autoCreateAddresses = null; private Boolean autoDeleteAddresses = null; + private Long autoDeleteAddressesDelay = null; + private DeletionPolicy configDeleteAddresses = null; private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; @@ -218,9 +226,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable this.autoDeleteJmsTopics = other.autoDeleteJmsTopics; this.autoCreateQueues = other.autoCreateQueues; this.autoDeleteQueues = other.autoDeleteQueues; + this.autoDeleteQueuesDelay = other.autoDeleteQueuesDelay; this.configDeleteQueues = other.configDeleteQueues; this.autoCreateAddresses = other.autoCreateAddresses; this.autoDeleteAddresses = other.autoDeleteAddresses; + this.autoDeleteAddressesDelay = other.autoDeleteAddressesDelay; this.configDeleteAddresses = other.configDeleteAddresses; this.managementBrowsePageSize = other.managementBrowsePageSize; this.queuePrefetch = other.queuePrefetch; @@ -299,6 +309,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } + public long getAutoDeleteQueuesDelay() { + return autoDeleteQueuesDelay != null ? 
autoDeleteQueuesDelay : AddressSettings.DEFAULT_AUTO_DELETE_QUEUES_DELAY; + } + + public AddressSettings setAutoDeleteQueuesDelay(final long autoDeleteQueuesDelay) { + this.autoDeleteQueuesDelay = autoDeleteQueuesDelay; + return this; + } + public DeletionPolicy getConfigDeleteQueues() { return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES; } @@ -326,6 +345,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } + public long getAutoDeleteAddressesDelay() { + return autoDeleteAddressesDelay != null ? autoDeleteAddressesDelay : AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES_DELAY; + } + + public AddressSettings setAutoDeleteAddressesDelay(final long autoDeleteAddressesDelay) { + this.autoDeleteAddressesDelay = autoDeleteAddressesDelay; + return this; + } + public DeletionPolicy getConfigDeleteAddresses() { return configDeleteAddresses != null ? configDeleteAddresses : AddressSettings.DEFAULT_CONFIG_DELETE_ADDRESSES; } @@ -701,6 +729,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (autoDeleteQueues == null) { autoDeleteQueues = merged.autoDeleteQueues; } + if (autoDeleteQueuesDelay == null) { + autoDeleteQueuesDelay = merged.autoDeleteQueuesDelay; + } if (configDeleteQueues == null) { configDeleteQueues = merged.configDeleteQueues; } @@ -710,6 +741,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (autoDeleteAddresses == null) { autoDeleteAddresses = merged.autoDeleteAddresses; } + if (autoDeleteAddressesDelay == null) { + autoDeleteAddressesDelay = merged.autoDeleteAddressesDelay; + } if (configDeleteAddresses == null) { configDeleteAddresses = merged.configDeleteAddresses; } @@ -886,6 +920,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (buffer.readableBytes() > 0) { defaultNonDestructive = BufferHelper.readNullableBoolean(buffer); } + + if 
(buffer.readableBytes() > 0) { + autoDeleteQueuesDelay = BufferHelper.readNullableLong(buffer); + } + + if (buffer.readableBytes() > 0) { + autoDeleteAddressesDelay = BufferHelper.readNullableLong(buffer); + } } @Override @@ -929,7 +971,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) + BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize) + SimpleString.sizeofNullableString(defaultLastValueKey) + - BufferHelper.sizeOfNullableBoolean(defaultNonDestructive); + BufferHelper.sizeOfNullableBoolean(defaultNonDestructive) + + BufferHelper.sizeOfNullableLong(autoDeleteQueuesDelay) + + BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay); } @Override @@ -1016,6 +1060,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.writeNullableBoolean(buffer, defaultNonDestructive); + BufferHelper.writeNullableLong(buffer, autoDeleteQueuesDelay); + + BufferHelper.writeNullableLong(buffer, autoDeleteAddressesDelay); + } /* (non-Javadoc) @@ -1053,9 +1101,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode()); result = prime * result + ((autoCreateQueues == null) ? 0 : autoCreateQueues.hashCode()); result = prime * result + ((autoDeleteQueues == null) ? 0 : autoDeleteQueues.hashCode()); + result = prime * result + ((autoDeleteQueuesDelay == null) ? 0 : autoDeleteQueuesDelay.hashCode()); result = prime * result + ((configDeleteQueues == null) ? 0 : configDeleteQueues.hashCode()); result = prime * result + ((autoCreateAddresses == null) ? 0 : autoCreateAddresses.hashCode()); result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode()); + result = prime * result + ((autoDeleteAddressesDelay == null) ? 
0 : autoDeleteAddressesDelay.hashCode()); result = prime * result + ((configDeleteAddresses == null) ? 0 : configDeleteAddresses.hashCode()); result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode()); result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode()); @@ -1222,6 +1272,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return false; } else if (!autoDeleteQueues.equals(other.autoDeleteQueues)) return false; + if (autoDeleteQueuesDelay == null) { + if (other.autoDeleteQueuesDelay != null) + return false; + } else if (!autoDeleteQueuesDelay.equals(other.autoDeleteQueuesDelay)) + return false; if (configDeleteQueues == null) { if (other.configDeleteQueues != null) return false; @@ -1237,6 +1292,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return false; } else if (!autoDeleteAddresses.equals(other.autoDeleteAddresses)) return false; + if (autoDeleteAddressesDelay == null) { + if (other.autoDeleteAddressesDelay != null) + return false; + } else if (!autoDeleteAddressesDelay.equals(other.autoDeleteAddressesDelay)) + return false; if (configDeleteAddresses == null) { if (other.configDeleteAddresses != null) return false; @@ -1365,12 +1425,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable autoCreateQueues + ", autoDeleteQueues=" + autoDeleteQueues + + ", autoDeleteQueuesDelay=" + + autoDeleteQueuesDelay + ", configDeleteQueues=" + configDeleteQueues + ", autoCreateAddresses=" + autoCreateAddresses + ", autoDeleteAddresses=" + autoDeleteAddresses + + ", autoDeleteAddressesDelay=" + + autoDeleteAddressesDelay + ", configDeleteAddresses=" + configDeleteAddresses + ", managementBrowsePageSize=" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e67dc9a..cd0d9c9 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -336,6 +336,14 @@ </xsd:annotation> </xsd:element> + <xsd:element name="address-queue-scan-period" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how often (in ms) to scan for addresses and queues that need to be deleted + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="id-cache-size" type="xsd:int" default="20000" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> @@ -2953,6 +2961,15 @@ </xsd:annotation> </xsd:element> + <xsd:element name="auto-delete-queues-delay" type="xsd:long" default="0" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how long to wait (in milliseconds) before deleting auto-created queues after the queue has 0 + consumers and 0 messages + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="config-delete-queues" default="OFF" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> @@ -2974,7 +2991,6 @@ <xsd:documentation> whether or not to automatically create addresses when a client sends a message to or attempts to consume a message from a queue mapped to an address that doesn't exist - a queue </xsd:documentation> </xsd:annotation> </xsd:element> @@ -2987,6 +3003,15 @@ </xsd:annotation> </xsd:element> + <xsd:element name="auto-delete-addresses-delay" type="xsd:long" default="0" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how long to wait (in milliseconds) before deleting auto-created addresses after they no longer + have any queues + </xsd:documentation> + 
</xsd:annotation> + </xsd:element> + <xsd:element name="config-delete-addresses" default="OFF" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java index a07c797..388d2ca 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java @@ -126,6 +126,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageExpiryScanPeriod(), conf.getMessageExpiryScanPeriod()); + Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAddressQueueScanPeriod(), conf.getAddressQueueScanPeriod()); + Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMessageExpiryThreadPriority(), conf.getMessageExpiryThreadPriority()); Assert.assertTrue(conf.getHAPolicyConfiguration() instanceof LiveOnlyPolicyConfiguration); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 996b9fe..a60eb2d 100644 --- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -90,6 +90,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(56789, conf.getTransactionTimeoutScanPeriod()); Assert.assertEquals(10111213, conf.getMessageExpiryScanPeriod()); Assert.assertEquals(8, conf.getMessageExpiryThreadPriority()); + Assert.assertEquals(25000, conf.getAddressQueueScanPeriod()); Assert.assertEquals(127, conf.getIDCacheSize()); Assert.assertEquals(true, conf.isPersistIDCache()); Assert.assertEquals(true, conf.isPersistDeliveryCountBeforeDelivery()); @@ -324,6 +325,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics()); assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics()); + assertEquals(0, conf.getAddressesSettings().get("a1").getAutoDeleteQueuesDelay()); + assertEquals(0, conf.getAddressesSettings().get("a1").getAutoDeleteAddressesDelay()); assertEquals(false, conf.getAddressesSettings().get("a1").isDefaultPurgeOnNoConsumers()); assertEquals(5, conf.getAddressesSettings().get("a1").getDefaultMaxConsumers()); assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a1").getDefaultQueueRoutingType()); @@ -343,6 +346,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics()); assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics()); + assertEquals(500, conf.getAddressesSettings().get("a2").getAutoDeleteQueuesDelay()); + assertEquals(1000, 
conf.getAddressesSettings().get("a2").getAutoDeleteAddressesDelay()); assertEquals(true, conf.getAddressesSettings().get("a2").isDefaultPurgeOnNoConsumers()); assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers()); assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index fed8c13..0ef7804 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -962,6 +962,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public long getConsumerRemovedTimestamp() { + return 0; + } + + @Override public void setConsumersRefCount(ReferenceCounter referenceCounter) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/resources/ConfigurationTest-full-config.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 186a712..7603a2a 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -48,6 +48,7 @@ <transaction-timeout-scan-period>56789</transaction-timeout-scan-period> 
<message-expiry-scan-period>10111213</message-expiry-scan-period> <message-expiry-thread-priority>8</message-expiry-thread-priority> + <address-queue-scan-period>25000</address-queue-scan-period> <id-cache-size>127</id-cache-size> <journal-datasync>false</journal-datasync> <persist-id-cache>true</persist-id-cache> @@ -331,8 +332,10 @@ <auto-delete-jms-topics>false</auto-delete-jms-topics> <auto-create-queues>false</auto-create-queues> <auto-delete-queues>false</auto-delete-queues> + <auto-delete-queues-delay>500</auto-delete-queues-delay> <auto-create-addresses>false</auto-create-addresses> <auto-delete-addresses>false</auto-delete-addresses> + <auto-delete-addresses-delay>1000</auto-delete-addresses-delay> <default-purge-on-no-consumers>true</default-purge-on-no-consumers> <default-max-consumers>15</default-max-consumers> <default-queue-routing-type>MULTICAST</default-queue-routing-type> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 443958e..0d5784d 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -56,8 +56,10 @@ <auto-delete-jms-topics>false</auto-delete-jms-topics> <auto-create-queues>false</auto-create-queues> <auto-delete-queues>false</auto-delete-queues> + <auto-delete-queues-delay>500</auto-delete-queues-delay> <auto-create-addresses>false</auto-create-addresses> <auto-delete-addresses>false</auto-delete-addresses> + <auto-delete-addresses-delay>1000</auto-delete-addresses-delay> 
<default-purge-on-no-consumers>true</default-purge-on-no-consumers> <default-max-consumers>15</default-max-consumers> <default-queue-routing-type>MULTICAST</default-queue-routing-type> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 526d005..d560379 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -49,6 +49,7 @@ <transaction-timeout-scan-period>56789</transaction-timeout-scan-period> <message-expiry-scan-period>10111213</message-expiry-scan-period> <message-expiry-thread-priority>8</message-expiry-thread-priority> + <address-queue-scan-period>25000</address-queue-scan-period> <id-cache-size>127</id-cache-size> <journal-datasync>false</journal-datasync> <persist-id-cache>true</persist-id-cache> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/docs/user-manual/en/address-model.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index a23c5fc..801e332 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -597,9 +597,11 @@ that would be found in the `broker.xml` file. <auto-delete-jms-topics>true</auto-delete-jms-topics> <!-- deprecated! 
see auto-delete-addresses --> <auto-create-queues>true</auto-create-queues> <auto-delete-queues>true</auto-delete-queues> + <auto-delete-queues-delay>0</auto-delete-queues-delay> <config-delete-queues>OFF</config-delete-queues> <auto-create-addresses>true</auto-create-addresses> <auto-delete-addresses>true</auto-delete-addresses> + <auto-delete-addresses-delay>0</auto-delete-addresses-delay> <config-delete-addresses>OFF</config-delete-addresses> <management-browse-page-size>200</management-browse-page-size> <default-purge-on-no-consumers>false</default-purge-on-no-consumers> @@ -761,6 +763,12 @@ non-temporary, and non-transient. Default is `true`. auto-created queues when they have both 0 consumers and 0 messages. Default is `true`. +`auto-delete-queues-delay`. How long to wait (in milliseconds) before deleting +auto-created queues after the queue has 0 consumers and 0 messages. Default is +`0` (delete immediately). The broker's `address-queue-scan-period` controls +how often (in milliseconds) queues are scanned for potential deletion. Use `-1` +to disable scanning. The default scan value is `30000`. + `config-delete-queues`. How the broker should handle queues deleted on config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more about [configuration reload](config-reload.md). @@ -774,6 +782,12 @@ Default is `true`. auto-created addresses once the address no longer has any queues. Default is `true`. +`auto-delete-addresses-delay`. How long to wait (in milliseconds) before +deleting auto-created addresses after they no longer have any queues. Default +is `0` (delete immediately). The broker's `address-queue-scan-period` controls +how often (in milliseconds) addresses are scanned for potential deletion. Use +`-1` to disable scanning. The default scan value is `30000`. + `config-delete-addresses`. How the broker should handle addresses deleted on config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. 
Read more about [configuration reload](config-reload.md). http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/docs/user-manual/en/configuration-index.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 617ca40..22d3c7e 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -143,6 +143,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t [message-counter-sample-period](management.md#message-counters) | the sample period (in ms) to use for message counters. | 10000 [message-expiry-scan-period](message-expiry.md#configuring-the-expiry-reaper-thread) | how often (in ms) to scan for expired messages. | 30000 [message-expiry-thread-priority](message-expiry.md#configuring-the-expiry-reaper-thread)| the priority of the thread expiring messages. | 3 +[address-queue-scan-period](address-model.md#configuring-addresses-and-queues-via-address-settings) | how often (in ms) to scan for addresses & queues that should be removed. | 30000 name | node name; used in topology notifications if set. | n/a [password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a [page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. 
| 5 @@ -216,10 +217,12 @@ Name | Description | Default [auto-create-jms-topics](address-model.md#configuring-addresses-and-queues-via-address-settings)| **deprecated** Create JMS topics automatically; see `auto-create-queues` & `auto-create-addresses` | `true` [auto-delete-jms-topics](address-model.md#configuring-addresses-and-queues-via-address-settings)| **deprecated** Delete JMS topics automatically; see `auto-create-queues` & `auto-create-addresses` | `true` [auto-create-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Create queues automatically | `true` -[auto-delete-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete queues automatically | `true` +[auto-delete-queues](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete auto-created queues automatically | `true` +[auto-delete-queues-delay](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delay for deleting auto-created queues | 0 [config-delete-queues](config-reload.md)| How to deal with queues deleted from XML at runtime| `OFF` [auto-create-addresses](address-model.md#configuring-addresses-and-queues-via-address-settings) | Create addresses automatically | `true` -[auto-delete-addresses](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete addresses automatically | `true` +[auto-delete-addresses](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delete auto-created addresses automatically | `true` +[auto-delete-addresses-delay](address-model.md#configuring-addresses-and-queues-via-address-settings) | Delay for deleting auto-created addresses | 0 [config-delete-addresses](config-reload.md) | How to deal with addresses deleted from XML at runtime | `OFF` [management-browse-page-size]() | Number of messages a management resource can browse| 200 [default-purge-on-no-consumers](address-model.md#non-durable-subscription-queue) | 
`purge-on-no-consumers` value if none is set on the queue | `false` http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressQueueDeleteDelayTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressQueueDeleteDelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressQueueDeleteDelayTest.java new file mode 100644 index 0000000..804be57 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressQueueDeleteDelayTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * Integration tests for the delayed removal of queues and addresses, as configured through
+ * AddressSettings.setAutoDeleteQueuesDelay and AddressSettings.setAutoDeleteAddressesDelay.
+ *
+ * Every test talks to the server started in setUp() and checks removal by polling
+ * server.locateQueue / server.getAddressInfo through Wait.waitFor. The delay and tolerance
+ * values are compared against System.currentTimeMillis() differences, i.e. they are treated
+ * as milliseconds.
+ */
+public class AddressQueueDeleteDelayTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   private ClientSession session;
+
+   private ClientSessionFactory sf;
+
+   private ServerLocator locator;
+   /**
+    * Registers a queue delay of 3000 and an address delay of 5000 for a random address,
+    * creates a queue on it, sends and consumes one message, and closes the consumer.
+    *
+    * Asserts that the queue is gone within (delay + fudge) of the close but not sooner than
+    * (delay - fudge), and then the same for the address, timed from the moment the queue was
+    * observed to be gone.
+    */
+   @Test
+   public void testAddressQueueDeleteDelay() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+      final long deleteQueuesDelay = 3000;
+      final long deleteAddressesDelay = 5000;
+      final long fudge = 200; // tolerance applied on both sides of the timing assertions below
+
+      AddressSettings addressSettings = new AddressSettings().setAutoDeleteQueuesDelay(deleteQueuesDelay).setAutoDeleteAddressesDelay(deleteAddressesDelay);
+      server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true, true);
+
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) != null, 2000, 100));
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createMessage(true);
+      producer.send(message);
+      ClientConsumer consumer = session.createConsumer(queue);
+      session.start();
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      session.commit();
+      consumer.close();
+      long start = System.currentTimeMillis(); // reference point: taken right after the consumer is closed
+
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) == null, deleteQueuesDelay + fudge, 50));
+      long elapsedTime = System.currentTimeMillis() - start;
+      IntegrationTestLogger.LOGGER.info("Elapsed time to delete queue: " + elapsedTime);
+      assertTrue(elapsedTime >= (deleteQueuesDelay - fudge)); // the queue must not have vanished early
+      start = System.currentTimeMillis(); // new reference point: the queue has just been seen to be gone
+      assertTrue(Wait.waitFor(() -> server.getAddressInfo(address) == null, deleteAddressesDelay + fudge, 50));
+      elapsedTime = System.currentTimeMillis() - start;
+      IntegrationTestLogger.LOGGER.info("Elapsed time to delete address: " + elapsedTime);
+      assertTrue(elapsedTime >= (deleteAddressesDelay - fudge)); // the address must not have vanished early
+   }
+   /**
+    * Same settings and initial send/consume/close sequence as testAddressQueueDeleteDelay,
+    * but with extra activity inside each delay window.
+    *
+    * Queue part: half a queue delay after the first consumer is closed a second consumer is
+    * opened, and it is closed another half delay later. The queue must then still take at
+    * least (delay - fudge), measured from that second close, to disappear.
+    *
+    * Address part: half an address delay after the queue is gone the queue is created again,
+    * and another half delay later it is deleted explicitly. The address must then still take
+    * at least (delay - fudge), measured from that delete, to disappear.
+    */
+   @Test
+   public void testAddressQueueDeleteDelayWithAdditionalAddressQueue() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+      final long deleteQueuesDelay = 3000;
+      final long deleteAddressesDelay = 5000;
+      final long fudge = 200; // tolerance applied on both sides of the timing assertions below
+
+      AddressSettings addressSettings = new AddressSettings().setAutoDeleteQueuesDelay(deleteQueuesDelay).setAutoDeleteAddressesDelay(deleteAddressesDelay);
+      server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true, true);
+
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) != null, 2000, 100));
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createMessage(true);
+      producer.send(message);
+      ClientConsumer consumer = session.createConsumer(queue);
+      session.start();
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      session.commit();
+      consumer.close();
+
+      Thread.sleep(deleteQueuesDelay / 2);
+      consumer = session.createConsumer(queue); // second consumer attached part-way through the queue delay
+      Thread.sleep(deleteQueuesDelay / 2);
+      consumer.close();
+
+      long start = System.currentTimeMillis(); // reference point: the second consumer close
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) == null, deleteQueuesDelay + fudge, 50));
+      assertTrue(System.currentTimeMillis() - start >= (deleteQueuesDelay - fudge));
+
+      Thread.sleep(deleteAddressesDelay / 2);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true, true); // queue recreated part-way through the address delay
+      Thread.sleep(deleteAddressesDelay / 2);
+      session.deleteQueue(queue);
+
+      start = System.currentTimeMillis(); // reference point: the explicit queue delete
+      assertTrue(Wait.waitFor(() -> server.getAddressInfo(address) == null, deleteAddressesDelay + fudge, 50));
+      assertTrue(System.currentTimeMillis() - start >= (deleteAddressesDelay - fudge));
+   }
+   /**
+    * Creates a queue without registering any AddressSettings for its address, sends and
+    * consumes one message, closes the consumer, and asserts that first the queue and then
+    * the address are gone, each within a wait of 200 (fudge).
+    *
+    * NOTE(review): presumably this relies on the default delete delays being zero; confirm
+    * against the AddressSettings defaults.
+    */
+   @Test
+   public void testDefaultAddressQueueDeleteDelay() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+      final long fudge = 200; // used here only as the upper bound for each wait
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true, true);
+
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) != null, 2000, 100));
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createMessage(true);
+      producer.send(message);
+      ClientConsumer consumer = session.createConsumer(queue);
+      session.start();
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      session.commit();
+      consumer.close();
+      assertTrue(Wait.waitFor(() -> server.locateQueue(queue) == null, fudge, 50));
+      assertTrue(Wait.waitFor(() -> server.getAddressInfo(address) == null, fudge, 50));
+   }
+   /**
+    * Runs the base-class setUp, creates a server via createServer(false), sets its
+    * address/queue scan period to 100 and starts it, then opens the in-VM non-HA locator,
+    * the session factory and the client session that the tests use.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer(false);
+
+      server.getConfiguration().setAddressQueueScanPeriod(100); // NOTE(review): presumably milliseconds, kept below the 200 fudge used by the tests; confirm
+
+      server.start();
+      locator = createInVMNonHALocator();
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 66424af..518fdb8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -409,6 +409,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public long getConsumerRemovedTimestamp() { + return 0; + } + + @Override public ReferenceCounter getConsumersRefCount() { return null; //To change body of implemented methods use File | Settings | File Templates. 
} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/97612c48/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index db2d8e6..4bfb7e6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -182,6 +182,10 @@ public class FakePostOffice implements PostOffice { } @Override + public void startAddressQueueScanner() { + } + + @Override public boolean isAddressBound(SimpleString address) throws Exception { return false; }