Repository: qpid-broker-j Updated Branches: refs/heads/master e5be145b5 -> d91e06176
QPID-7998: [Broker-J][AMQP 1.0] By default, allow global shared subscriptions but discard their links on detach Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d91e0617 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d91e0617 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d91e0617 Branch: refs/heads/master Commit: d91e061768eb59ae5ae0c5039f420dc6d403003a Parents: e5be145 Author: Lorenz Quack <lqu...@apache.org> Authored: Mon Nov 6 13:58:45 2017 +0000 Committer: Lorenz Quack <lqu...@apache.org> Committed: Mon Nov 6 15:28:19 2017 +0000 ---------------------------------------------------------------------- .../server/virtualhost/AbstractVirtualHost.java | 8 +++--- .../virtualhost/QueueManagingVirtualHost.java | 12 ++++----- .../v1_0/ExchangeSendingDestination.java | 8 ------ .../protocol/v1_0/SendingLinkEndpoint.java | 28 +++++++++++++++++--- .../server/protocol/v1_0/Session_1_0Test.java | 1 - .../subscription/SharedSubscriptionTest.java | 3 --- 6 files changed, 35 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 9e2889e..e4ca989 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -274,7 +274,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private Collection<VirtualHostLogger> _virtualHostLoggersToClose; private PreferenceStore _preferenceStore; private long _flowToDiskCheckPeriod; - private volatile boolean _isGlobalSharedDurableSubscriptionDisabled; + private volatile boolean _isDiscardGlobalSharedSubscriptionLinksOnDetach; public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { @@ -597,7 +597,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); _flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD); - _isGlobalSharedDurableSubscriptionDisabled = getContextValue(Boolean.class, GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED); + _isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH); QpidServiceLoader serviceLoader = new QpidServiceLoader(); for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class)) @@ -2188,9 +2188,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - public boolean isGlobalSharedDurableSubscriptionDisabled() + public boolean isDiscardGlobalSharedSubscriptionLinksOnDetach() { - return _isGlobalSharedDurableSubscriptionDisabled; + return _isDiscardGlobalSharedSubscriptionLinksOnDetach; } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java index c4418c0..b30373a 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java @@ -96,12 +96,12 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> @ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD) int DEFAULT_STATISTICS_REPORTING_PERIOD = 0; - - String GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = "qpid.feature.disabled:globalSharedDurableSubscription"; + String DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = "qpid.jms.discardGlobalSharedSubscriptionLinksOnDetach"; @SuppressWarnings("unused") - @ManagedContextDefault(name = GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, - description = "Flag to disable global shared durable subscriptions.") - boolean DEFAULT_GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED = true; + @ManagedContextDefault(name = DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH, + description = "If true AMQP 1.0 links of global shared subscriptions are discarded when the" + + " link detaches. This is to avoid leaking links with the Qpid JMS client.") + boolean DEFAULT_DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = true; @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.") int getStatisticsReportingPeriod(); @@ -148,7 +148,7 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> long getFlowToDiskCheckPeriod(); @DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled") - boolean isGlobalSharedDurableSubscriptionDisabled(); + boolean isDiscardGlobalSharedSubscriptionLinksOnDetach(); String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size"; @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java index a4a1de3..8f84530 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java @@ -144,7 +144,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination { boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER; boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY); - boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY); QueueManagingVirtualHost virtualHost; if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost) @@ -157,13 +156,6 @@ public class ExchangeSendingDestination extends StandardSendingDestination "Address space of unexpected type")); } - if (isDurable && isShared && isGlobal && virtualHost.isGlobalSharedDurableSubscriptionDisabled()) - { - throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, - "Support for global shared durable subscription is disabled.")); - } - - Queue<?> queue; final Map<String, Object> attributes = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java index 98f68f8..9c783b8 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java @@ -797,14 +797,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> Source source = getSource(); TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy(); + NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace(); + List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities()); + if (close || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy) || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing()) || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing())) { - Error closingError = null; - NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace(); if (getDestination() instanceof ExchangeSendingDestination && addressSpace instanceof QueueManagingVirtualHost) { @@ -813,7 +814,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> { ((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue( ((ExchangeSendingDestination) getDestination()).getQueue().getName()); - List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities()); TerminusDurability sourceDurability = source.getDurable(); if (sourceDurability != null @@ -858,6 +858,28 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error); } } + else if (addressSpace instanceof QueueManagingVirtualHost + && ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach() + && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY) + && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY) + && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)) + { + // For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around. + // However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription. + if (!getLinkName().endsWith("|global")) + { + getLink().linkClosed(); + } + else + { + Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "$"); + final Collection<LinkModel> links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern); + if (links.size() > 1) + { + getLink().linkClosed(); + } + } + } super.detach(error, close); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java index e466ccf..b5ba0aa 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java @@ -106,7 +106,6 @@ public class Session_1_0Test extends QpidTestCase super.setUp(); Map<String, Object> virtualHostAttributes = new HashMap<>(); virtualHostAttributes.put(QueueManagingVirtualHost.NAME, "testVH"); - virtualHostAttributes.put(QueueManagingVirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, false)); virtualHostAttributes.put(QueueManagingVirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE); _virtualHost = BrokerTestHelper.createVirtualHost(virtualHostAttributes); _taskExecutor = new CurrentThreadTaskExecutor(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d91e0617/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java index d89f14f..8842ebe 100644 --- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java +++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java @@ -46,9 +46,6 @@ public class SharedSubscriptionTest extends QpidBrokerTestCase { TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration(); brokerConfiguration.addHttpManagementConfiguration(); - brokerConfiguration.setBrokerAttribute("context", - Collections.singletonMap(QueueManagingVirtualHost.GLOBAL_SHARED_DURABLE_SUBSCRIPTION_DISABLED, - false)); super.setUp(); _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org