Author: markt
Date: Mon May 16 19:56:21 2011
New Revision: 1103860
URL: http://svn.apache.org/viewvc?rev=1103860&view=rev
Log:
Tweak processor blocking so that it is non-blocking while no message is being
processed and blocking during the processing of a message
Align ajp nio protocol implementation with http nio
Still some TCK failures to resolve
Modified:
tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1103860&r1=1103859&r2=1103860&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Mon May 16
19:56:21 2011
@@ -196,29 +196,20 @@ public class AjpNioProcessor extends Abs
// Setting up the socket
this.socket = socket;
- int soTimeout = -1;
- final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
- if (keepAliveTimeout > 0) {
- ka.setTimeout(soTimeout);
- }
+ long soTimeout = endpoint.getSoTimeout();
+ int keepAliveTimeout = endpoint.getKeepAliveTimeout();
// Error flag
error = false;
- boolean keptAlive = false;
-
+ final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+
while (!error && !endpoint.isPaused()) {
-
// Parsing the request header
try {
- // Set keep alive timeout if enabled
- if (keepAliveTimeout > 0) {
- ka.setTimeout(keepAliveTimeout);
- }
// Get first message of the request
- int bytesRead = readMessage(requestHeaderMessage, !keptAlive);
- if (!keptAlive && bytesRead == 0) {
- // No bytes on a blocking read - connection timeout
+ int bytesRead = readMessage(requestHeaderMessage, false);
+ if (bytesRead == 0) {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
break;
}
@@ -235,8 +226,6 @@ public class AjpNioProcessor extends Abs
} catch (IOException e) {
error = true;
}
- // Should be unnecessary but just in case...
- keptAlive = true;
recycle();
continue;
} else if(type != Constants.JK_AJP13_FORWARD_REQUEST) {
@@ -244,12 +233,9 @@ public class AjpNioProcessor extends Abs
if(log.isDebugEnabled()) {
log.debug("Unexpected message: "+type);
}
- // Should be unnecessary but just in case...
- keptAlive = true;
recycle();
continue;
}
-
request.setStartTime(System.currentTimeMillis());
} catch (IOException e) {
error = true;
@@ -324,7 +310,11 @@ public class AjpNioProcessor extends Abs
request.updateCounters();
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
- keptAlive = true;
+ // Set keep alive timeout if enabled
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(keepAliveTimeout);
+ }
+
recycle();
}
@@ -479,18 +469,23 @@ public class AjpNioProcessor extends Abs
/**
* Read the specified amount of bytes, and place them in the input buffer.
*/
- protected int read(byte[] buf, int pos, int n, boolean block)
+ protected int read(byte[] buf, int pos, int n, boolean blockFirstRead)
throws IOException {
int read = 0;
int res = 0;
+ boolean block = blockFirstRead;
+
while (read < n) {
res = readSocket(buf, read + pos, n, block);
if (res > 0) {
read += res;
+ } else if (res == 0 && !block) {
+ break;
} else {
throw new IOException(sm.getString("ajpprotocol.failedread"));
}
+ block = true;
}
return read;
}
@@ -596,14 +591,18 @@ public class AjpNioProcessor extends Abs
* @return The number of bytes read
* @throws IOException any other failure, including incomplete reads
*/
- protected int readMessage(AjpMessage message, boolean block)
+ protected int readMessage(AjpMessage message, boolean blockFirstRead)
throws IOException {
byte[] buf = message.getBuffer();
int headerLength = message.getHeaderLength();
- int bytesRead = read(buf, 0, headerLength, block);
+ int bytesRead = read(buf, 0, headerLength, blockFirstRead);
+ if (bytesRead == 0) {
+ return 0;
+ }
+
int messageLength = message.processHeader();
if (messageLength < 0) {
// Invalid AJP header signature
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java?rev=1103860&r1=1103859&r2=1103860&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProtocol.java Mon May 16
19:56:21 2011
@@ -17,7 +17,6 @@
package org.apache.coyote.ajp;
-import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@@ -113,7 +112,7 @@ public class AjpNioProtocol extends Abst
new ConcurrentHashMap<NioChannel, AjpNioProcessor>();
protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors =
- new ConcurrentLinkedQueue<AjpNioProcessor>() {
+ new ConcurrentLinkedQueue<AjpNioProcessor>() {
private static final long serialVersionUID = 1L;
protected AtomicInteger size = new AtomicInteger(0);
@Override
@@ -213,7 +212,7 @@ public class AjpNioProtocol extends Abst
recycledProcessors.offer(processor);
}
- // FIXME: Support for this could be added in AJP as well
+ // FIXME: Support for Comet could be added in AJP as well
@Override
public SocketState event(NioChannel socket, SocketStatus status) {
AjpNioProcessor processor = connections.get(socket);
@@ -244,16 +243,7 @@ public class AjpNioProtocol extends Abst
socket.getPoller().add(socket);
}
} else if (state == SocketState.LONG) {
- if (processor.isAsync()) {
- att.setAsync(true); // Re-enable timeouts
- } else {
- // Comet
- if (log.isDebugEnabled()) log.debug("Keeping
processor["+processor);
- // May receive more data from client
- SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- key.interestOps(SelectionKey.OP_READ);
- att.interestOps(SelectionKey.OP_READ);
- }
+ att.setAsync(true); // Re-enable timeouts
} else {
// state == SocketState.ASYNC_END
// No further work required
@@ -265,27 +255,37 @@ public class AjpNioProtocol extends Abst
@Override
public SocketState process(NioChannel socket) {
- AjpNioProcessor processor = recycledProcessors.poll();
+ AjpNioProcessor processor = connections.remove(socket);
try {
if (processor == null) {
+ processor = recycledProcessors.poll();
+ }
+ if (processor == null) {
processor = createProcessor();
}
SocketState state = processor.process(socket);
if (state == SocketState.LONG) {
- // Check if the post processing is going to change the
state
+ // In the middle of processing a request/response. Keep the
+ // socket associated with the processor.
+ connections.put(socket, processor);
+
+ NioEndpoint.KeyAttachment att =
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ att.setAsync(true);
+ // longPoll may change socket state (e.g. to trigger a
+ // complete or dispatch)
state = processor.asyncPostProcess();
}
if (state == SocketState.LONG || state ==
SocketState.ASYNC_END) {
- // Need to make socket available for next processing cycle
- // but no need for the poller
- connections.put(socket, processor);
- NioEndpoint.KeyAttachment att =
-
(NioEndpoint.KeyAttachment)socket.getAttachment(false);
- att.setAsync(true);
+ // Already done all we need to do.
+ } else if (state == SocketState.OPEN){
+ // In keep-alive but between requests. OK to recycle
+ // processor. Continue to poll for the next request.
+ release(socket, processor);
+ socket.getPoller().add(socket);
} else {
- processor.recycle();
- recycledProcessors.offer(processor);
+ // Connection closed. OK to recycle the processor.
+ release(socket, processor);
}
return state;
@@ -308,8 +308,7 @@ public class AjpNioProtocol extends Abst
// less-than-verbose logs.
log.error(sm.getString("ajpprotocol.proto.error"), e);
}
- processor.recycle();
- recycledProcessors.offer(processor);
+ release(socket, processor);
return SocketState.CLOSED;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]