Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Mar 6 15:06:57 2017 @@ -46,6 +46,9 @@ import java.util.concurrent.atomic.Atomi import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,11 +62,8 @@ import org.apache.qpid.server.logging.Lo import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.AbstractConfiguredObject; -import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -71,8 +71,7 @@ import org.apache.qpid.server.model.Name import org.apache.qpid.server.model.NotFoundException; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.protocol.LinkModel; import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; @@ -89,7 +88,6 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages; import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter; import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; -import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter; import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter; import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; @@ -98,7 +96,6 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; -import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; @@ -110,10 +107,10 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError; import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.server.security.SecurityToken; import org.apache.qpid.server.session.AbstractAMQPSession; -import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -138,14 +135,11 @@ public class Session_1_0 extends Abstrac private SessionState _sessionState; - private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>(); - private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>(); - private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>(); - private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>(); + private final Map<LinkEndpoint<?>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>(); + private final Map<UnsignedInteger, LinkEndpoint<?>> _inputHandleToEndpoint = new HashMap<>(); + private final Set<LinkEndpoint<?>> _associatedLinkEndpoints = new HashSet<>(); - private final List<TxnCoordinatorReceivingLink_1_0> _txnCoordinatorLinks = new ArrayList<>(); - - private short _receivingChannel; + private final short _receivingChannel; private final short _sendingChannel; @@ -213,73 +207,37 @@ public class Session_1_0 extends Abstrac { if(_sessionState == SessionState.ACTIVE) { - UnsignedInteger handle = attach.getHandle(); - if(_remoteLinkEndpoints.containsKey(handle)) + UnsignedInteger inputHandle = attach.getHandle(); + if (_inputHandleToEndpoint.containsKey(inputHandle)) { - // TODO - Error - handle busy? + getConnection().close(new Error(SessionError.HANDLE_IN_USE, "inputHandle of Attach already in use: " + attach.toString())); + throw new ConnectionScopedRuntimeException(String.format("Input Handle '%d' already in use", inputHandle.intValue())); } else { - Map<String, ? extends LinkEndpoint> linkMap = - attach.getRole() == Role.RECEIVER ? _sendingLinkMap : _receivingLinkMap; - LinkEndpoint endpoint = linkMap.get(attach.getName()); - if(endpoint == null) + final Class<? extends LinkModel> linkType; + if (attach.getRole() == Role.RECEIVER) { - - if (attach.getRole() == Role.RECEIVER) + linkType = SendingLink_1_0.class; + } + else + { + if (attach.getTarget() instanceof Coordinator) { - endpoint = new SendingLinkEndpoint(this, attach); - } - else if (attach.getRole() == Role.SENDER) - { - if (attach.getTarget() instanceof Coordinator) - { - endpoint = new TxnCoordinatorReceivingLinkEndpoint(this, attach); - } - else - { - endpoint = new StandardReceivingLinkEndpoint(this, attach); - } + linkType = TxnCoordinatorReceivingLink_1_0.class; } else { - // TODO error handling - } - - if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER) - { - endpoint.setStopped(true); + linkType = StandardReceivingLink_1_0.class; } - - // TODO : fix below - distinguish between local and remote owned - endpoint.setSource(attach.getSource()); - endpoint.setTarget(attach.getTarget()); - ((Map<String,LinkEndpoint>)linkMap).put(attach.getName(), endpoint); } - else - { - endpoint.receiveAttach(attach); - } - - if(attach.getRole() == Role.SENDER) - { - endpoint.setDeliveryCount(attach.getInitialDeliveryCount()); - } - - _remoteLinkEndpoints.put(handle, endpoint); - if(!_localLinkEndpoints.containsKey(endpoint)) - { - UnsignedInteger localHandle = findNextAvailableHandle(); - endpoint.setLocalHandle(localHandle); - _localLinkEndpoints.put(endpoint, localHandle); + final Link_1_0 link = (Link_1_0) getAddressSpace().getLink(getConnection().getRemoteContainerId(), + attach.getName(), + linkType); + final ListenableFuture<? extends LinkEndpoint<?>> future = link.attach(this, attach); - remoteLinkCreation(endpoint, attach); - } - else - { - // TODO - error already attached - } + addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor()); } } } @@ -419,18 +377,16 @@ public class Session_1_0 extends Abstrac case ACTIVE: detachLinks(); remoteEnd(end); - short sendChannel = _sendingChannel; - _connection.sendEnd(sendChannel, new End(), true); + _connection.sendEnd(_sendingChannel, new End(), true); _sessionState = SessionState.ENDED; break; default: - sendChannel = _sendingChannel; End reply = new End(); Error error = new Error(); error.setCondition(AmqpError.ILLEGAL_STATE); error.setDescription("END called on Session which has not been opened"); reply.setError(error); - _connection.sendEnd(sendChannel, reply, true); + _connection.sendEnd(_sendingChannel, reply, true); break; } } @@ -463,7 +419,7 @@ public class Session_1_0 extends Abstrac public void receiveFlow(final Flow flow) { UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); + final LinkEndpoint<?> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle); final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); @@ -476,8 +432,8 @@ public class Session_1_0 extends Abstrac } else { - final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); - for (LinkEndpoint le : allLinkEndpoints) + final Collection<LinkEndpoint<?>> allLinkEndpoints = _inputHandleToEndpoint.values(); + for (LinkEndpoint<?> le : allLinkEndpoints) { le.flowStateChanged(); } @@ -620,16 +576,14 @@ public class Session_1_0 extends Abstrac { _nextIncomingTransferId.incr(); - UnsignedInteger handle = transfer.getHandle(); - - - LinkEndpoint linkEndpoint = _remoteLinkEndpoints.get(handle); + UnsignedInteger inputHandle = transfer.getHandle(); + LinkEndpoint<?> linkEndpoint = _inputHandleToEndpoint.get(inputHandle); if (linkEndpoint == null) { Error error = new Error(); error.setCondition(AmqpError.ILLEGAL_STATE); - error.setDescription("TRANSFER called on Session for link handle " + handle + " which is not attached"); + error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is not attached"); _connection.close(error); } @@ -638,7 +592,7 @@ public class Session_1_0 extends Abstrac Error error = new Error(); error.setCondition(ConnectionError.FRAMING_ERROR); - error.setDescription("TRANSFER called on Session for link handle " + handle + " which is a sending ink not a receiving link"); + error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is a sending ink not a receiving link"); _connection.close(error); } @@ -681,7 +635,7 @@ public class Session_1_0 extends Abstrac Error error = new Error(); error.setCondition(AmqpError.ILLEGAL_STATE); error.setDescription("TRANSFER called on Session for link handle " - + handle + + inputHandle + " with incorrect delivery id " + transfer.getDeliveryId()); reply.setError(error); @@ -704,11 +658,6 @@ public class Session_1_0 extends Abstrac } } - private Collection<LinkEndpoint> getLocalLinkEndpoints() - { - return new ArrayList<>(_localLinkEndpoints.keySet()); - } - boolean isEnded() { return _sessionState == SessionState.ENDED || _connection.isClosed(); @@ -724,413 +673,94 @@ public class Session_1_0 extends Abstrac return _accessControllerContext; } - public void remoteLinkCreation(final LinkEndpoint endpoint, Attach attach) + public ReceivingDestination getReceivingDestination(final Target target) throws AmqpErrorException { - Link_1_0 link = null; - Error error = null; - Set<Symbol> capabilities = new HashSet<>(); - try + final ReceivingDestination destination; + if (target != null) { - if (endpoint.getRole() == Role.SENDER) + if (Boolean.TRUE.equals(target.getDynamic())) { - link = createSendingLink(endpoint, attach); - if (link != null) - { - capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS); - } - } - else if (endpoint.getTarget() instanceof Coordinator) - { - link = createCoordinatorLink(endpoint); + MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties()); + target.setAddress(tempQueue.getName()); } - else // standard (non-Coordinator) receiver - { - link = createReceivingLink(endpoint, capabilities); - } - } - catch (AmqpErrorException e) - { - if (e.getError() == null || e.getError().getCondition() == AmqpError.INTERNAL_ERROR) - { - _logger.error("Could not create link", e); - } - else - { - _logger.debug("Could not create link", e); - } - if (endpoint.getRole() == Role.SENDER) + String addr = target.getAddress(); + if (addr == null || "".equals(addr.trim())) { - endpoint.setSource(null); + MessageDestination messageDestination = getAddressSpace().getDefaultDestination(); + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy(), "", + target.getCapabilities(), + _connection.getEventLogger()); } - else - { - endpoint.setTarget(null); - } - error = e.getError(); - } - - endpoint.setCapabilities(capabilities); - endpoint.attach(); - - if (link == null) - { - if (error == null) - { - error = new Error(); - error.setCondition(AmqpError.NOT_FOUND); - } - endpoint.close(error); - } - else - { - endpoint.start(); - } - } - - private Link_1_0 createReceivingLink(final LinkEndpoint endpoint, - final Set<Symbol> capabilities) - { - Link_1_0 link = null; - Destination destination; - final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId()); - StandardReceivingLink_1_0 previousLink = - (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName()); - - if (previousLink == null) - { - - Target target = (Target) endpoint.getTarget(); - - if (target != null) + else if (!addr.startsWith("/") && addr.contains("/")) { - if (Boolean.TRUE.equals(target.getDynamic())) + String[] parts = addr.split("/", 2); + Exchange<?> exchange = getExchange(parts[0]); + if (exchange != null) { - - MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties()); - target.setAddress(tempQueue.getName()); + Symbol[] capabilities1 = target.getCapabilities(); + ExchangeDestination exchangeDestination = new ExchangeDestination(exchange, + null, + target.getDurable(), + target.getExpiryPolicy(), + parts[0], + parts[1], + capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList()); + destination = exchangeDestination; } - - String addr = target.getAddress(); - if (addr == null || "".equals(addr.trim())) + else { - MessageDestination messageDestination = getAddressSpace().getDefaultDestination(); - destination = new NodeReceivingDestination(messageDestination, target.getDurable(), - target.getExpiryPolicy(), "", - target.getCapabilities(), - _connection.getEventLogger()); - target.setCapabilities(destination.getCapabilities()); - - if (_blockingEntities.contains(messageDestination)) - { - endpoint.setStopped(true); - } + destination = null; } - else if (!addr.startsWith("/") && addr.contains("/")) - { - String[] parts = addr.split("/", 2); - Exchange<?> exchange = getExchange(parts[0]); - if (exchange != null) - { - Symbol[] capabilities1 = target.getCapabilities(); - ExchangeDestination exchangeDestination = new ExchangeDestination(exchange, - null, - target.getDurable(), - target.getExpiryPolicy(), - parts[0], - parts[1], - capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList()); - target.setCapabilities(exchangeDestination.getCapabilities()); - destination = exchangeDestination; - } - else - { - endpoint.setTarget(null); - destination = null; - } + } + else + { + MessageDestination messageDestination = + getAddressSpace().getAttainedMessageDestination(addr); + if (messageDestination != null) + { + destination = + new NodeReceivingDestination(messageDestination, + target.getDurable(), + target.getExpiryPolicy(), + addr, + target.getCapabilities(), + _connection.getEventLogger()); } else { - MessageDestination messageDestination = - getAddressSpace().getAttainedMessageDestination(addr); - if (messageDestination != null) + Queue<?> queue = getQueue(addr); + if (queue != null) { - destination = - new NodeReceivingDestination(messageDestination, - target.getDurable(), - target.getExpiryPolicy(), - addr, - target.getCapabilities(), - _connection.getEventLogger()); - target.setCapabilities(destination.getCapabilities()); + destination = new QueueDestination(queue, addr); } else { - Queue<?> queue = getQueue(addr); - if (queue != null) - { - - destination = new QueueDestination(queue, addr); - } - else - { - endpoint.setTarget(null); - destination = null; - } + destination = null; } } } - else - { - destination = null; - } - if (destination != null) - { - final ReceivingDestination receivingDestination = (ReceivingDestination) destination; - - MessageDestination messageDestination = receivingDestination.getMessageDestination(); - if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled()) - { - capabilities.add(DELAYED_DELIVERY); - } - final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; - final StandardReceivingLink_1_0 receivingLink = new StandardReceivingLink_1_0(receivingLinkEndpoint); - receivingLinkEndpoint.setDestination(receivingDestination); - receivingLinkEndpoint.setLink(receivingLink); - link = receivingLink; - if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()) - || TerminusDurability.CONFIGURATION.equals(target.getDurable())) - { - linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); - } - } } else { - StandardReceivingLinkEndpoint receivingLinkEndpoint = (StandardReceivingLinkEndpoint) endpoint; - previousLink.setLinkAttachment(receivingLinkEndpoint); - receivingLinkEndpoint.setLink(previousLink); - link = previousLink; - receivingLinkEndpoint.setLocalUnsettled(receivingLinkEndpoint.getUnsettledOutcomeMap()); + destination = null; } - return link; - } - private TxnCoordinatorReceivingLink_1_0 createCoordinatorLink(final LinkEndpoint endpoint) throws AmqpErrorException - { - Coordinator coordinator = (Coordinator) endpoint.getTarget(); - TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities(); - boolean localTxn = false; - boolean multiplePerSession = false; - if (coordinatorCapabilities != null) + if (destination != null) { - for (TxnCapability capability : coordinatorCapabilities) - { - if (capability.equals(TxnCapability.LOCAL_TXN)) - { - localTxn = true; - } - else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN)) - { - multiplePerSession = true; - } - else - { - Error error = new Error(); - error.setCondition(AmqpError.NOT_IMPLEMENTED); - error.setDescription("Unsupported capability: " + capability); - throw new AmqpErrorException(error); - } - } - } - - final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; - final TxnCoordinatorReceivingLink_1_0 coordinatorLink = - new TxnCoordinatorReceivingLink_1_0( - receivingLinkEndpoint - ); - receivingLinkEndpoint.setLink(coordinatorLink); - return coordinatorLink; - } - - private SendingLink_1_0 createSendingLink(final LinkEndpoint endpoint, Attach attach) throws AmqpErrorException - { - final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; - SendingLink_1_0 link = null; - final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId()); - final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); - - if (previousLink == null) - { - Source source = (Source) sendingLinkEndpoint.getSource(); - SendingDestination destination = null; - if (source != null) - { - destination = getSendingDestination(sendingLinkEndpoint.getName(), source); - if (destination == null) - { - sendingLinkEndpoint.setSource(null); - } - else - { - source.setCapabilities(destination.getCapabilities()); - } - } - else - { - final Symbol[] linkCapabilities = attach.getDesiredCapabilities(); - boolean isGlobal = hasCapability(linkCapabilities, ExchangeDestination.GLOBAL_CAPABILITY); - final String queueName = getMangledSubscriptionName(endpoint.getName(), true, true, isGlobal); - final MessageSource messageSource = getAddressSpace().getAttainedMessageSource(queueName); - // TODO START The Source should be persisted on the LinkEndpoint - if (messageSource instanceof Queue) - { - Queue<?> queue = (Queue<?>) messageSource; - source = new Source(); - List<Symbol> capabilities = new ArrayList<>(); - if (queue.getExclusive() == ExclusivityPolicy.SHARED_SUBSCRIPTION) - { - capabilities.add(ExchangeDestination.SHARED_CAPABILITY); - } - if (isGlobal) - { - capabilities.add(ExchangeDestination.GLOBAL_CAPABILITY); - } - capabilities.add(ExchangeDestination.TOPIC_CAPABILITY); - source.setCapabilities(capabilities.toArray(new Symbol[capabilities.size()])); - - final Collection<Exchange> exchanges = queue.getVirtualHost().getChildren(Exchange.class); - String bindingKey = null; - Exchange<?> foundExchange = null; - for (Exchange<?> exchange : exchanges) - { - for (Binding binding : exchange.getPublishingLinks(queue)) - { - String exchangeName = exchange.getName(); - bindingKey = binding.getName(); - final Map<String, Object> bindingArguments = binding.getArguments(); - Map<Symbol, Filter> filter = new HashMap<>(); - if (bindingArguments.containsKey(AMQPFilterTypes.JMS_SELECTOR)) - { - filter.put(Symbol.getSymbol("jms-selector"), new JMSSelectorFilter((String) bindingArguments.get(AMQPFilterTypes.JMS_SELECTOR))); - } - if (bindingArguments.containsKey(AMQPFilterTypes.NO_LOCAL)) - { - filter.put(Symbol.getSymbol("no-local"), NoLocalFilter.INSTANCE); - } - foundExchange = exchange; - source.setAddress(exchangeName + "/" + bindingKey); - source.setFilter(filter); - break; - } - if (foundExchange != null) - { - break; - } - } - if (foundExchange != null) - { - source.setDurable(TerminusDurability.CONFIGURATION); - TerminusExpiryPolicy terminusExpiryPolicy; - switch (queue.getLifetimePolicy()) - { - case PERMANENT: - terminusExpiryPolicy = TerminusExpiryPolicy.NEVER; - break; - case DELETE_ON_NO_LINKS: - case DELETE_ON_NO_OUTBOUND_LINKS: - terminusExpiryPolicy = TerminusExpiryPolicy.LINK_DETACH; - break; - case DELETE_ON_CONNECTION_CLOSE: - terminusExpiryPolicy = TerminusExpiryPolicy.CONNECTION_CLOSE; - break; - case DELETE_ON_SESSION_END: - terminusExpiryPolicy = TerminusExpiryPolicy.SESSION_END; - break; - default: - throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "unexpected liftetime policy " + queue.getLifetimePolicy())); - } - sendingLinkEndpoint.setSource(source); - destination = new ExchangeDestination(foundExchange, - queue, - TerminusDurability.CONFIGURATION, - terminusExpiryPolicy, - foundExchange.getName(), - bindingKey, - capabilities); - } - - } - // TODO END - } - - if (destination != null) - { - final SendingLink_1_0 sendingLink = - new SendingLink_1_0( - sendingLinkEndpoint); - //sendingLink.createConsumerTarget(); - - sendingLinkEndpoint.doStuff(destination); - sendingLinkEndpoint.createConsumerTarget(); - - sendingLinkEndpoint.setLink(sendingLink); - sendingLinkEndpoint.setDurability(((Source) attach.getSource()).getDurable()); - registerConsumer(sendingLinkEndpoint); - - if (destination instanceof ExchangeDestination) - { - ExchangeDestination exchangeDestination = (ExchangeDestination) destination; - exchangeDestination.getQueue().setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE, State.ACTIVE)); - } - - link = sendingLink; - if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) - || TerminusDurability.CONFIGURATION.equals(source.getDurable())) - { - linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); - } - - } + target.setCapabilities(destination.getCapabilities()); } else { - Source newSource = (Source) attach.getSource(); - Source oldSource = (Source) previousLink.getEndpoint().getSource(); - - if (sendingLinkEndpoint.getDestination() == null) - { - final SendingDestination oldDestination = - getSendingDestination(previousLink.getEndpoint().getName(), oldSource); - sendingLinkEndpoint.doStuff(oldDestination); - sendingLinkEndpoint.setDurability(oldSource.getDurable()); - - } - - if (sendingLinkEndpoint.getDestination() instanceof ExchangeDestination - && newSource != null - && !Boolean.TRUE.equals(newSource.getDynamic())) - { - final SendingDestination newDestination = - getSendingDestination(previousLink.getEndpoint().getName(), newSource); - if (updateSourceForSubscription(sendingLinkEndpoint, newSource, newDestination)) - { - sendingLinkEndpoint.setDestination(newDestination); - } - } - - sendingLinkEndpoint.setSource(oldSource); - previousLink.setLinkAttachment(this, sendingLinkEndpoint); - sendingLinkEndpoint.setLink(previousLink); - link = previousLink; - sendingLinkEndpoint.setLocalUnsettled(sendingLinkEndpoint.getUnsettledOutcomeMap()); - registerConsumer(sendingLinkEndpoint); - + throw new AmqpErrorException(AmqpError.NOT_FOUND, + String.format("Could not find destination for target '%s'", target)); } - return link; + + return destination; } - private boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource, + public boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource, final SendingDestination newDestination) { SendingDestination oldDestination = linkEndpoint.getDestination(); @@ -1153,7 +783,7 @@ public class Session_1_0 extends Abstrac return false; } - private SendingDestination getSendingDestination(final String linkName, final Source source) throws AmqpErrorException + public SendingDestination getSendingDestination(final String linkName, final Source source) throws AmqpErrorException { SendingDestination destination = null; @@ -1184,6 +814,11 @@ public class Session_1_0 extends Abstrac } } + if (destination == null) + { + throw new AmqpErrorException(AmqpError.NOT_FOUND, + String.format("Could not find destination for source '%s'", source)); + } return destination; } @@ -1378,18 +1013,6 @@ public class Session_1_0 extends Abstrac return false; } - - private void registerConsumer(final SendingLinkEndpoint linkEndpoint) - { - MessageInstanceConsumer consumer = linkEndpoint.getConsumer(); - if(consumer instanceof Consumer<?,?>) - { - Consumer<?,ConsumerTarget_1_0> modelConsumer = (Consumer<?,ConsumerTarget_1_0>) consumer; - _consumers.add(modelConsumer); - } - } - - private MessageSource createDynamicSource(Map properties) { final String queueName = _primaryDomain + "TempQueue" + UUID.randomUUID().toString(); @@ -1496,11 +1119,17 @@ public class Session_1_0 extends Abstrac void remoteEnd(End end) { - - for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints()) + List<LinkEndpoint<?>> linkEndpoints = new ArrayList<>(_endpointToOutputHandle.keySet()); + for(LinkEndpoint linkEndpoint : linkEndpoints) { linkEndpoint.remoteDetached(new Detach()); + linkEndpoint.dissociateSession(); } + for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints) + { + linkEndpoint.dissociateSession(); + } + _associatedLinkEndpoints.clear(); _connection.sessionEnded(this); performCloseTasks(); @@ -1586,10 +1215,13 @@ public class Session_1_0 extends Abstrac @Override public void transportStateChanged() { - for(SendingLinkEndpoint endpoint : _sendingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet()) { - ConsumerTarget_1_0 target = endpoint.getConsumerTarget(); - target.flowStateChanged(); + if (linkEndpoint instanceof SendingLinkEndpoint) + { + ConsumerTarget_1_0 target = ((SendingLinkEndpoint) linkEndpoint).getConsumerTarget(); + target.flowStateChanged(); + } } if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting()) @@ -1619,14 +1251,14 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName())); - for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination())) + if (linkEndpoint instanceof ReceivingLinkEndpoint + && isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { - endpoint.setStopped(true); + linkEndpoint.setStopped(true); } } - } } @@ -1659,11 +1291,12 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_REMOVED()); } - for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination())) + if (linkEndpoint instanceof ReceivingLinkEndpoint + && isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { - endpoint.setStopped(false); + linkEndpoint.setStopped(false); } } } @@ -1689,15 +1322,16 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **")); - for(LinkEndpoint endpoint : _receivingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet()) { - endpoint.setStopped(true); + if (linkEndpoint instanceof ReceivingLinkEndpoint) + { + linkEndpoint.setStopped(true); + } } } } - - @Override public void unblock() { @@ -1720,11 +1354,12 @@ public class Session_1_0 extends Abstrac { messageWithSubject(ChannelMessages.FLOW_REMOVED()); } - for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet()) { - if(!_blockingEntities.contains(endpoint.getReceivingDestination())) + if (linkEndpoint instanceof ReceivingLinkEndpoint + && !_blockingEntities.contains(((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { - endpoint.setStopped(false); + linkEndpoint.setStopped(false); } } } @@ -1774,7 +1409,8 @@ public class Session_1_0 extends Abstrac @Override public long getConsumerCount() { - return getConsumers().size(); + // TODO - fix statistic - need to count consumers + return -1; } @Override @@ -1894,19 +1530,11 @@ public class Session_1_0 extends Abstrac private void detach(UnsignedInteger handle, Detach detach) { - if(_remoteLinkEndpoints.containsKey(handle)) + if(_inputHandleToEndpoint.containsKey(handle)) { - LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle); - + LinkEndpoint<?> endpoint = _inputHandleToEndpoint.remove(handle); endpoint.remoteDetached(detach); - - _localLinkEndpoints.remove(endpoint); - - if (Boolean.TRUE.equals(detach.getClosed())) - { - Map<String, ? extends LinkEndpoint> linkMap = endpoint.getRole() == Role.SENDER ? _sendingLinkMap : _receivingLinkMap; - linkMap.remove(endpoint.getName()); - } + _endpointToOutputHandle.remove(endpoint); } else { @@ -1916,7 +1544,7 @@ public class Session_1_0 extends Abstrac private void detachLinks() { - Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet()); + Collection<UnsignedInteger> handles = new ArrayList<>(_inputHandleToEndpoint.keySet()); for(UnsignedInteger handle : handles) { Detach detach = new Detach(); @@ -1926,56 +1554,19 @@ public class Session_1_0 extends Abstrac detach(handle, detach); } - final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId()); - - for(LinkEndpoint<?> linkEndpoint : _sendingLinkMap.values()) + for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints) { - final SendingLink_1_0 link = (SendingLink_1_0) linkRegistry.getDurableSendingLink(linkEndpoint.getName()); - - if (link != null) - { - synchronized (link) - { - if (link.getEndpoint() == linkEndpoint) - { - try - { - link.setLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint); - } - catch (AmqpErrorException e) - { - throw new ConnectionScopedRuntimeException(e); - } - } - } - } - } - - for(LinkEndpoint<?> linkEndpoint : _receivingLinkMap.values()) - { - final StandardReceivingLink_1_0 - link = (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(linkEndpoint.getName()); - - if (link != null) - { - synchronized (link) - { - if (link.getEndpoint() == linkEndpoint) - { - link.setLinkAttachment((ReceivingLinkEndpoint) linkEndpoint); - } - } - } + linkEndpoint.dissociateSession(); } } - private UnsignedInteger findNextAvailableHandle() + private UnsignedInteger findNextAvailableOutputHandle() { int i = 0; do { - if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i))) + if(!_endpointToOutputHandle.containsValue(UnsignedInteger.valueOf(i))) { return UnsignedInteger.valueOf(i); } @@ -2018,6 +1609,85 @@ public class Session_1_0 extends Abstrac return primaryDomain; } + private class EndpointCreationCallback<T extends LinkEndpoint<?>> implements FutureCallback<T> + { + + private final Attach _attach; + + EndpointCreationCallback(final Attach attach) + { + _attach = attach; + } + + @Override + public void onSuccess(final T endpoint) + { + doOnIOThreadAsync(new Runnable() + { + @Override + public void run() + { + _associatedLinkEndpoints.add(endpoint); + endpoint.setLocalHandle(findNextAvailableOutputHandle()); + if (attachWasUnsuccessful(endpoint)) + { + endpoint.attach(); + + Error error = new Error(); + error.setCondition(AmqpError.NOT_FOUND); + endpoint.close(error); + } + else + { + if (endpoint.getRole() == Role.RECEIVER + && (_blockingEntities.contains(Session_1_0.this) + || (endpoint instanceof StandardReceivingLinkEndpoint + && _blockingEntities.contains(((ReceivingLinkEndpoint) endpoint).getReceivingDestination())))) + { + endpoint.setStopped(true); + } + _inputHandleToEndpoint.put(_attach.getHandle(), endpoint); + if (!_endpointToOutputHandle.containsKey(endpoint)) + { + _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle()); + endpoint.attach(); + endpoint.start(); + } + else + { + // TODO - link stealing??? + } + + } + } + }); + } + + @Override + public void onFailure(final Throwable t) + { + String errorMessage = String.format("Failed to create LinkEndpoint in response to Attach: %s", _attach); + _logger.error(errorMessage, t); + throw new ConnectionScopedRuntimeException(errorMessage, t); + } + + private boolean attachWasUnsuccessful(final T endpoint) + { + if (endpoint.getRole().equals(Role.SENDER)) + { + return endpoint.getSource() == null; + } + else if (endpoint.getRole().equals(Role.RECEIVER)) + { + return endpoint.getTarget() == null; + } + else + { + throw new IllegalStateException(String.format("Unknown LinkEndpoint role '%s'", endpoint.getRole())); + } + } + } + private final class BindingInfo { private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Mon Mar 6 15:06:57 2017 @@ -36,7 +36,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageFormat; import org.apache.qpid.server.protocol.MessageFormatRegistry; -import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; @@ -73,17 +73,14 @@ public class StandardReceivingLinkEndpoi { private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class); - private final SectionDecoderImpl _sectionDecoder; private ArrayList<Transfer> _incompleteMessage; private boolean _resumedMessage; private Binary _messageDeliveryTag; private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>()); - public StandardReceivingLinkEndpoint(final Session_1_0 session, - final Attach attach) + public StandardReceivingLinkEndpoint(final StandardReceivingLink_1_0 link, final SectionDecoder sectionDecoder) { - super(session, attach); - _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getSectionDecoderRegistry()); + super(link, sectionDecoder); } @Override @@ -333,9 +330,8 @@ public class StandardReceivingLinkEndpoi } else if(detach == null || detach.getError() != null) { - getLink().setLinkAttachmentToNull(); - // TODO do we have to set an error? detach(); + dissociateSession(); } else { @@ -360,7 +356,7 @@ public class StandardReceivingLinkEndpoi List<EncodingRetainingSection<?>> sections; try { - sections = _sectionDecoder.parseAll(fragments); + sections = getSectionDecoder().parseAll(fragments); } catch (AmqpErrorException e) { @@ -455,10 +451,13 @@ public class StandardReceivingLinkEndpoi contentSize); } - public void doLinkAttachment() + @Override + public void attachReceived(final Attach attach) throws AmqpErrorException { - Map initialUnsettledMap = getInitialUnsettledMap(); + super.attachReceived(attach); + setDeliveryCount(attach.getInitialDeliveryCount()); + Map initialUnsettledMap = getInitialUnsettledMap(); Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap); for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet()) { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Mon Mar 6 15:06:57 2017 @@ -20,30 +20,194 @@ */ package org.apache.qpid.server.protocol.v1_0; -public class StandardReceivingLink_1_0 implements ReceivingLink_1_0 +import java.util.Arrays; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; + +public class StandardReceivingLink_1_0 extends AbstractLink<StandardReceivingLinkEndpoint> implements ReceivingLink_1_0 { - private volatile ReceivingLinkEndpoint _linkEndpoint; + public StandardReceivingLink_1_0(final String linkName) + { + super(linkName); + } - public StandardReceivingLink_1_0(final ReceivingLinkEndpoint endpoint) + @Override + protected ListenableFuture<StandardReceivingLinkEndpoint> stealLink(final Session_1_0 session, final Attach attach) { - _linkEndpoint = endpoint; + throw new UnsupportedOperationException("Link stealing not implemented yet"); + /* + final SettableFuture<StandardReceivingLinkEndpoint> returnFuture = SettableFuture.create(); + _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable() + { + @Override + public void run() + { + _linkEndpoint.close(new Error(LinkError.STOLEN, + String.format("Link is being stolen by connection '%s'", + session.getConnection()))); + try + { + returnFuture.set(attach(session, attach).get()); + } + catch (InterruptedException e) + { + returnFuture.setException(e); + Thread.currentThread().interrupt(); + } + catch (ExecutionException e) + { + returnFuture.setException(e.getCause()); + } + } + }); + return returnFuture; + */ } - public ReceivingLinkEndpoint getEndpoint() + @Override + protected ListenableFuture<StandardReceivingLinkEndpoint> reattachLink(final Session_1_0 session, + final Attach attach) { - return _linkEndpoint; + if (_linkEndpoint == null) + { + SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(session.getConnection() + .getDescribedTypeRegistry() + .getSectionDecoderRegistry()); + _linkEndpoint = new StandardReceivingLinkEndpoint(this, sectionDecoder); + } + + _target = new Target(); + _source = attach.getSource(); + + try + { + _linkEndpoint.associateSession(session); + _linkEndpoint.attachReceived(attach); + + Target attachTarget = (Target) attach.getTarget(); + final ReceivingDestination destination = session.getReceivingDestination(attachTarget); + ((Target) _target).setAddress(attachTarget.getAddress()); + ((Target) _target).setDynamic(attachTarget.getDynamic()); + ((Target) _target).setCapabilities(destination.getCapabilities()); + _linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities())); + _linkEndpoint.setDestination(destination); + } + catch (AmqpErrorException e) + { + rejectLink(session, attach); + } + + return Futures.immediateFuture(_linkEndpoint); } + + @Override + protected ListenableFuture<StandardReceivingLinkEndpoint> resumeLink(final Session_1_0 session, final Attach attach) + { + if (getTarget() == null) + { + throw new IllegalStateException("Terminus should be set when resuming a Link."); + } + if (attach.getTarget() == null) + { + throw new IllegalStateException("Attach.getTarget should not be null when resuming a Link. That would be recovering the Link."); + } + + _source = attach.getSource(); + + try + { + + if (_linkEndpoint == null) + { + SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(session.getConnection() + .getDescribedTypeRegistry() + .getSectionDecoderRegistry()); + _linkEndpoint = new StandardReceivingLinkEndpoint(this, sectionDecoder); + + final ReceivingDestination destination = session.getReceivingDestination((Target) _target); + _linkEndpoint.setDestination(destination); + ((Target) _target).setCapabilities(destination.getCapabilities()); + _linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities())); + _linkEndpoint.setDestination(destination); + } + + _linkEndpoint.associateSession(session); + _linkEndpoint.attachReceived(attach); + + _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap()); + } + catch (AmqpErrorException e) + { + rejectLink(session, attach); + } + + return Futures.immediateFuture(_linkEndpoint); } - public void setLinkAttachment(final ReceivingLinkEndpoint linkEndpoint) + @Override + protected ListenableFuture<StandardReceivingLinkEndpoint> recoverLink(final Session_1_0 session, + final Attach attach) { - _linkEndpoint = linkEndpoint; - ((StandardReceivingLinkEndpoint)getEndpoint()).doLinkAttachment(); + if (_target == null) + { + return rejectLink(session, attach); + } + + _source = attach.getSource(); + + try + { + if (_linkEndpoint == null) + { + SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(session.getConnection() + .getDescribedTypeRegistry() + .getSectionDecoderRegistry()); + _linkEndpoint = new StandardReceivingLinkEndpoint(this, sectionDecoder); + + final ReceivingDestination destination = session.getReceivingDestination((Target) _target); + ((Target) _target).setCapabilities(destination.getCapabilities()); + _linkEndpoint.setCapabilities(Arrays.asList(destination.getCapabilities())); + _linkEndpoint.setDestination(destination); + } + + _linkEndpoint.associateSession(session); + _linkEndpoint.attachReceived(attach); + + _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap()); + } + catch (AmqpErrorException e) + { + rejectLink(session, attach); + } + + return Futures.immediateFuture(_linkEndpoint); } @Override - public void setLinkAttachmentToNull() + protected ListenableFuture<StandardReceivingLinkEndpoint> establishLink(final Session_1_0 session, + final Attach attach) { - _linkEndpoint = null; + if (_linkEndpoint != null || getTarget() != null) + { + throw new IllegalStateException("LinkEndpoint and Target should be null when establishing a Link."); + } + + return reattachLink(session, attach); } + private ListenableFuture<StandardReceivingLinkEndpoint> rejectLink(final Session_1_0 session, + final Attach attach) + { + SectionDecoderImpl sectionDecoder = + new SectionDecoderImpl(session.getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry()); + _linkEndpoint = new StandardReceivingLinkEndpoint(this, sectionDecoder); + _linkEndpoint.associateSession(session); + _target = null; + return Futures.immediateFuture(_linkEndpoint); + } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Mon Mar 6 15:06:57 2017 @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; @@ -48,16 +48,13 @@ import org.apache.qpid.server.util.Conne public class TxnCoordinatorReceivingLinkEndpoint extends ReceivingLinkEndpoint { - private final SectionDecoderImpl _sectionDecoder; private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>(); private ArrayList<Transfer> _incompleteMessage; - public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, - final Attach attach) + public TxnCoordinatorReceivingLinkEndpoint(final TxnCoordinatorReceivingLink_1_0 link, + final SectionDecoder sectionDecoder) { - super(session, attach); - _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry()); - + super(link, sectionDecoder); } @Override @@ -109,7 +106,7 @@ public class TxnCoordinatorReceivingLink // Only interested in the amqp-value section that holds the message to the coordinator try { - List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload); + List<EncodingRetainingSection<?>> sections = getSectionDecoder().parseAll(payload); boolean amqpValueSectionFound = false; for(EncodingRetainingSection section : sections) { @@ -225,5 +222,10 @@ public class TxnCoordinatorReceivingLink } - + @Override + public void attachReceived(final Attach attach) throws AmqpErrorException + { + super.attachReceived(attach); + setDeliveryCount(attach.getInitialDeliveryCount()); + } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Mon Mar 6 15:06:57 2017 @@ -20,20 +20,92 @@ */ package org.apache.qpid.server.protocol.v1_0; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; -public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0 +public class TxnCoordinatorReceivingLink_1_0 extends AbstractLink<TxnCoordinatorReceivingLinkEndpoint> implements ReceivingLink_1_0 { - public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint) + public TxnCoordinatorReceivingLink_1_0(final String linkName) { - ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN); + super(linkName); } + @Override + protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> stealLink(final Session_1_0 session, + final Attach attach) + { + return rejectLink(session, attach); + } @Override - public void setLinkAttachmentToNull() + protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> reattachLink(final Session_1_0 session, + final Attach attach) { + return rejectLink(session, attach); } + @Override + protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> resumeLink(final Session_1_0 session, + final Attach attach) + { + return rejectLink(session, attach); + } + + @Override + protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> recoverLink(final Session_1_0 session, + final Attach attach) + { + return rejectLink(session, attach); + } + + @Override + protected ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> establishLink(final Session_1_0 session, + final Attach attach) + { + if (_linkEndpoint != null || getTarget() != null) + { + throw new IllegalStateException("LinkEndpoint and Target should be null when establishing a Link."); + } + + _target = new Coordinator(); + ((Coordinator) _target).setCapabilities(TxnCapability.LOCAL_TXN, + TxnCapability.MULTI_SSNS_PER_TXN, + TxnCapability.MULTI_TXNS_PER_SSN); + _source = attach.getSource(); + + try + { + SectionDecoder sectionDecoder = new SectionDecoderImpl(session.getConnection() + .getDescribedTypeRegistry() + .getSectionDecoderRegistry()); + _linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(this, sectionDecoder); + _linkEndpoint.associateSession(session); + _linkEndpoint.attachReceived(attach); + } + catch (AmqpErrorException e) + { + rejectLink(session, attach); + } + + return Futures.immediateFuture(_linkEndpoint); + } + + private ListenableFuture<TxnCoordinatorReceivingLinkEndpoint> rejectLink(final Session_1_0 session, + final Attach attach) + { + SectionDecoder sectionDecoder = new SectionDecoderImpl(session.getConnection() + .getDescribedTypeRegistry() + .getSectionDecoderRegistry()); + _linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(this, sectionDecoder); + _linkEndpoint.associateSession(session); + _target = null; + return Futures.immediateFuture(_linkEndpoint); + } } Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java?rev=1785660&view=auto ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java (added) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java Mon Mar 6 15:06:57 2017 @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.protocol.v1_0; + +import static org.mockito.Mockito.mock; + +import org.apache.qpid.server.protocol.LinkModel; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +public class LinkRegistryTest extends QpidTestCase +{ + private QueueManagingVirtualHost _virtualHost; + private LinkRegistryImpl _linkRegistry; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _virtualHost = mock(QueueManagingVirtualHost.class); + _linkRegistry = new LinkRegistryImpl(_virtualHost); + } + + private void doTestGetLink(final Class<? extends LinkModel> type) throws Exception + { + String remoteContainerId = "testRemoteContainerId"; + String linkName = "testLinkName"; + LinkModel link = _linkRegistry.getLink(remoteContainerId, linkName, type); + assertNotNull("LinkRegistry#getLink should always return an object", link); + LinkModel link2 = _linkRegistry.getLink(remoteContainerId, linkName, type); + assertNotNull("LinkRegistry#getLink should always return an object", link2); + assertSame("Two calls to LinkRegistry#getLink should return the same object", link, link2); + } + + public void testGetSendingLink() throws Exception + { + doTestGetLink(SendingLink_1_0.class); + } + + public void testGetReceivingLink() throws Exception + { + doTestGetLink(StandardReceivingLink_1_0.class); + } + + public void testGetCoordinatingLink() throws Exception + { + doTestGetLink(TxnCoordinatorReceivingLink_1_0.class); + } +} Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java Mon Mar 6 15:06:57 2017 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; +import static org.junit.Assert.assertArrayEquals; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.when; import java.security.Principal; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -38,11 +40,13 @@ import javax.security.auth.Subject; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.apache.qpid.server.common.AMQPFilterTypes; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; @@ -56,6 +60,8 @@ import org.apache.qpid.server.model.Publ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.protocol.v1_0.type.BaseSource; +import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.FrameBody; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; @@ -84,16 +90,26 @@ public class Session_1_0Test extends Qpi private VirtualHost<?> _virtualHost; private Session_1_0 _session; private int _handle; + private CurrentThreadTaskExecutor _taskExecutor; @Override public void setUp() throws Exception { super.setUp(); _virtualHost = BrokerTestHelper.createVirtualHost("testVH"); + _taskExecutor = new CurrentThreadTaskExecutor(); + _taskExecutor.start(); _connection = createAmqpConnection_1_0("testContainerId"); this._session = createSession_1_0(_connection, 0); } + @Override + protected void tearDown() throws Exception + { + _taskExecutor.stop(); + super.tearDown(); + } + public void testReceiveAttachTopicNonDurableNoContainer() throws Exception { final String linkName = "testLink"; @@ -308,7 +324,7 @@ public class Session_1_0Test extends Qpi assertNotNull("Unexpected source", sentAttach.getSource()); Source source = (Source)sentAttach.getSource(); assertEquals("Unexpected address", address, source.getAddress()); - assertEquals("Unexpected capabilities", ((Source)attach.getSource()).getCapabilities(), source.getCapabilities()); + assertTrue("Unexpected source capabilities", Arrays.asList(source.getCapabilities()).contains(Symbol.valueOf("topic"))); Collection<Queue> queues = _virtualHost.getChildren(Queue.class); assertEquals("Unexpected number of queues after unsubscribe", 1, queues.size()); @@ -497,8 +513,22 @@ public class Session_1_0Test extends Qpi assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName()); assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole()); - assertEquals("Unexpected source", receivedAttach.getSource(), sentAttach.getSource()); - assertEquals("Unexpected target", receivedAttach.getTarget(), sentAttach.getTarget()); + + Source receivedSource = (Source) receivedAttach.getSource(); + Source sentSource = (Source) sentAttach.getSource(); + assertEquals("Unexpected source address", receivedSource.getAddress(), sentSource.getAddress()); + assertArrayEquals("Unexpected source capabilities", receivedSource.getCapabilities(), sentSource.getCapabilities()); + assertEquals("Unexpected source durability", receivedSource.getDurable(), sentSource.getDurable()); + assertEquals("Unexpected source expiry policy", receivedSource.getExpiryPolicy(), sentSource.getExpiryPolicy()); + assertEquals("Unexpected source dynamic flag", receivedSource.getDynamic(), sentSource.getDynamic()); + + Target receivedTarget = (Target) receivedAttach.getTarget(); + Target sentTarget = (Target) sentAttach.getTarget(); + assertEquals("Unexpected target address", receivedTarget.getAddress(), sentTarget.getAddress()); + assertArrayEquals("Unexpected target capabilities", receivedTarget.getCapabilities(), sentTarget.getCapabilities()); + assertEquals("Unexpected target durability", receivedTarget.getDurable(), sentTarget.getDurable()); + assertEquals("Unexpected target expiry policy", receivedTarget.getExpiryPolicy(), sentTarget.getExpiryPolicy()); + assertEquals("Unexpected target dynamic flag", receivedTarget.getDynamic(), sentTarget.getDynamic()); final Collection<Queue> queues = _virtualHost.getChildren(Queue.class); assertEquals("Unexpected number of queues after attach", 1, queues.size()); @@ -651,7 +681,8 @@ public class Session_1_0Test extends Qpi when(connection.getAddressSpace()).thenReturn(_virtualHost); when(connection.getEventLogger()).thenReturn(mock(EventLogger.class)); when(connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L); - when(connection.getChildExecutor()).thenReturn(mock(TaskExecutor.class)); + when(connection.getChildExecutor()).thenReturn(_taskExecutor); + when(connection.getTaskExecutor()).thenReturn(_taskExecutor); when(connection.getModel()).thenReturn(BrokerModel.getInstance()); when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT); when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT); Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1785660&r1=1785659&r2=1785660&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Mon Mar 6 15:06:57 2017 @@ -26,6 +26,7 @@ import java.security.AccessController; import java.security.Principal; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,9 +53,10 @@ import org.apache.qpid.server.model.Conn import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.PublishingLink; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemAddressSpaceCreator; import org.apache.qpid.server.protocol.LinkModel; -import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.virtualhost.LinkRegistry; import org.apache.qpid.server.security.SecurityToken; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.session.AMQPSession; @@ -66,6 +68,7 @@ import org.apache.qpid.server.txn.DtxNot import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.virtualhost.LinkRegistryFactory; import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode; public class ManagementAddressSpace implements NamedAddressSpace @@ -83,11 +86,11 @@ public class ManagementAddressSpace impl private final MessageStore _messageStore; private final MessageDestination _defaultDestination = new DefaultDestination(); private final List<AMQPConnection<?>> _connections = new CopyOnWriteArrayList<>(); - private final LinkRegistry _linkRegistry = new NonDurableLinkRegistry(); private final Broker<?> _broker; private final Principal _principal; private final UUID _id; private final ConcurrentMap<Object, ConcurrentMap<String, ProxyMessageSource>> _connectionSpecificDestinations = new ConcurrentHashMap<>(); + private final LinkRegistry _linkRegistry; public ManagementAddressSpace(final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry) { @@ -105,6 +108,22 @@ public class ManagementAddressSpace impl _messageStore = new MemoryMessageStore(); _principal = new ManagementAddressSpacePrincipal(this); _id = UUID.nameUUIDFromBytes((_broker.getId().toString()+"/"+name).getBytes(StandardCharsets.UTF_8)); + + Iterator<LinkRegistryFactory> + linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator(); + if (linkRegistryFactories.hasNext()) + { + final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next(); + if (linkRegistryFactories.hasNext()) + { + throw new RuntimeException("Found multiple implementations of LinkRegistry"); + } + _linkRegistry = linkRegistryFactory.create(this); + } + else + { + _linkRegistry = null; + } } @@ -209,9 +228,9 @@ public class ManagementAddressSpace impl } @Override - public LinkRegistry getLinkRegistry(final String remoteContainerId) + public <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type) { - return _linkRegistry; + return _linkRegistry.getLink(remoteContainerId, linkName, type); } @Override @@ -386,37 +405,4 @@ public class ManagementAddressSpace impl } } - - private class NonDurableLinkRegistry implements LinkRegistry - { - @Override - public LinkModel getDurableSendingLink(final String name) - { - return null; - } - - @Override - public boolean registerSendingLink(final String name, final LinkModel link) - { - throw new ConnectionScopedRuntimeException("Durable links are not supported"); - } - - @Override - public boolean unregisterSendingLink(final String name) - { - return false; - } - - @Override - public LinkModel getDurableReceivingLink(final String name) - { - return null; - } - - @Override - public boolean registerReceivingLink(final String name, final LinkModel link) - { - throw new ConnectionScopedRuntimeException("Durable links are not supported"); - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
