Author: rgodfrey
Date: Wed Feb 11 01:04:08 2015
New Revision: 1658849

URL: http://svn.apache.org/r1658849
Log:
QPID-6384 : fix various issues with durable links

Modified:
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
    
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1658849&r1=1658848&r2=1658849&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
 Wed Feb 11 01:04:08 2015
@@ -21,15 +21,28 @@
 
 package org.apache.qpid.amqp_1_0.transport;
 
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
 public abstract class LinkEndpoint<T extends LinkEventListener>
 {
 

Modified: 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1658849&r1=1658848&r2=1658849&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
 Wed Feb 11 01:04:08 2015
@@ -98,6 +98,12 @@ public class SessionEndpoint
     private int _availableOutgoingCredit;
     private UnsignedInteger _lastSentIncomingLimit;
 
+    private final Error _sessionEndedLinkError =
+            new Error(LinkError.DETACH_FORCED,
+                     "Force detach the link because the session is remotely 
ended.");
+
+
+
     public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
     {
         this(connectionEndpoint, UnsignedInteger.valueOf(0));
@@ -240,19 +246,21 @@ public class SessionEndpoint
     private void detachLinks()
     {
         Collection<UnsignedInteger> handles = new 
ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
-        Error error = new Error();
-        error.setCondition(LinkError.DETACH_FORCED);
-        error.setDescription("Force detach the link because the session is 
remotely ended.");
         for(UnsignedInteger handle : handles)
         {
             Detach detach = new Detach();
             detach.setClosed(false);
             detach.setHandle(handle);
-            detach.setError(error);
+            detach.setError(_sessionEndedLinkError);
             detach(handle, detach);
         }
     }
 
+    public boolean isSyntheticError(Error error)
+    {
+        return error == _sessionEndedLinkError;
+    }
+
     public short getSendingChannel()
     {
         return _sendingChannel;

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1658849&r1=1658848&r2=1658849&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 Wed Feb 11 01:04:08 2015
@@ -46,6 +46,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -283,7 +284,12 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         //TODO
         getEndpoint().setSource(null);
-        getEndpoint().detach();
+        getEndpoint().close();
+
+        final LinkRegistry linkReg = getSession().getConnection()
+                .getVirtualHost()
+                
.getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
+        linkReg.unregisterSendingLink(getEndpoint().getName());
     }
 
     public boolean allocateCredit(final ServerMessage msg)

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1658849&r1=1658848&r2=1658849&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
 Wed Feb 11 01:04:08 2015
@@ -464,7 +464,8 @@ public class SendingLink_1_0 implements
                 _consumer.releaseSendLock();
             }
         }
-        else if(detach == null || detach.getError() != null)
+        else if(detach.getError() != null
+                && 
!_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
         {
             _linkAttachment = null;
             _target.flowStateChanged();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to