Author: markt
Date: Mon May 16 19:56:21 2011
New Revision: 1103860

URL: http://svn.apache.org/viewvc?rev=1103860&view=rev
Log:
Tweak processor blocking so that it is non-blocking while no message is being
processed and blocking during the processing of a message.
Align the AJP NIO protocol implementation with the HTTP NIO implementation.
Still some TCK failures to resolve.

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1103860&r1=1103859&r2=1103860&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Mon May 16 19:56:21 2011
@@ -196,29 +196,20 @@ public class AjpNioProcessor extends Abs
         // Setting up the socket
         this.socket = socket;
         
-        int soTimeout = -1;
-        final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
-        if (keepAliveTimeout > 0) {
-            ka.setTimeout(soTimeout);
-        }
+        long soTimeout = endpoint.getSoTimeout();
+        int keepAliveTimeout = endpoint.getKeepAliveTimeout();
 
         // Error flag
         error = false;
 
-        boolean keptAlive = false;
-
+        final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+        
         while (!error && !endpoint.isPaused()) {
-
             // Parsing the request header
             try {
-                // Set keep alive timeout if enabled
-                if (keepAliveTimeout > 0) {
-                    ka.setTimeout(keepAliveTimeout);
-                }
                 // Get first message of the request
-                int bytesRead = readMessage(requestHeaderMessage, !keptAlive);
-                if (!keptAlive && bytesRead == 0) {
-                    // No bytes on a blocking read - connection timeout
+                int bytesRead = readMessage(requestHeaderMessage, false);
+                if (bytesRead == 0) {
                     rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
                     break;
                 }
@@ -235,8 +226,6 @@ public class AjpNioProcessor extends Abs
                     } catch (IOException e) {
                         error = true;
                     }
-                    // Should be unnecessary but just in case...
-                    keptAlive = true;
                     recycle();
                     continue;
                 } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) {
@@ -244,12 +233,9 @@ public class AjpNioProcessor extends Abs
                     if(log.isDebugEnabled()) {
                         log.debug("Unexpected message: "+type);
                     }
-                    // Should be unnecessary but just in case...
-                    keptAlive = true;
                     recycle();
                     continue;
                 }
-
                 request.setStartTime(System.currentTimeMillis());
             } catch (IOException e) {
                 error = true;
@@ -324,7 +310,11 @@ public class AjpNioProcessor extends Abs
             request.updateCounters();
 
             rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
-            keptAlive = true;
+            // Set keep alive timeout if enabled
+            if (keepAliveTimeout > 0) {
+                ka.setTimeout(keepAliveTimeout);
+            }
+
             recycle();
         }
         
@@ -479,18 +469,23 @@ public class AjpNioProcessor extends Abs
     /**
      * Read the specified amount of bytes, and place them in the input buffer.
      */
-    protected int read(byte[] buf, int pos, int n, boolean block)
+    protected int read(byte[] buf, int pos, int n, boolean blockFirstRead)
         throws IOException {
 
         int read = 0;
         int res = 0;
+        boolean block = blockFirstRead;
+        
         while (read < n) {
             res = readSocket(buf, read + pos, n, block);
             if (res > 0) {
                 read += res;
+            } else if (res == 0 && !block) {
+                break;
             } else {
                 throw new IOException(sm.getString("ajpprotocol.failedread"));
             }
+            block = true;
         }
         return read;
     }
@@ -596,14 +591,18 @@ public class AjpNioProcessor extends Abs
      * @return The number of bytes read
      * @throws IOException any other failure, including incomplete reads
      */
