Author: rgodfrey
Date: Fri Dec 12 13:28:28 2014
New Revision: 1644870

URL: http://svn.apache.org/r1644870
Log:
Notify engine when transport is blocked

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1644870&r1=1644869&r2=1644870&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
 Fri Dec 12 13:28:28 2014
@@ -31,7 +31,7 @@ import javax.net.ssl.SSLContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
@@ -50,7 +50,7 @@ public class NonBlockingConnection imple
     private final Object _lock = new Object();
 
     public NonBlockingConnection(SocketChannel socket,
-                                 Receiver<ByteBuffer> delegate,
+                                 ServerProtocolEngine delegate,
                                  int sendBufferSize,
                                  int receiveBufferSize,
                                  long timeout,

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644870&r1=1644869&r2=1644870&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 Fri Dec 12 13:28:28 2014
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Set;
@@ -34,10 +33,9 @@ import javax.net.ssl.SSLContext;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -54,7 +52,7 @@ public class NonBlockingNetworkTransport
     private AcceptingThread _acceptor;
 
     protected NonBlockingConnection createNetworkConnection(final 
SocketChannel socket,
-                                                            final 
Receiver<ByteBuffer> engine,
+                                                            final 
ServerProtocolEngine engine,
                                                             final Integer 
sendBufferSize,
                                                             final Integer 
receiveBufferSize,
                                                             final int timeout,
@@ -166,7 +164,8 @@ public class NonBlockingNetworkTransport
                     {
                         socket = _serverSocket.accept();
 
-                        final ProtocolEngine engine = 
_factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+                        final ServerProtocolEngine engine =
+                                (ServerProtocolEngine) 
_factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
 
                         if(engine != null)
                         {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644870&r1=1644869&r2=1644870&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 Fri Dec 12 13:28:28 2014
@@ -41,8 +41,8 @@ import javax.net.ssl.SSLPeerUnverifiedEx
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
@@ -63,7 +63,7 @@ public class NonBlockingSenderReceiver
     private final Thread _ioThread;
     private final String _remoteSocketAddress;
     private final AtomicBoolean _closed = new AtomicBoolean(false);
-    private final Receiver<ByteBuffer> _receiver;
+    private final ServerProtocolEngine _receiver;
     private final int _receiveBufSize;
     private final Ticker _ticker;
     private final Set<TransportEncryption> _encryptionSet;
@@ -79,7 +79,7 @@ public class NonBlockingSenderReceiver
 
 
     public NonBlockingSenderReceiver(final SocketChannel socketChannel,
-                                     Receiver<ByteBuffer> receiver,
+                                     ServerProtocolEngine receiver,
                                      int receiveBufSize,
                                      Ticker ticker,
                                      final Set<TransportEncryption> 
encryptionSet,
@@ -202,9 +202,10 @@ public class NonBlockingSenderReceiver
 
                 LOGGER.debug("Number Ready " +  numberReady);
 
-                doWrite();
+                _receiver.setTransportBlockedForWriting(!doWrite());
                 doRead();
                 boolean fullyWritten = doWrite();
+                _receiver.setTransportBlockedForWriting(!fullyWritten);
 
                 _socketChannel.register(_selector,
                                         fullyWritten



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to