Author: kwall
Date: Tue Feb 10 18:10:16 2015
New Revision: 1658773

URL: http://svn.apache.org/r1658773
Log:
Refactoring: make the queue no longer be responsible for pushing messages onto 
the wire

Added:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
      - copied, changed from r1658748, 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 Tue Feb 10 18:10:16 2015
@@ -23,12 +23,14 @@ package org.apache.qpid.server.consumer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.util.StateChangeListener;
 
 public abstract class AbstractConsumerTarget implements ConsumerTarget
@@ -41,6 +43,7 @@ public abstract class AbstractConsumerTa
 
     private final Lock _stateChangeLock = new ReentrantLock();
     private final AtomicInteger _stateActivates = new AtomicInteger();
+    private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
 
 
     protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +51,22 @@ public abstract class AbstractConsumerTa
         _state = new AtomicReference<State>(initialState);
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+        while(hasMessagesToSend())
+        {
+            sendNextMessage();
+        }
+    }
+
+    @Override
+    public final boolean isSuspended()
+    {
+        return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+    }
+
+    protected abstract boolean doIsSuspended();
 
     public final State getState()
     {
@@ -136,4 +155,42 @@ public abstract class AbstractConsumerTa
         _stateChangeLock.unlock();
     }
 
+    @Override
+    public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+    {
+        _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+        getSessionModel().getConnectionModel().flushBatched();
+        return entry.getMessage().getSize();
+    }
+
+    protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+    @Override
+    public boolean hasMessagesToSend()
+    {
+        return !_queue.isEmpty();
+    }
+
+    @Override
+    public void sendNextMessage()
+    {
+
+        ConsumerMessageInstancePair consumerMessage = _queue.peek();
+        if (consumerMessage != null)
+        {
+            _queue.poll();
+
+            ConsumerImpl consumer = consumerMessage.getConsumer();
+            MessageInstance entry = consumerMessage.getEntry();
+            boolean batch = consumerMessage.isBatch();
+            doSend(consumer, entry, batch);
+
+            if (consumer.acquires())
+            {
+                entry.unlockAcquisition();
+            }
+        }
+
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
 Tue Feb 10 18:10:16 2015
@@ -31,6 +31,8 @@ public interface ConsumerImpl
 
     void externalStateChange();
 
+    ConsumerTarget getTarget();
+
     enum Option
     {
         ACQUIRES,

Copied: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
 (from r1658748, 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java)
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java&r1=1658748&r2=1658773&rev=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
 Tue Feb 10 18:10:16 2015
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,22 +15,38 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.qpid.protocol;
 
-import javax.security.auth.Subject;
+package org.apache.qpid.server.consumer;
+
+import org.apache.qpid.server.message.MessageInstance;
 
-public interface ServerProtocolEngine extends ProtocolEngine
+public class ConsumerMessageInstancePair
 {
-    /**
-     * Gets the connection ID associated with this ProtocolEngine
-     */
-    long getConnectionId();
+    private final ConsumerImpl _consumer;
+    private final MessageInstance _entry;
+    private final boolean _batch;
+
+    public ConsumerMessageInstancePair(final ConsumerImpl consumer, final MessageInstance entry, final boolean batch)
+    {
+        _consumer = consumer;
+        _entry = entry;
+        _batch = batch;
+
+    }
 
-    Subject getSubject();
+    public ConsumerImpl getConsumer()
+    {
+        return _consumer;
+    }
 
-    boolean isTransportBlockedForWriting();
+    public MessageInstance getEntry()
+    {
+        return _entry;
+    }
 
-    void setTransportBlockedForWriting(boolean blocked);
+    public boolean isBatch()
+    {
+        return _batch;
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
 Tue Feb 10 18:10:16 2015
@@ -33,6 +33,8 @@ public interface ConsumerTarget
 
     void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
 
+    void processPendingMessages();
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED
@@ -54,6 +56,10 @@ public interface ConsumerTarget
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
+    boolean hasMessagesToSend();
+
+    void sendNextMessage();
+
     void flushBatched();
 
     void queueDeleted();

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
 Tue Feb 10 18:10:16 2015
@@ -107,4 +107,7 @@ public interface AMQConnectionModel<T ex
 
     void removeSessionListener(SessionModelListener listener);
 
+    void flushBatched();
+
+    boolean isMessageAssignmentSuspended();
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 Tue Feb 10 18:10:16 2015
@@ -115,4 +115,6 @@ public interface AMQSessionModel<T exten
     long getTransactionUpdateTime();
 
     void transportStateChanged();
+
+    void processPendingMessages();
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
 Tue Feb 10 18:10:16 2015
@@ -84,6 +84,18 @@ public class MultiVersionProtocolEngine
         _onCloseTask = onCloseTask;
     }
 
+    @Override
+    public void setMessageAssignmentSuspended(final boolean value)
+    {
+        _delegate.setMessageAssignmentSuspended(value);
+    }
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _delegate.isMessageAssignmentSuspended();
+    }
+
     public SocketAddress getRemoteAddress()
     {
         return _delegate.getRemoteAddress();
@@ -198,10 +210,33 @@ public class MultiVersionProtocolEngine
         return _delegate.getLastWriteTime();
     }
 
-
+    @Override
+    public void processPendingMessages()
+    {
+        _delegate.processPendingMessages();
+    }
 
     private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
+
+        @Override
+        public void setMessageAssignmentSuspended(final boolean value)
+        {
+
+        }
+
+        @Override
+        public boolean isMessageAssignmentSuspended()
+        {
+            return false;
+        }
+
+        @Override
+        public void processPendingMessages()
+        {
+
+        }
+
         public SocketAddress getRemoteAddress()
         {
             return _network.getRemoteAddress();
@@ -318,6 +353,23 @@ public class MultiVersionProtocolEngine
             return 0;
         }
 
+        @Override
+        public void setMessageAssignmentSuspended(final boolean value)
+        {
+        }
+
+        @Override
+        public boolean isMessageAssignmentSuspended()
+        {
+            return false;
+        }
+
+        @Override
+        public void processPendingMessages()
+        {
+
+        }
+
         public void received(ByteBuffer msg)
         {
             _lastReadTime = System.currentTimeMillis();

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Tue Feb 10 18:10:16 2015
@@ -1148,10 +1148,6 @@ public abstract class AbstractQueue<X ex
                     else
                     {
                         deliverMessage(sub, entry, false);
-                        if(sub.acquires())
-                        {
-                            entry.unlockAcquisition();
-                        }
                     }
                 }
             }
@@ -1978,10 +1974,6 @@ public abstract class AbstractQueue<X ex
                         else
                         {
                             deliverMessage(sub, node, batch);
-                            if(sub.acquires())
-                            {
-                                node.unlockAcquisition();
-                            }
                         }
 
                     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
 Tue Feb 10 18:10:16 2015
@@ -52,5 +52,4 @@ public interface QueueConsumer<X extends
 
     QueueContext getQueueContext();
 
-    ConsumerTarget getTarget();
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 Tue Feb 10 18:10:16 2015
@@ -507,6 +507,7 @@ class QueueConsumerImpl
         return _selector;
     }
 