-    protected int readMessage(AjpMessage message, boolean block)
+    protected int readMessage(AjpMessage message, boolean blockFirstRead)
         throws IOException {
 
         byte[] buf = message.getBuffer();
         int headerLength = message.getHeaderLength();
 
-        int bytesRead = read(buf, 0, headerLength, block);
+        int bytesRead = read(buf, 0, headerLength, blockFirstRead);
 
+        if (bytesRead == 0) {
+            return 0;
+        }
+        
         int messageLength = message.processHeader();
         if (messageLength < 0) {
             // Invalid AJP header signature

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java?rev=1103860&r1=1103859&r2=1103860&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java Mon May 16 19:56:21 2011
@@ -17,7 +17,6 @@
 
 package org.apache.coyote.ajp;
 
-import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
@@ -113,7 +112,7 @@ public class AjpNioProtocol extends Abst
             new ConcurrentHashMap<NioChannel, AjpNioProcessor>();
 
         protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors = 
-            new ConcurrentLinkedQueue<AjpNioProcessor>() {
+                new ConcurrentLinkedQueue<AjpNioProcessor>() {
             private static final long serialVersionUID = 1L;
             protected AtomicInteger size = new AtomicInteger(0);
             @Override
@@ -213,7 +212,7 @@ public class AjpNioProtocol extends Abst
             recycledProcessors.offer(processor);
         }
 
-        // FIXME: Support for this could be added in AJP as well
+        // FIXME: Support for Comet could be added in AJP as well
         @Override
         public SocketState event(NioChannel socket, SocketStatus status) {
             AjpNioProcessor processor = connections.get(socket);
@@ -244,16 +243,7 @@ public class AjpNioProtocol extends Abst
                             socket.getPoller().add(socket);
                         }
                     } else if (state == SocketState.LONG) {
-                        if (processor.isAsync()) {
-                            att.setAsync(true); // Re-enable timeouts
-                        } else {
-                            // Comet
-                            if (log.isDebugEnabled()) log.debug("Keeping processor["+processor);
-                            // May receive more data from client
-                            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                            key.interestOps(SelectionKey.OP_READ);
-                            att.interestOps(SelectionKey.OP_READ);
-                        }
+                        att.setAsync(true); // Re-enable timeouts
                     } else {
                         // state == SocketState.ASYNC_END
                         // No further work required
@@ -265,27 +255,37 @@ public class AjpNioProtocol extends Abst
         
         @Override
         public SocketState process(NioChannel socket) {
-            AjpNioProcessor processor = recycledProcessors.poll();
+            AjpNioProcessor processor = connections.remove(socket);
             try {
                 if (processor == null) {
+                    processor = recycledProcessors.poll();
+                }
+                if (processor == null) {
                     processor = createProcessor();
                 }
 
                 SocketState state = processor.process(socket);
                 if (state == SocketState.LONG) {
-                    // Check if the post processing is going to change the state
+                    // In the middle of processing a request/response. Keep the
+                    // socket associated with the processor.
+                    connections.put(socket, processor);
+                    
+                    NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                    att.setAsync(true);
+                    // longPoll may change socket state (e.g. to trigger a
+                    // complete or dispatch)
                     state = processor.asyncPostProcess();
                 }
                 if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
-                    // Need to make socket available for next processing cycle
-                    // but no need for the poller
-                    connections.put(socket, processor);
-                    NioEndpoint.KeyAttachment att =
-                            (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-                    att.setAsync(true);
+                    // Already done all we need to do.
+                } else if (state == SocketState.OPEN){
+                    // In keep-alive but between requests. OK to recycle
+                    // processor. Continue to poll for the next request.
+                    release(socket, processor);
+                    socket.getPoller().add(socket);
                 } else {
-                    processor.recycle();
-                    recycledProcessors.offer(processor);
+                    // Connection closed. OK to recycle the processor.
+                    release(socket, processor);
                 }
                 return state;
 
@@ -308,8 +308,7 @@ public class AjpNioProtocol extends Abst
                 // less-than-verbose logs.
                 log.error(sm.getString("ajpprotocol.proto.error"), e);
             }
-            processor.recycle();
-            recycledProcessors.offer(processor);
+            release(socket, processor);
             return SocketState.CLOSED;
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to