Author: rgodfrey
Date: Tue Mar 14 11:49:27 2017
New Revision: 1786887

URL: http://svn.apache.org/viewvc?rev=1786887&view=rev
Log:
QPID-7708 : Allow session window credit to be set by a context variable

Modified:
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1786887&r1=1786886&r2=1786887&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 Tue Mar 14 11:49:27 2017
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
@@ -42,6 +43,11 @@ public interface AMQPConnection_1_0<C ex
                                                                              
ConnectionHandler,
                                                                              
EventLoggerProvider
 {
+
+    String CONNECTION_SESSION_CREDIT_WINDOW_SIZE = "connection.sessionCreditWindowSize";
+    @ManagedContextDefault(name = CONNECTION_SESSION_CREDIT_WINDOW_SIZE)
+    int DEFAULT_CONNECTION_SESSION_CREDIT_WINDOW_SIZE = 8192;
+
     Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
     Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1786887&r1=1786886&r2=1786887&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 Tue Mar 14 11:49:27 2017
@@ -603,7 +603,11 @@ public class AMQPConnection_1_0Impl exte
                 }
                 else
                 {
-                    Session_1_0 session = new Session_1_0(this, begin, sendingChannelId, receivingChannelId);
+                    Session_1_0 session = new Session_1_0(this,
+                                                          begin,
+                                                          sendingChannelId,
+                                                          receivingChannelId,
+                                                          getContextValue(Integer.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                     session.create();
 
                     _receivingSessions[receivingChannelId] = session;

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1786887&r1=1786886&r2=1786887&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Tue Mar 14 11:49:27 2017
@@ -141,11 +141,7 @@ public class Session_1_0 extends Abstrac
     private final short _sendingChannel;
 
 
-    // has to be a power of two
     private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
-    private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
-
-
 
     private int _nextOutgoingDeliveryId;
 
@@ -157,7 +153,7 @@ public class Session_1_0 extends Abstrac
     private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
     private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
 
-    private int _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+    private final int _incomingWindowSize;
     private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
     private UnsignedInteger _lastSentIncomingLimit;
 
@@ -175,7 +171,8 @@ public class Session_1_0 extends Abstrac
     public Session_1_0(final AMQPConnection_1_0 connection,
                        Begin begin,
                        short sendingChannelId,
-                       short receivingChannelId)
+                       short receivingChannelId,
+                       int incomingWindowSize)
     {
         super(connection, sendingChannelId);
         _sendingChannel = sendingChannelId;
@@ -184,6 +181,7 @@ public class Session_1_0 extends Abstrac
         _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _primaryDomain = getPrimaryDomain();
+        _incomingWindowSize = incomingWindowSize;
 
         AccessController.doPrivileged((new PrivilegedAction<Object>()
         {
@@ -390,8 +388,10 @@ public class Session_1_0 extends Abstrac
         {
             UnsignedInteger clientsCredit =
                     _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
-            int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
-            if (i >= 0)
+
+            // TODO - we should use a better metric here, and/or manage session credit across the whole connection
+            // send a flow if the window is at least half used up
+            if (UnsignedInteger.valueOf(_incomingWindowSize).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
             {
                 sendFlow();
             }
@@ -497,9 +497,9 @@ public class Session_1_0 extends Abstrac
         {
             final int nextIncomingId = _nextIncomingTransferId.intValue();
             flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
-            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
+            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindowSize);
         }
-        flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
+        flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindowSize));
 
         flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
         flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
@@ -653,7 +653,7 @@ public class Session_1_0 extends Abstrac
 
     UnsignedInteger getIncomingWindowSize()
     {
-        return UnsignedInteger.valueOf(_availableIncomingCredit);
+        return UnsignedInteger.valueOf(_incomingWindowSize);
     }
 
     AccessControlContext getAccessControllerContext()

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java?rev=1786887&r1=1786886&r2=1786887&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
 Tue Mar 14 11:49:27 2017
@@ -715,7 +715,7 @@ public class Session_1_0Test extends Qpi
     {
         Begin begin = mock(Begin.class);
         when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
-        Session_1_0 session = new Session_1_0(connection, begin, (short) channelId, (short) channelId);
+        Session_1_0 session = new Session_1_0(connection, begin, (short) channelId, (short) channelId, 2048);
         return session;
     }
 
@@ -728,4 +728,4 @@ public class Session_1_0Test extends Qpi
         detach.setClosed(closed);
         session.receiveDetach(detach);
     }
-}
\ No newline at end of file
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to