+
     @Override
     public String toLogString()
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 Tue Feb 10 18:10:16 2015
@@ -178,6 +178,18 @@ public class MockConsumer implements Con
         return size;
     }
 
+    @Override
+    public boolean hasMessagesToSend()
+    {
+        return false;
+    }
+
+    @Override
+    public void sendNextMessage()
+    {
+
+    }
+
     public void flushBatched()
     {
 
@@ -230,6 +242,12 @@ public class MockConsumer implements Con
         }
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+
+    }
+
     public ArrayList<MessageInstance> getMessages()
     {
         return messages;
@@ -462,6 +480,12 @@ public class MockConsumer implements Con
         {
 
         }
+
+        @Override
+        public void processPendingMessages()
+        {
+
+        }
     }
 
     private static class MockConnectionModel implements AMQConnectionModel
@@ -594,6 +618,18 @@ public class MockConsumer implements Con
         }
 
         @Override
+        public void flushBatched()
+        {
+
+        }
+
+        @Override
+        public boolean isMessageAssignmentSuspended()
+        {
+            return false;
+        }
+
+        @Override
         public String getClientVersion()
         {
             return null;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 Tue Feb 10 18:10:16 2015
@@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends
         _name = name;
     }
 
-    public boolean isSuspended()
+    @Override
+    public boolean doIsSuspended()
     {
         return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
     }
