Author: kwall
Date: Tue Mar 3 14:15:30 2015
New Revision: 1663708
URL: http://svn.apache.org/r1663708
Log: channel block/unblock now async, remove unnecessary selector bumps
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Mar 3 14:15:30 2015 @@ -116,5 +116,5 @@ public interface AMQSessionModel<T exten void transportStateChanged(); - void processPendingMessages(); + void processPending(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Mar 3 14:15:30 2015 @@ -623,6 +623,8 @@ public class NonBlockingConnection imple @Override public void send(final ByteBuffer msg) { + assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName(); + if (_closed.get()) { throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); @@ -634,7 +636,5 @@ public class NonBlockingConnection imple @Override public void flush() { - getSelector().wakeup(); - } } 
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Tue Mar 3 14:15:30 2015 @@ -36,11 +36,9 @@ import java.util.concurrent.ScheduledThr import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** -* Created by keith on 28/01/2015. -*/ public class SelectorThread extends Thread { + public static final String IO_THREAD_NAME_PREFIX = "NCS-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); @@ -289,7 +287,8 @@ public class SelectorThread extends Thre String currentName = Thread.currentThread().getName(); try { - Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString()); + Thread.currentThread().setName( + IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); processConnection(connection); } finally Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1663708&r1=1663707&r2=1663708&view=diff 
============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Mar 3 14:15:30 2015 @@ -488,7 +488,7 @@ public class MockConsumer implements Con } @Override - public void processPendingMessages() + public void processPending() { } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Mar 3 14:15:30 2015 @@ -632,7 +632,6 @@ public class ConsumerTarget_0_10 extends public void flushBatched() { - _session.getConnection().flush(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Mar 3 14:15:30 2015 @@ -70,7 +70,6 @@ public class ServerConnection extends Co { private final Broker<?> _broker; - private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private final Subject _authorizedSubject = new Subject(); @@ -79,10 +78,10 @@ public class ServerConnection extends Co private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private AmqpPort<?> _port; - private AtomicLong _lastIoTime = new AtomicLong(); + private final AmqpPort<?> _port; + private final AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; - private Transport _transport; + private final Transport _transport; private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? 
super ServerConnection>>(); @@ -95,7 +94,7 @@ public class ServerConnection extends Co private volatile boolean _stopped; private int _messageCompressionThreshold; - private int _maxMessageSize; + private final int _maxMessageSize; private ServerProtocolEngine _serverProtocolEngine; @@ -149,10 +148,6 @@ public class ServerConnection extends Co if (state == State.OPEN) { - if (_onOpenTask != null) - { - _onOpenTask.run(); - } getEventLogger().message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), @@ -256,11 +251,6 @@ public class ServerConnection extends Co return _stopped; } - public void onOpen(final Runnable task) - { - _onOpenTask = task; - } - public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { addAsyncTask(new Action<ServerConnection>() @@ -740,7 +730,7 @@ public class ServerConnection extends Co for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Mar 3 14:15:30 2015 @@ -137,6 +137,7 @@ public class ServerSession extends Sessi private org.apache.qpid.server.model.Session<?> _modelObject; private long _blockTime; private long 
_blockingTimeout; + private boolean _wireBlockingState; public static interface MessageDispositionChangeListener { @@ -208,10 +209,6 @@ public class ServerSession extends Sessi if (state == State.OPEN) { getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); - if(_blocking.get()) - { - invokeBlock(); - } } } else @@ -245,6 +242,17 @@ public class ServerSession extends Sessi invoke(new MessageStop("")); } + private void invokeUnblock() + { + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } + + @Override protected boolean isFull(int id) { @@ -824,12 +832,11 @@ public class ServerSession extends Sessi if(_blocking.compareAndSet(false,true)) { + getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); if(getState() == State.OPEN) { - invokeBlock(); + getConnection().notifyWork(); } - _blockTime = System.currentTimeMillis(); - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -853,24 +860,17 @@ public class ServerSession extends Sessi { if(_blocking.compareAndSet(true,false) && !isClosing()) { - _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); - - + getConnection().notifyWork(); } } } + boolean blockingTimeoutExceeded() { long blockTime = _blockTime; - boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; return b; } @@ -1136,8 +1136,25 @@ public class ServerSession extends Sessi } @Override - public void 
processPendingMessages() + public void processPending() { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + + if (desiredBlockingState) + { + invokeBlock(); + } + else + { + invokeUnblock(); + } + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + for(ConsumerTarget target : getSubscriptions()) { target.processPending(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Mar 3 14:15:30 2015 @@ -209,6 +209,8 @@ public class AMQChannel private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); private long _maxUncommittedInMemorySize; + private boolean _wireBlockingState; + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { _creditManager = new Pre0_10CreditManager(0l,0l, connection); @@ -1611,12 +1613,14 @@ public class AMQChannel { if(_blockingEntities.add(this)) { + if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); - flow(false); - _blockTime = System.currentTimeMillis(); + + + getConnection().notifyWork(); } } } @@ -1628,8 +1632,7 @@ public class 
AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - - flow(true); + getConnection().notifyWork(); } } } @@ -1643,8 +1646,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); - flow(false); - _blockTime = System.currentTimeMillis(); + getConnection().notifyWork(); } } @@ -1657,7 +1659,7 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - flow(true); + getConnection().notifyWork(); } } } @@ -2262,7 +2264,7 @@ public class AMQChannel private boolean blockingTimeoutExceeded() { - return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; } @Override @@ -3598,9 +3600,17 @@ public class AMQChannel } @Override - public void processPendingMessages() + public void processPending() { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + flow(!desiredBlockingState); + _blockTime = desiredBlockingState ? 
System.currentTimeMillis() : 0; + } + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) { target.processPending(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Mar 3 14:15:30 2015 @@ -44,8 +44,6 @@ import java.util.concurrent.CopyOnWriteA import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -148,11 +146,8 @@ public class AMQProtocolEngine implement * The channels that the latest call to {@link #received(ByteBuffer)} applied to. * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} * on after handling the frames. - * - * Thread-safety: guarded by {@link #_receivedLock}. 
*/ - private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<>(); + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>(); private AMQDecoder _decoder; @@ -197,7 +192,6 @@ public class AMQProtocolEngine implement private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; - private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker<?> _broker; private final Transport _transport; @@ -251,7 +245,6 @@ public class AMQProtocolEngine implement _port = port; _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); - _receivedLock = new ReentrantLock(); _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -545,43 +538,8 @@ public class AMQProtocolEngine implement private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final int size = (int) block.getSize(); - - final byte[] data; - - - if(size > REUSABLE_BYTE_BUFFER_CAPACITY) - { - data= new byte[size]; - } - else - { - - data = _reusableBytes; - } - _reusableDataOutput.setBuffer(data); - - try - { - block.writePayload(_reusableDataOutput); - } - catch (IOException e) - { - throw new ServerScopedRuntimeException(e); - } - - final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length()); - copy.put(data, 0, _reusableDataOutput.length()); - copy.flip(); - return copy; - } - - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). 
@@ -1969,11 +1927,6 @@ public class AMQProtocolEngine implement return _reference; } - public Lock getReceivedLock() - { - return _receivedLock; - } - @Override public long getLastReadTime() { @@ -2095,6 +2048,8 @@ public class AMQProtocolEngine implement @Override public void processPending() { + + while(_asyncTaskList.peek() != null) { Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); @@ -2103,7 +2058,7 @@ public class AMQProtocolEngine implement for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Mar 3 14:15:30 2015 @@ -515,6 +515,7 @@ public abstract class ConsumerTarget_0_8 if (isAutoClose()) { _needToClose.set(true); + getChannel().getConnection().notifyWork(); } } @@ -531,8 +532,6 @@ public abstract class ConsumerTarget_0_8 public void flushBatched() { _channel.getConnection().setDeferFlush(false); - - _channel.getConnection().notifyWork(); } protected void addUnacknowledgedMessage(MessageInstance entry) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Mar 3 14:15:30 2015 @@ -552,7 +552,7 @@ public class Connection_1_0 implements C for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Mar 3 14:15:30 2015 @@ -901,7 +901,7 @@ public class Session_1_0 implements Sess } @Override - public void processPendingMessages() + public void processPending() { for(Consumer<?> consumer : getConsumers()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: 
commits-h...@qpid.apache.org