Author: markt Date: Mon May 16 19:56:21 2011 New Revision: 1103860 URL: http://svn.apache.org/viewvc?rev=1103860&view=rev Log: Tweak processor blocking so that it is non-blocking while no message is being processed and blocking during the processing of a message. Align the AJP NIO protocol implementation with the HTTP NIO implementation. Still some TCK failures to resolve.
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1103860&r1=1103859&r2=1103860&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Mon May 16 19:56:21 2011 @@ -196,29 +196,20 @@ public class AjpNioProcessor extends Abs // Setting up the socket this.socket = socket; - int soTimeout = -1; - final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); - if (keepAliveTimeout > 0) { - ka.setTimeout(soTimeout); - } + long soTimeout = endpoint.getSoTimeout(); + int keepAliveTimeout = endpoint.getKeepAliveTimeout(); // Error flag error = false; - boolean keptAlive = false; - + final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); + while (!error && !endpoint.isPaused()) { - // Parsing the request header try { - // Set keep alive timeout if enabled - if (keepAliveTimeout > 0) { - ka.setTimeout(keepAliveTimeout); - } // Get first message of the request - int bytesRead = readMessage(requestHeaderMessage, !keptAlive); - if (!keptAlive && bytesRead == 0) { - // No bytes on a blocking read - connection timeout + int bytesRead = readMessage(requestHeaderMessage, false); + if (bytesRead == 0) { rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); break; } @@ -235,8 +226,6 @@ public class AjpNioProcessor extends Abs } catch (IOException e) { error = true; } - // Should be unnecessary but just in case... 
- keptAlive = true; recycle(); continue; } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) { @@ -244,12 +233,9 @@ public class AjpNioProcessor extends Abs if(log.isDebugEnabled()) { log.debug("Unexpected message: "+type); } - // Should be unnecessary but just in case... - keptAlive = true; recycle(); continue; } - request.setStartTime(System.currentTimeMillis()); } catch (IOException e) { error = true; @@ -324,7 +310,11 @@ public class AjpNioProcessor extends Abs request.updateCounters(); rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); - keptAlive = true; + // Set keep alive timeout if enabled + if (keepAliveTimeout > 0) { + ka.setTimeout(keepAliveTimeout); + } + recycle(); } @@ -479,18 +469,23 @@ public class AjpNioProcessor extends Abs /** * Read the specified amount of bytes, and place them in the input buffer. */ - protected int read(byte[] buf, int pos, int n, boolean block) + protected int read(byte[] buf, int pos, int n, boolean blockFirstRead) throws IOException { int read = 0; int res = 0; + boolean block = blockFirstRead; + while (read < n) { res = readSocket(buf, read + pos, n, block); if (res > 0) { read += res; + } else if (res == 0 && !block) { + break; } else { throw new IOException(sm.getString("ajpprotocol.failedread")); } + block = true; } return read; } @@ -596,14 +591,18 @@ public class AjpNioProcessor extends Abs * @return The number of bytes read * @throws IOException any other failure, including incomplete reads */ - protected int readMessage(AjpMessage message, boolean block) + protected int readMessage(AjpMessage message, boolean blockFirstRead) throws IOException { byte[] buf = message.getBuffer(); int headerLength = message.getHeaderLength(); - int bytesRead = read(buf, 0, headerLength, block); + int bytesRead = read(buf, 0, headerLength, blockFirstRead); + if (bytesRead == 0) { + return 0; + } + int messageLength = message.processHeader(); if (messageLength < 0) { // Invalid AJP header signature Modified: 
tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java?rev=1103860&r1=1103859&r2=1103860&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java Mon May 16 19:56:21 2011 @@ -17,7 +17,6 @@ package org.apache.coyote.ajp; -import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -113,7 +112,7 @@ public class AjpNioProtocol extends Abst new ConcurrentHashMap<NioChannel, AjpNioProcessor>(); protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors = - new ConcurrentLinkedQueue<AjpNioProcessor>() { + new ConcurrentLinkedQueue<AjpNioProcessor>() { private static final long serialVersionUID = 1L; protected AtomicInteger size = new AtomicInteger(0); @Override @@ -213,7 +212,7 @@ public class AjpNioProtocol extends Abst recycledProcessors.offer(processor); } - // FIXME: Support for this could be added in AJP as well + // FIXME: Support for Comet could be added in AJP as well @Override public SocketState event(NioChannel socket, SocketStatus status) { AjpNioProcessor processor = connections.get(socket); @@ -244,16 +243,7 @@ public class AjpNioProtocol extends Abst socket.getPoller().add(socket); } } else if (state == SocketState.LONG) { - if (processor.isAsync()) { - att.setAsync(true); // Re-enable timeouts - } else { - // Comet - if (log.isDebugEnabled()) log.debug("Keeping processor["+processor); - // May receive more data from client - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - key.interestOps(SelectionKey.OP_READ); - att.interestOps(SelectionKey.OP_READ); - } + att.setAsync(true); // Re-enable timeouts } else { // state == SocketState.ASYNC_END // No 
further work required @@ -265,27 +255,37 @@ public class AjpNioProtocol extends Abst @Override public SocketState process(NioChannel socket) { - AjpNioProcessor processor = recycledProcessors.poll(); + AjpNioProcessor processor = connections.remove(socket); try { if (processor == null) { + processor = recycledProcessors.poll(); + } + if (processor == null) { processor = createProcessor(); } SocketState state = processor.process(socket); if (state == SocketState.LONG) { - // Check if the post processing is going to change the state + // In the middle of processing a request/response. Keep the + // socket associated with the processor. + connections.put(socket, processor); + + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + att.setAsync(true); + // longPoll may change socket state (e.g. to trigger a + // complete or dispatch) state = processor.asyncPostProcess(); } if (state == SocketState.LONG || state == SocketState.ASYNC_END) { - // Need to make socket available for next processing cycle - // but no need for the poller - connections.put(socket, processor); - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment)socket.getAttachment(false); - att.setAsync(true); + // Already done all we need to do. + } else if (state == SocketState.OPEN){ + // In keep-alive but between requests. OK to recycle + // processor. Continue to poll for the next request. + release(socket, processor); + socket.getPoller().add(socket); } else { - processor.recycle(); - recycledProcessors.offer(processor); + // Connection closed. OK to recycle the processor. + release(socket, processor); } return state; @@ -308,8 +308,7 @@ public class AjpNioProtocol extends Abst // less-than-verbose logs. 
log.error(sm.getString("ajpprotocol.proto.error"), e); } - processor.recycle(); - recycledProcessors.offer(processor); + release(socket, processor); return SocketState.CLOSED; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org