@@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends
 
     private final AddMessageDispositionListenerAction _postIdSettingAction;
 
-    public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends
         {
             recordUnacknowledged(entry);
         }
-        return size;
     }
 
     void recordUnacknowledged(MessageInstance entry)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 Tue Feb 10 18:10:16 2015
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.network.Assembler;
@@ -55,6 +56,8 @@ public class ProtocolEngine_0_10  extend
     private long _lastWriteTime = _createTime;
     private volatile boolean _transportBlockedForWriting;
 
+    private volatile boolean _messageAssignmentSuspended;
+
     public ProtocolEngine_0_10(ServerConnection conn,
                                NetworkConnection network)
     {
@@ -67,6 +70,20 @@ public class ProtocolEngine_0_10  extend
         }
     }
 
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _messageAssignmentSuspended;
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended = messageAssignmentSuspended;
+    }
+
+
+
     public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
     {
         if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
@@ -252,4 +269,12 @@ public class ProtocolEngine_0_10  extend
         _connection.transportStateChanged();
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+        for (AMQSessionModel session : _connection.getSessionModels())
+        {
+            session.processPendingMessages();
+        }
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 Tue Feb 10 18:10:16 2015
@@ -685,4 +685,17 @@ public class ServerConnection extends Co
             ssn.transportStateChanged();
         }
     }
+
+    @Override
+    public void flushBatched()
+    {
+        getSender().flush();
+    }
+
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _serverProtocolEngine.isMessageAssignmentSuspended();
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 Tue Feb 10 18:10:16 2015
@@ -1135,6 +1135,15 @@ public class ServerSession extends Sessi
         }
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+        for(ConsumerTarget target : getSubscriptions())
+        {
+            target.processPendingMessages();
+        }
+    }
+
 
     public final long getMaxUncommittedInMemorySize()
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Tue Feb 10 18:10:16 2015
@@ -3606,4 +3606,14 @@ public class AMQChannel
             }
         }
     }
