Author: rgodfrey
Date: Wed Jan 11 09:52:16 2012
New Revision: 1229943

URL: http://svn.apache.org/viewvc?rev=1229943&view=rev
Log:
QPID-3717 - Fixes based on review by Robbie Gemmell

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1229943&r1=1229942&r2=1229943&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed Jan 11 09:52:16 2012
@@ -47,7 +47,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.configuration.SessionConfigType;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -90,7 +89,7 @@ public class ServerSession extends Sessi
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
 
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
-    private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
+    private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
 
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
@@ -100,12 +99,9 @@ public class ServerSession extends Sessi
 
     private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
 
-    private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>();
-
-
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
-    private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE);
+    private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
 
 
     public static interface MessageDispositionChangeListener
@@ -181,9 +177,11 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
     {
-        if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD)
+        if(_outstandingCredit.get() != UNLIMITED_CREDIT
+                && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
         {
-            invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+            _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
+            invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
         PostEnqueueAction postTransactionAction;
@@ -712,7 +710,7 @@ public class ServerSession extends Sessi
         MessageFlow mf = new MessageFlow();
         mf.setUnit(MessageCreditUnit.MESSAGE);
         mf.setDestination("");
-        _oustandingCredit.set(Integer.MAX_VALUE);
+        _outstandingCredit.set(Integer.MAX_VALUE);
         mf.setValue(Integer.MAX_VALUE);
         invoke(mf);
 

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org