Author: rgodfrey
Date: Wed Feb 11 01:04:08 2015
New Revision: 1658849
URL: http://svn.apache.org/r1658849

Log:
QPID-6384 : fix various issues with durable links

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1658849&r1=1658848&r2=1658849&view=diff ============================================================================== --- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original) +++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Wed Feb 11 01:04:08 2015 @@ -21,15 +21,28 @@ package org.apache.qpid.amqp_1_0.transport; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.transport.Attach; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.Error; 
+import org.apache.qpid.amqp_1_0.type.transport.Flow; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Role; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + public abstract class LinkEndpoint<T extends LinkEventListener> { Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1658849&r1=1658848&r2=1658849&view=diff ============================================================================== --- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original) +++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Wed Feb 11 01:04:08 2015 @@ -98,6 +98,12 @@ public class SessionEndpoint private int _availableOutgoingCredit; private UnsignedInteger _lastSentIncomingLimit; + private final Error _sessionEndedLinkError = + new Error(LinkError.DETACH_FORCED, + "Force detach the link because the session is remotely ended."); + + + public SessionEndpoint(final ConnectionEndpoint connectionEndpoint) { this(connectionEndpoint, UnsignedInteger.valueOf(0)); @@ -240,19 +246,21 @@ public class SessionEndpoint private void detachLinks() { Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet()); - Error error = new Error(); - error.setCondition(LinkError.DETACH_FORCED); - error.setDescription("Force detach the link because the session is remotely ended."); for(UnsignedInteger handle : handles) { Detach detach = new Detach(); detach.setClosed(false); detach.setHandle(handle); - detach.setError(error); + detach.setError(_sessionEndedLinkError); detach(handle, detach); } } + public 
boolean isSyntheticError(Error error) + { + return error == _sessionEndedLinkError; + } + public short getSendingChannel() { return _sendingChannel; Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1658849&r1=1658848&r2=1658849&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Feb 11 01:04:08 2015 @@ -46,6 +46,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -283,7 +284,12 @@ class ConsumerTarget_1_0 extends Abstrac { //TODO getEndpoint().setSource(null); - getEndpoint().detach(); + getEndpoint().close(); + + final LinkRegistry linkReg = getSession().getConnection() + .getVirtualHost() + .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId()); + linkReg.unregisterSendingLink(getEndpoint().getName()); } public boolean allocateCredit(final ServerMessage msg) Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1658849&r1=1658848&r2=1658849&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original) +++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb 11 01:04:08 2015 @@ -464,7 +464,8 @@ public class SendingLink_1_0 implements _consumer.releaseSendLock(); } } - else if(detach == null || detach.getError() != null) + else if(detach.getError() != null + && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) { _linkAttachment = null; _target.flowStateChanged(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org