+
+    @Override
+    public void processPendingMessages()
+    {
+
+        for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+        {
+            target.processPendingMessages();
+        }
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Tue Feb 10 18:10:16 2015
@@ -202,6 +202,22 @@ public class AMQProtocolEngine implement
     private long _maxMessageSize;
     private volatile boolean _transportBlockedForWriting;
 
+    private volatile boolean _messageAssignmentSuspended;
+
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _messageAssignmentSuspended;
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended = messageAssignmentSuspended;
+    }
+
+
     public AMQProtocolEngine(Broker<?> broker,
                              final NetworkConnection network,
                              final long connectionId,
@@ -331,9 +347,9 @@ public class AMQProtocolEngine implement
             {
 
                 final long arrivalTime = System.currentTimeMillis();
-                if(!_authenticated &&
-                   (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
-                                                                         Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+                if (!_authenticated &&
+                    (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+                                                                          Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
                 {
                     _logger.warn("Connection has taken more than "
                                  + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
@@ -388,7 +404,7 @@ public class AMQProtocolEngine implement
                 }
                 catch (StoreException e)
                 {
-                    if(_virtualHost.getState() == State.ACTIVE)
+                    if (_virtualHost.getState() == State.ACTIVE)
                     {
                         throw e;
                     }
@@ -1362,7 +1378,7 @@ public class AMQProtocolEngine implement
     {
         closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
                                                       getMethodRegistry(),
-                                                             null));
+                                                      null));
     }
 
     public void block()
@@ -2049,4 +2065,12 @@ public class AMQProtocolEngine implement
         return _closing.get();
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+        for (AMQSessionModel session : getSessionModels())
+        {
+            session.processPendingMessages();
+        }
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Feb 10 18:10:16 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,6 +34,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerMessageInstancePair;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
@@ -99,6 +101,7 @@ public abstract class ConsumerTarget_0_8
         return _consumers;
     }
 
+
     static final class BrowserConsumer extends ConsumerTarget_0_8
     {
         public BrowserConsumer(AMQChannel channel,
@@ -123,7 +126,7 @@ public abstract class ConsumerTarget_0_8
          * @throws org.apache.qpid.AMQException
          */
         @Override
-        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -131,7 +134,7 @@ public abstract class ConsumerTarget_0_8
             synchronized (getChannel())
             {
                 long deliveryTag = getChannel().getNextDeliveryTag();
-                return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+                sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
             }
 
         }
@@ -178,7 +181,7 @@ public abstract class ConsumerTarget_0_8
          * @param batch
          */
         @Override
-        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -205,7 +208,6 @@ public abstract class ConsumerTarget_0_8
 
             }
             ref.release();
-            return size;
 
         }
 
@@ -278,9 +280,10 @@ public abstract class ConsumerTarget_0_8
          * @param batch
          */
         @Override
-        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
 
+            // put queue entry on a list and then notify the connection to read list.
 
             synchronized (getChannel())
             {
@@ -292,12 +295,15 @@ public abstract class ConsumerTarget_0_8
                 entry.addStateChangeListener(getReleasedStateChangeListener());
                 long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
                 entry.incrementDeliveryCount();
-                return size;
             }
+
+
         }
 
 
 
+
+
     }
 
 
@@ -382,7 +388,8 @@ public abstract class ConsumerTarget_0_8
         return subscriber + "]";
     }
 
-    public boolean isSuspended()
+    @Override
+    public boolean doIsSuspended()
     {
         return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Feb 10 18:10:16 2015
@@ -65,7 +65,7 @@ public class Connection_1_0 implements C
     private final AmqpPort<?> _port;
     private final Broker<?> _broker;
     private final SubjectCreator _subjectCreator;
-    private final ServerProtocolEngine _protocolEngine;
+    private final ProtocolEngine_1_0_0_SASL _protocolEngine;
     private VirtualHostImpl _vhost;
     private final Transport _transport;
     private final ConnectionEndpoint _conn;
@@ -110,7 +110,7 @@ public class Connection_1_0 implements C
                           AmqpPort<?> port,
                           Transport transport,
                           final SubjectCreator subjectCreator,
-                          final ServerProtocolEngine protocolEngine)
+                          final ProtocolEngine_1_0_0_SASL protocolEngine)
     {
         _protocolEngine = protocolEngine;
         _broker = broker;
@@ -498,4 +498,16 @@ public class Connection_1_0 implements C
             session.transportStateChanged();
         }
     }
+
+    @Override
+    public void flushBatched()
+    {
+        _protocolEngine.flushBatched();
+    }
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _protocolEngine.isMessageAssignmentSuspended();
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Feb 10 18:10:16 2015
@@ -83,7 +83,8 @@ class ConsumerTarget_1_0 extends Abstrac
         return _link.getEndpoint();
     }
 
-    public boolean isSuspended()
+    @Override
+    public boolean doIsSuspended()
     {
         return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
 
@@ -113,22 +114,10 @@ class ConsumerTarget_1_0 extends Abstrac
         }
     }
 
