Author: rgodfrey
Date: Wed Jan 11 09:52:16 2012
New Revision: 1229943

URL: http://svn.apache.org/viewvc?rev=1229943&view=rev
Log:
QPID-3717 - Fixes based on review by Robbie Gemmell

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1229943&r1=1229942&r2=1229943&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed Jan 11 09:52:16 2012
@@ -47,7 +47,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.configuration.SessionConfigType;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -90,7 +89,7 @@ public class ServerSession extends Sessi
     private static final Logger _logger = 
LoggerFactory.getLogger(ServerSession.class);
     
     private static final String NULL_DESTINTATION = 
UUID.randomUUID().toString();
-    private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
+    private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
 
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
@@ -100,12 +99,9 @@ public class ServerSession extends Sessi
 
     private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new 
ConcurrentHashMap<AMQQueue, Boolean>();
 
-    private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new 
ConcurrentHashMap<Exchange, Boolean>();
-
-
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
-    private final AtomicInteger _oustandingCredit = new 
AtomicInteger(Integer.MAX_VALUE);
+    private final AtomicInteger _outstandingCredit = new 
AtomicInteger(UNLIMITED_CREDIT);
 
 
     public static interface MessageDispositionChangeListener
@@ -181,9 +177,11 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final List<? extends 
BaseQueue> queues)
     {
-        if(_oustandingCredit.decrementAndGet() < 
HALF_INCOMING_CREDIT_THRESHOLD)
+        if(_outstandingCredit.get() != UNLIMITED_CREDIT
+                && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE 
- PRODUCER_CREDIT_TOPUP_THRESHOLD))
         {
-            invoke(new 
MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+            _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
+            invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, 
PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
         getConnectionModel().registerMessageReceived(message.getSize(), 
message.getArrivalTime());
         PostEnqueueAction postTransactionAction;
@@ -712,7 +710,7 @@ public class ServerSession extends Sessi
                 MessageFlow mf = new MessageFlow();
                 mf.setUnit(MessageCreditUnit.MESSAGE);
                 mf.setDestination("");
-                _oustandingCredit.set(Integer.MAX_VALUE);
+                _outstandingCredit.set(Integer.MAX_VALUE);
                 mf.setValue(Integer.MAX_VALUE);
                 invoke(mf);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to