Author: kwall Date: Tue Feb 10 18:10:16 2015 New Revision: 1658773 URL: http://svn.apache.org/r1658773 Log: Refactoring: make the queue no longer be responsible for pushing messages onto the wire
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java - copied, changed from r1658748, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Feb 10 18:10:16 2015 @@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget @@ -41,6 +43,7 @@ public abstract class AbstractConsumerTa private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicInteger _stateActivates = new AtomicInteger(); + private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue(); protected AbstractConsumerTarget(final State initialState) @@ -48,6 +51,22 @@ public abstract class AbstractConsumerTa _state = new AtomicReference<State>(initialState); } + @Override + public void processPendingMessages() + { + while(hasMessagesToSend()) + { + sendNextMessage(); + } + } + + @Override + public final boolean isSuspended() + { + return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended(); + } + + protected abstract boolean doIsSuspended(); public final State getState() { @@ -136,4 +155,42 @@ public abstract class AbstractConsumerTa _stateChangeLock.unlock(); } + @Override + public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + { + _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); + + getSessionModel().getConnectionModel().flushBatched(); + return entry.getMessage().getSize(); + } + + protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + + @Override + public boolean hasMessagesToSend() + { + return !_queue.isEmpty(); + } + + @Override + public void sendNextMessage() + { + + ConsumerMessageInstancePair consumerMessage = _queue.peek(); + if (consumerMessage != null) + { + _queue.poll(); + + ConsumerImpl consumer = consumerMessage.getConsumer(); + MessageInstance entry = consumerMessage.getEntry(); + boolean batch = consumerMessage.isBatch(); + doSend(consumer, entry, batch); + + if (consumer.acquires()) + { + entry.unlockAcquisition(); + } + } + + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Tue Feb 10 18:10:16 2015 @@ -31,6 +31,8 @@ public interface ConsumerImpl void externalStateChange(); + ConsumerTarget getTarget(); + enum Option { ACQUIRES, Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java (from r1658748, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java) URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java&r1=1658748&r2=1658773&rev=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java Tue Feb 10 18:10:16 2015 @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,22 +15,38 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ -package org.apache.qpid.protocol; -import javax.security.auth.Subject; +package org.apache.qpid.server.consumer; + +import org.apache.qpid.server.message.MessageInstance; -public interface ServerProtocolEngine extends ProtocolEngine +public class ConsumerMessageInstancePair { - /** - * Gets the connection ID associated with this ProtocolEngine - */ - long getConnectionId(); + private final ConsumerImpl _consumer; + private final MessageInstance _entry; + private final boolean _batch; + + public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch) + { + _consumer = consumer; + _entry = entry; + _batch = batch; + + } - Subject getSubject(); + public ConsumerImpl getConsumer() + { + return _consumer; + } - boolean isTransportBlockedForWriting(); + public MessageInstance getEntry() + { + return _entry; + } - void setTransportBlockedForWriting(boolean blocked); + public boolean isBatch() + { + return _batch; + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Feb 10 18:10:16 2015 @@ -33,6 +33,8 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + void processPendingMessages(); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -54,6 +56,10 @@ public interface ConsumerTarget long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + boolean hasMessagesToSend(); + + void sendNextMessage(); + void flushBatched(); void queueDeleted(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Tue Feb 10 18:10:16 2015 @@ -107,4 +107,7 @@ public interface AMQConnectionModel<T ex void removeSessionListener(SessionModelListener listener); + void flushBatched(); + + boolean isMessageAssignmentSuspended(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Feb 10 18:10:16 2015 @@ -115,4 +115,6 @@ public interface AMQSessionModel<T exten long getTransactionUpdateTime(); void transportStateChanged(); + + void processPendingMessages(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Tue Feb 10 18:10:16 2015 @@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine _onCloseTask = onCloseTask; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + _delegate.setMessageAssignmentSuspended(value); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _delegate.isMessageAssignmentSuspended(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine return _delegate.getLastWriteTime(); } - + @Override + public void processPendingMessages() + { + _delegate.processPendingMessages(); + } private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { + + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine return 0; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPendingMessages() + { + + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Feb 10 18:10:16 2015 @@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X ex else { deliverMessage(sub, entry, false); - if(sub.acquires()) - { - entry.unlockAcquisition(); - } } } } @@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X ex else { deliverMessage(sub, node, batch); - if(sub.acquires()) - { - node.unlockAcquisition(); - } } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Tue Feb 10 18:10:16 2015 @@ -52,5 +52,4 @@ public interface QueueConsumer<X extends QueueContext getQueueContext(); - ConsumerTarget getTarget(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Feb 10 18:10:16 2015 @@ -507,6 +507,7 @@ class QueueConsumerImpl return _selector; } + @Override public String toLogString() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Feb 10 18:10:16 2015 @@ -178,6 +178,18 @@ public class MockConsumer implements Con return size; } + @Override + public boolean hasMessagesToSend() + { + return false; + } + + @Override + public void sendNextMessage() + { + + } + public void flushBatched() { @@ -230,6 +242,12 @@ public class MockConsumer implements Con } } + @Override + public void processPendingMessages() + { + + } + public ArrayList<MessageInstance> getMessages() { return messages; @@ -462,6 +480,12 @@ public class MockConsumer implements Con { } + + @Override + public void processPendingMessages() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -594,6 +618,18 @@ public class MockConsumer implements Con } @Override + public void flushBatched() + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override public String getClientVersion() { return null; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Feb 10 18:10:16 2015 @@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends _name = name; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } @@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends { recordUnacknowledged(entry); } - return size; } void recordUnacknowledged(MessageInstance entry) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Tue Feb 10 18:10:16 2015 @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.network.Assembler; @@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extend private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { @@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extend } } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) @@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extend _connection.transportStateChanged(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Feb 10 18:10:16 2015 @@ -685,4 +685,17 @@ public class ServerConnection extends Co ssn.transportStateChanged(); } } + + @Override + public void flushBatched() + { + getSender().flush(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Feb 10 18:10:16 2015 @@ -1135,6 +1135,15 @@ public class ServerSession extends Sessi } } + @Override + public void processPendingMessages() + { + for(ConsumerTarget target : getSubscriptions()) + { + target.processPendingMessages(); + } + } + public final long getMaxUncommittedInMemorySize() { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Feb 10 18:10:16 2015 @@ -3606,4 +3606,14 @@ public class AMQChannel } } } + + @Override + public void processPendingMessages() + { + + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) + { + target.processPendingMessages(); + } + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Feb 10 18:10:16 2015 @@ -202,6 +202,22 @@ public class AMQProtocolEngine implement private long _maxMessageSize; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, final long connectionId, @@ -331,9 +347,9 @@ public class AMQProtocolEngine implement { final long arrivalTime = System.currentTimeMillis(); - if(!_authenticated && - (arrivalTime - _creationTime) > _port.getContextValue(Long.class, - Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + if (!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) { _logger.warn("Connection has taken more than " + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) @@ -388,7 +404,7 @@ public class AMQProtocolEngine implement } catch (StoreException e) { - if(_virtualHost.getState() == State.ACTIVE) + if (_virtualHost.getState() == State.ACTIVE) { throw e; } @@ -1362,7 +1378,7 @@ public class AMQProtocolEngine implement { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), - null)); + null)); } public void block() @@ -2049,4 +2065,12 @@ public class AMQProtocolEngine implement return _closing.get(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Feb 10 18:10:16 2015 @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +34,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerMessageInstancePair; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -99,6 +101,7 @@ public abstract class ConsumerTarget_0_8 return _consumers; } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -123,7 +126,7 @@ public abstract class ConsumerTarget_0_8 * @throws org.apache.qpid.AMQException */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,7 +134,7 @@ public abstract class ConsumerTarget_0_8 synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -178,7 +181,7 @@ public abstract class ConsumerTarget_0_8 * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -205,7 +208,6 @@ public abstract class ConsumerTarget_0_8 } ref.release(); - return size; } @@ -278,9 +280,10 @@ public abstract class ConsumerTarget_0_8 * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { + // put queue entry on a list and then notify the connection to read list. synchronized (getChannel()) { @@ -292,12 +295,15 @@ public abstract class ConsumerTarget_0_8 entry.addStateChangeListener(getReleasedStateChangeListener()); long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - return size; } + + } + + } @@ -382,7 +388,8 @@ public abstract class ConsumerTarget_0_8 return subscriber + "]"; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Feb 10 18:10:16 2015 @@ -65,7 +65,7 @@ public class Connection_1_0 implements C private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; - private final ServerProtocolEngine _protocolEngine; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -110,7 +110,7 @@ public class Connection_1_0 implements C AmqpPort<?> port, Transport transport, final SubjectCreator subjectCreator, - final ServerProtocolEngine protocolEngine) + final ProtocolEngine_1_0_0_SASL protocolEngine) { _protocolEngine = protocolEngine; _broker = broker; @@ -498,4 +498,16 @@ public class Connection_1_0 implements C session.transportStateChanged(); } } + + @Override + public void flushBatched() + { + _protocolEngine.flushBatched(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Feb 10 18:10:16 2015 @@ -83,7 +83,8 @@ class ConsumerTarget_1_0 extends Abstrac return _link.getEndpoint(); } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; @@ -113,22 +114,10 @@ class ConsumerTarget_1_0 extends Abstrac } } - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { // TODO - long size = entry.getMessage().getSize(); - send(entry); - return size; - } - - public void flushBatched() - { - // TODO - } - - public void send(final MessageInstance queueEntry) - { - ServerMessage serverMessage = queueEntry.getMessage(); + ServerMessage serverMessage = entry.getMessage(); Message_1_0 message; if(serverMessage instanceof Message_1_0) { @@ -168,7 +157,7 @@ class ConsumerTarget_1_0 extends Abstrac payload.flip(); } - if(queueEntry.getDeliveryCount() != 0) + if(entry.getDeliveryCount() != 0) { payload = payload.duplicate(); ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -200,7 +189,7 @@ class ConsumerTarget_1_0 extends Abstrac header.setPriority(oldHeader.getPriority()); header.setTtl(oldHeader.getTtl()); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount())); _sectionEncoder.reset(); _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); @@ -230,10 +219,10 @@ class ConsumerTarget_1_0 extends Abstrac else { UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, queueEntry); + _link.addUnsettled(tag, action, entry); } if(_transactionId != null) @@ -257,9 +246,9 @@ class ConsumerTarget_1_0 extends Abstrac public void onRollback() { - if(queueEntry.isAcquiredBy(getConsumer())) + if(entry.isAcquiredBy(getConsumer())) { - queueEntry.release(); + entry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -274,12 +263,17 @@ class ConsumerTarget_1_0 extends Abstrac } else { - queueEntry.release(); + entry.release(); } } } + public void flushBatched() + { + // TODO + } + public void queueDeleted() { //TODO Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Tue Feb 10 18:10:16 2015 @@ -56,6 +56,7 @@ import org.apache.qpid.protocol.ServerPr import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -135,6 +136,10 @@ public class ProtocolEngine_1_0_0_SASL i private State _state = State.A; + private volatile boolean _messageAssignmentSuspended; + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, long id, AmqpPort<?> port, Transport transport) @@ -150,6 +155,19 @@ public class ProtocolEngine_1_0_0_SASL i } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -576,4 +594,17 @@ public class ProtocolEngine_1_0_0_SASL i } + public void flushBatched() + { + _sender.flush(); + } + + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Feb 10 18:10:16 2015 @@ -899,6 +899,16 @@ public class Session_1_0 implements Sess return 0L; } + @Override + public void processPendingMessages() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPendingMessages(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Tue Feb 10 18:10:16 2015 @@ -164,6 +164,12 @@ class ManagementNodeConsumer implements } + @Override + public ConsumerTarget getTarget() + { + return _target; + } + ManagementNode getManagementNode() { return _managementNode; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Tue Feb 10 18:10:16 2015 @@ -34,4 +34,10 @@ public interface ServerProtocolEngine ex boolean isTransportBlockedForWriting(); void setTransportBlockedForWriting(boolean blocked); + + void setMessageAssignmentSuspended(boolean value); + + boolean isMessageAssignmentSuspended(); + + void processPendingMessages(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Tue Feb 10 18:10:16 2015 @@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); - private final ServerProtocolEngine _receiver; + private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; @@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine receiver, + ServerProtocolEngine protocolEngine, int receiveBufSize, Ticker ticker, final Set<TransportEncryption> encryptionSet, @@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver { _connection = connection; _socketChannel = connection.getSocketChannel(); - _receiver = receiver; + _protocolEngine = protocolEngine; _receiveBufSize = receiveBufSize; _ticker = ticker; _encryptionSet = encryptionSet; @@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver _ticker.tick(currentTime); } - _receiver.setTransportBlockedForWriting(!doWrite()); + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPendingMessages(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); boolean dataRead = doRead(); _fullyWritten = doWrite(); - _receiver.setTransportBlockedForWriting(!_fullyWritten); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) { _stateChanged.set(true); } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); } catch (IOException e) { @@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver } LOGGER.debug("Closing receiver"); - _receiver.closed(); + _protocolEngine.closed(); try { @@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver ByteBuffer dup = _currentBuffer.duplicate(); dup.flip(); _currentBuffer = _currentBuffer.slice(); - _receiver.received(dup); + _protocolEngine.received(dup); } } else if(_transportEncryption == TransportEncryption.TLS) @@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver { readData = true; } - _receiver.received(appInputBuffer); + _protocolEngine.received(appInputBuffer); } while(unwrapped > 0 || tasksRun); @@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver if (_transportEncryption == TransportEncryption.NONE) { - _receiver.received(_netInputBuffer); + _protocolEngine.received(_netInputBuffer); } else { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1658773&r1=1658772&r2=1658773&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Tue Feb 10 18:10:16 2015 @@ -27,3 +27,5 @@ org.apache.qpid.test.client.queue.QueueP //QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar) org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError + +org.apache.qpid.server.protocol.v0_8.AckTest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org