-    public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
     {
         // TODO
-        long size = entry.getMessage().getSize();
-        send(entry);
-        return size;
-    }
-
-    public void flushBatched()
-    {
-        // TODO
-    }
-
-    public void send(final MessageInstance queueEntry)
-    {
-        ServerMessage serverMessage = queueEntry.getMessage();
+        ServerMessage serverMessage = entry.getMessage();
         Message_1_0 message;
         if(serverMessage instanceof Message_1_0)
         {
@@ -168,7 +157,7 @@ class ConsumerTarget_1_0 extends Abstrac
             payload.flip();
         }
 
-        if(queueEntry.getDeliveryCount() != 0)
+        if(entry.getDeliveryCount() != 0)
         {
             payload = payload.duplicate();
             ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -200,7 +189,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 header.setPriority(oldHeader.getPriority());
                 header.setTtl(oldHeader.getTtl());
             }
-            header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+            header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
             _sectionEncoder.reset();
             _sectionEncoder.encodeObject(header);
             Binary encodedHeader = _sectionEncoder.getEncoding();
@@ -230,10 +219,10 @@ class ConsumerTarget_1_0 extends Abstrac
                 else
                 {
                     UnsettledAction action = _acquires
-                                             ? new DispositionAction(tag, queueEntry)
-                                             : new DoNothingAction(tag, queueEntry);
+                                             ? new DispositionAction(tag, entry)
+                                             : new DoNothingAction(tag, entry);
 
-                    _link.addUnsettled(tag, action, queueEntry);
+                    _link.addUnsettled(tag, action, entry);
                 }
 
                 if(_transactionId != null)
@@ -257,9 +246,9 @@ class ConsumerTarget_1_0 extends Abstrac
 
                             public void onRollback()
                             {
-                                if(queueEntry.isAcquiredBy(getConsumer()))
+                                if(entry.isAcquiredBy(getConsumer()))
                                 {
-                                    queueEntry.release();
+                                    entry.release();
                                     _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
 
 
@@ -274,12 +263,17 @@ class ConsumerTarget_1_0 extends Abstrac
             }
             else
             {
-                queueEntry.release();
+                entry.release();
             }
         }
 
     }
 
+    public void flushBatched()
+    {
+        // TODO
+    }
+
     public void queueDeleted()
     {
         //TODO

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Tue Feb 10 18:10:16 2015
@@ -56,6 +56,7 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -135,6 +136,10 @@ public class ProtocolEngine_1_0_0_SASL i
 
     private State _state = State.A;
 
+    private volatile boolean _messageAssignmentSuspended;
+
+
+
 
     public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
                                      long id, AmqpPort<?> port, Transport transport)
@@ -150,6 +155,19 @@ public class ProtocolEngine_1_0_0_SASL i
     }
 
 
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        return _messageAssignmentSuspended;
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended = messageAssignmentSuspended;
+    }
+
+
     public SocketAddress getRemoteAddress()
     {
         return _network.getRemoteAddress();
@@ -576,4 +594,17 @@ public class ProtocolEngine_1_0_0_SASL i
 
     }
 
+    public void flushBatched()
+    {
+        _sender.flush();
+    }
+
+    @Override
+    public void processPendingMessages()
+    {
+        for (AMQSessionModel session : _connection.getSessionModels())
+        {
+            session.processPendingMessages();
+        }
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Feb 10 18:10:16 2015
@@ -899,6 +899,16 @@ public class Session_1_0 implements Sess
         return 0L;
     }
 
+    @Override
+    public void processPendingMessages()
+    {
+        for(Consumer<?> consumer : getConsumers())
+        {
+
+            ((ConsumerImpl)consumer).getTarget().processPendingMessages();
+        }
+    }
+
     private void consumerAdded(Consumer<?> consumer)
     {
         for(ConsumerListener l : _consumerListeners)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Tue Feb 10 18:10:16 2015
@@ -164,6 +164,12 @@ class ManagementNodeConsumer implements
 
     }
 
+    @Override
+    public ConsumerTarget getTarget()
+    {
+        return _target;
+    }
+
     ManagementNode getManagementNode()
     {
         return _managementNode;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Tue Feb 10 18:10:16 2015
@@ -34,4 +34,10 @@ public interface ServerProtocolEngine ex
     boolean isTransportBlockedForWriting();
 
     void setTransportBlockedForWriting(boolean blocked);
+
+    void setMessageAssignmentSuspended(boolean value);
+
+    boolean isMessageAssignmentSuspended();
+
+    void processPendingMessages();
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Tue Feb 10 18:10:16 2015
@@ -61,7 +61,7 @@ public class NonBlockingSenderReceiver
 
     private final String _remoteSocketAddress;
     private final AtomicBoolean _closed = new AtomicBoolean(false);
-    private final ServerProtocolEngine _receiver;
+    private final ServerProtocolEngine _protocolEngine;
     private final int _receiveBufSize;
     private final Ticker _ticker;
     private final Set<TransportEncryption> _encryptionSet;
@@ -81,7 +81,7 @@ public class NonBlockingSenderReceiver
 
 
     public NonBlockingSenderReceiver(final NonBlockingConnection connection,
-                                     ServerProtocolEngine receiver,
+                                     ServerProtocolEngine protocolEngine,
                                      int receiveBufSize,
                                      Ticker ticker,
                                      final Set<TransportEncryption> encryptionSet,
@@ -94,7 +94,7 @@ public class NonBlockingSenderReceiver
     {
         _connection = connection;
         _socketChannel = connection.getSocketChannel();
-        _receiver = receiver;
+        _protocolEngine = protocolEngine;
         _receiveBufSize = receiveBufSize;
         _ticker = ticker;
         _encryptionSet = encryptionSet;
@@ -170,15 +170,22 @@ public class NonBlockingSenderReceiver
                     _ticker.tick(currentTime);
                 }
 
-                _receiver.setTransportBlockedForWriting(!doWrite());
+                _protocolEngine.setMessageAssignmentSuspended(true);
+
+                _protocolEngine.processPendingMessages();
+
+                _protocolEngine.setTransportBlockedForWriting(!doWrite());
                 boolean dataRead = doRead();
                 _fullyWritten = doWrite();
-                _receiver.setTransportBlockedForWriting(!_fullyWritten);
+                _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
 
                 if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
                 {
                     _stateChanged.set(true);
                 }
+
+                // tell all consumer targets that it is okay to accept more
+                _protocolEngine.setMessageAssignmentSuspended(false);
             }
             catch (IOException e)
             {
@@ -213,7 +220,7 @@ public class NonBlockingSenderReceiver
 
             }
             LOGGER.debug("Closing receiver");
-            _receiver.closed();
+            _protocolEngine.closed();
 
             try
             {
@@ -373,7 +380,7 @@ public class NonBlockingSenderReceiver
                 ByteBuffer dup = _currentBuffer.duplicate();
                 dup.flip();
                 _currentBuffer = _currentBuffer.slice();
-                _receiver.received(dup);
+                _protocolEngine.received(dup);
             }
         }
         else if(_transportEncryption == TransportEncryption.TLS)
@@ -414,7 +421,7 @@ public class NonBlockingSenderReceiver
                     {
                         readData = true;
                     }
-                    _receiver.received(appInputBuffer);
+                    _protocolEngine.received(appInputBuffer);
                 }
                 while(unwrapped > 0 || tasksRun);
 
@@ -451,7 +458,7 @@ public class NonBlockingSenderReceiver
 
                     if (_transportEncryption == TransportEncryption.NONE)
                     {
-                        _receiver.received(_netInputBuffer);
+                        _protocolEngine.received(_netInputBuffer);
                     }
                     else
                     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1658773&r1=1658772&r2=1658773&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Tue Feb 10 18:10:16 2015
@@ -27,3 +27,5 @@ org.apache.qpid.test.client.queue.QueueP
 //QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar)
 org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
 
+
+org.apache.qpid.server.protocol.v0_8.AckTest



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to