remm 2005/06/23 04:50:34
Modified: jk/java/org/apache/coyote/ajp AjpAprProcessor.java
Log:
- Add buffering for output.
- This seems to work fine, but I haven't done much testing yet.
- Not done yet: input buffering is still missing, and it will likely be more
significant (at least when there is a lot of upload traffic). It's also harder,
unfortunately :(
Revision Changes Path
1.6 +179 -106
jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java
Index: AjpAprProcessor.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- AjpAprProcessor.java 17 Jun 2005 09:43:35 -0000 1.5
+++ AjpAprProcessor.java 23 Jun 2005 11:50:34 -0000 1.6
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
@@ -84,11 +85,45 @@
response.setOutputBuffer(new SocketOutputBuffer());
request.setResponse(response);
- readTimeout = endpoint.getFirstReadTimeout() * 1000;
+ if (endpoint.getFirstReadTimeout() > 0) {
+ readTimeout = endpoint.getFirstReadTimeout() * 1000;
+ } else {
+ readTimeout = 100 * 1000;
+ }
// Cause loading of HexUtils
int foo = HexUtils.DEC[0];
+ // Allocate input and output buffers
+ inputBuffer = ByteBuffer.allocateDirect(16 * 1024);
+ outputBuffer = ByteBuffer.allocateDirect(16 * 1024);
+
+ // Set the get body message buffer
+ AjpMessage getBodyMessage = new AjpMessage();
+ getBodyMessage.reset();
+ getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
+ getBodyMessage.appendInt(Constants.MAX_READ_SIZE);
+ getBodyMessage.end();
+ getBodyMessageBuffer =
ByteBuffer.allocateDirect(getBodyMessage.getLen());
+ getBodyMessageBuffer.put(getBodyMessage.getBuffer(), 0,
getBodyMessage.getLen());
+
+ // Set the read body message buffer
+ AjpMessage pongMessage = new AjpMessage();
+ pongMessage.reset();
+ pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
+ pongMessage.end();
+ pongMessageBuffer = ByteBuffer.allocateDirect(pongMessage.getLen());
+ pongMessageBuffer.put(pongMessage.getBuffer(), 0,
pongMessage.getLen());
+
+ // Allocate the end message array
+ AjpMessage endMessage = new AjpMessage();
+ endMessage.reset();
+ endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
+ endMessage.appendByte(1);
+ endMessage.end();
+ endMessageArray = new byte[endMessage.getLen()];
+ System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0,
endMessage.getLen());
+
}
@@ -118,13 +153,13 @@
* processing of the first message of a "request", so it might not be a
request
* header. It will stay unchanged during the processing of the whole
request.
*/
- protected AjpMessage headerMessage = new AjpMessage();
+ protected AjpMessage requestHeaderMessage = new AjpMessage();
/**
- * Message used for output.
+ * Message used for response header composition.
*/
- protected AjpMessage outputMessage = new AjpMessage();
+ protected AjpMessage responseHeaderMessage = new AjpMessage();
/**
@@ -146,18 +181,6 @@
/**
- * All purpose response message.
- */
- protected AjpMessage responseMessage = new AjpMessage();
-
-
- /**
- * Read body message.
- */
- protected AjpMessage readBodyMessage = new AjpMessage();
-
-
- /**
* State flag.
*/
protected boolean started = false;
@@ -296,6 +319,39 @@
protected boolean finished = false;
+ /**
+ * Direct buffer used for output.
+ */
+ protected ByteBuffer outputBuffer = null;
+
+
+ /**
+ * Direct buffer used for input.
+ */
+ protected ByteBuffer inputBuffer = null;
+
+
+ /**
+ * Direct buffer used for sending right away a get body message.
+ * FIXME: can probably be static
+ */
+ protected ByteBuffer getBodyMessageBuffer = null;
+
+
+ /**
+ * Direct buffer used for sending right away a pong message.
+ * FIXME: can probably be static
+ */
+ protected ByteBuffer pongMessageBuffer = null;
+
+
+ /**
+ * End message array.
+ * FIXME: can probably be static
+ */
+ protected byte[] endMessageArray = null;
+
+
// -------------------------------------------------------------
Properties
@@ -375,14 +431,20 @@
long soTimeout = endpoint.getSoTimeout();
+ int limit = 0;
+ if (endpoint.getFirstReadTimeout() > 0) {
+ limit = endpoint.getMaxThreads() / 2;
+ }
+
boolean openSocket = true;
+ boolean keptAlive = false;
while (started && !error) {
// Parsing the request header
try {
// Get first message of the request
- if (!readMessage(headerMessage, true)) {
+ if (!readMessage(requestHeaderMessage, true)) {
// This means that no data is available right now
// (long keepalive), so that the processor should be
recycled
// and the method should return true
@@ -391,15 +453,17 @@
}
// Check message type, process right away and break if
// not regular request processing
- int type = headerMessage.getByte();
+ int type = requestHeaderMessage.getByte();
// FIXME: Any other types which should be checked ?
if (type == Constants.JK_AJP13_CPING_REQUEST) {
- headerMessage.reset();
- headerMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
- writeMessage(headerMessage);
+ if (Socket.sendb(socket, pongMessageBuffer, 0,
+ pongMessageBuffer.position()) < 0) {
+ error = true;
+ }
continue;
}
+ keptAlive = true;
request.setStartTime(System.currentTimeMillis());
} catch (IOException e) {
error = true;
@@ -515,12 +579,19 @@
} else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {
- if (response.isCommitted())
- return;
+ if (!response.isCommitted()) {
+ // Validate and write response headers
+ try {
+ prepareResponse();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ return;
+ }
+ }
- // Validate and write response headers
try {
- prepareResponse();
+ flushOutputBuffer();
} catch (IOException e) {
// Set error flag
error = true;
@@ -721,21 +792,21 @@
protected void prepareRequest() {
// Translate the HTTP method code to a String.
- byte methodCode = headerMessage.getByte();
+ byte methodCode = requestHeaderMessage.getByte();
if (methodCode != Constants.SC_M_JK_STORED) {
String methodName = Constants.methodTransArray[(int)methodCode -
1];
request.method().setString(methodName);
}
- headerMessage.getBytes(request.protocol());
- headerMessage.getBytes(request.requestURI());
+ requestHeaderMessage.getBytes(request.protocol());
+ requestHeaderMessage.getBytes(request.requestURI());
- headerMessage.getBytes(request.remoteAddr());
- headerMessage.getBytes(request.remoteHost());
- headerMessage.getBytes(request.localName());
- request.setLocalPort(headerMessage.getInt());
+ requestHeaderMessage.getBytes(request.remoteAddr());
+ requestHeaderMessage.getBytes(request.remoteHost());
+ requestHeaderMessage.getBytes(request.localName());
+ request.setLocalPort(requestHeaderMessage.getInt());
- boolean isSSL = headerMessage.getByte() != 0;
+ boolean isSSL = requestHeaderMessage.getByte() != 0;
if (isSSL) {
// XXX req.setSecure( true );
request.scheme().setString("https");
@@ -744,20 +815,20 @@
// Decode headers
MimeHeaders headers = request.getMimeHeaders();
- int hCount = headerMessage.getInt();
+ int hCount = requestHeaderMessage.getInt();
for(int i = 0 ; i < hCount ; i++) {
String hName = null;
// Header names are encoded as either an integer code starting
// with 0xA0, or as a normal string (in which case the first
// two bytes are the length).
- int isc = headerMessage.peekInt();
+ int isc = requestHeaderMessage.peekInt();
int hId = isc & 0xFF;
MessageBytes vMB = null;
isc &= 0xFF00;
if(0xA000 == isc) {
- headerMessage.getInt(); // To advance the read position
+ requestHeaderMessage.getInt(); // To advance the read
position
hName = Constants.headerTransArray[hId - 1];
vMB = headers.addValue( hName );
} else {
@@ -768,13 +839,13 @@
// SC_REQ_CONTENT_LENGTH=8 - leading to unexpected
// behaviour. see bug 5861 for more information.
hId = -1;
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
ByteChunk bc = tmpMB.getByteChunk();
vMB = headers.addValue(bc.getBuffer(),
bc.getStart(), bc.getLength());
}
- headerMessage.getBytes(vMB);
+ requestHeaderMessage.getBytes(vMB);
if (hId == Constants.SC_REQ_CONTENT_LENGTH ||
(hId == -1 && tmpMB.equalsIgnoreCase("Content-Length")))
{
@@ -794,7 +865,7 @@
boolean moreAttr = true;
while (moreAttr) {
- byte attributeCode = headerMessage.getByte();
+ byte attributeCode = requestHeaderMessage.getByte();
if (attributeCode == Constants.SC_A_ARE_DONE)
break;
@@ -803,15 +874,15 @@
if (attributeCode == Constants.SC_A_SSL_KEY_SIZE) {
// Bug 1326: it's an Integer.
request.setAttribute(SSLSupport.KEY_SIZE_KEY,
- new Integer(headerMessage.getInt()));
+ new Integer(requestHeaderMessage.getInt()));
//Integer.toString(msg.getInt()));
}
if (attributeCode == Constants.SC_A_REQ_ATTRIBUTE ) {
// 2 strings ???...
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
String n = tmpMB.toString();
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
String v = tmpMB.toString();
request.setAttribute(n, v);
if (log.isTraceEnabled())
@@ -821,63 +892,63 @@
// 1 string attributes
switch (attributeCode) {
case Constants.SC_A_CONTEXT :
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
// nothing
break;
case Constants.SC_A_SERVLET_PATH :
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
// nothing
break;
case Constants.SC_A_REMOTE_USER :
if (tomcatAuthentication) {
// ignore server
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
} else {
- headerMessage.getBytes(request.getRemoteUser());
+ requestHeaderMessage.getBytes(request.getRemoteUser());
}
break;
case Constants.SC_A_AUTH_TYPE :
if (tomcatAuthentication) {
// ignore server
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
} else {
- headerMessage.getBytes(request.getAuthType());
+ requestHeaderMessage.getBytes(request.getAuthType());
}
break;
case Constants.SC_A_QUERY_STRING :
- headerMessage.getBytes(request.queryString());
+ requestHeaderMessage.getBytes(request.queryString());
break;
case Constants.SC_A_JVM_ROUTE :
- headerMessage.getBytes(request.instanceId());
+ requestHeaderMessage.getBytes(request.instanceId());
break;
case Constants.SC_A_SSL_CERT :
request.scheme().setString("https");
// SSL certificate extraction is costy, moved to
JkCoyoteHandler
- headerMessage.getBytes(certificates);
+ requestHeaderMessage.getBytes(certificates);
break;
case Constants.SC_A_SSL_CIPHER :
request.scheme().setString( "https" );
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
request.setAttribute(SSLSupport.CIPHER_SUITE_KEY,
tmpMB.toString());
break;
case Constants.SC_A_SSL_SESSION :
request.scheme().setString( "https" );
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
request.setAttribute(SSLSupport.SESSION_ID_KEY,
tmpMB.toString());
break;
case Constants.SC_A_SECRET :
- headerMessage.getBytes(tmpMB);
+ requestHeaderMessage.getBytes(tmpMB);
String secret = tmpMB.toString();
if(log.isInfoEnabled())
log.info("Secret: " + secret);
@@ -886,7 +957,7 @@
break;
case Constants.SC_A_STORED_METHOD:
- headerMessage.getBytes(request.method());
+ requestHeaderMessage.getBytes(request.method());
break;
default:
@@ -1013,9 +1084,9 @@
response.setCommitted(true);
- outputMessage.reset();
- outputMessage.appendByte(Constants.JK_AJP13_SEND_HEADERS);
- outputMessage.appendInt(response.getStatus());
+ responseHeaderMessage.reset();
+ responseHeaderMessage.appendByte(Constants.JK_AJP13_SEND_HEADERS);
+ responseHeaderMessage.appendInt(response.getStatus());
String message = response.getMessage();
if (message == null){
@@ -1024,7 +1095,7 @@
message = message.replace('\n', ' ').replace('\r', ' ');
}
tmpMB.setString(message);
- outputMessage.appendBytes(tmpMB);
+ responseHeaderMessage.appendBytes(tmpMB);
// XXX add headers
@@ -1042,14 +1113,15 @@
headers.setValue("Content-Length").setInt(contentLength);
}
int numHeaders = headers.size();
- outputMessage.appendInt(numHeaders);
+ responseHeaderMessage.appendInt(numHeaders);
for (int i = 0; i < numHeaders; i++) {
MessageBytes hN = headers.getName(i);
- outputMessage.appendBytes(hN);
+ responseHeaderMessage.appendBytes(hN);
MessageBytes hV=headers.getValue(i);
- outputMessage.appendBytes(hV);
+ responseHeaderMessage.appendBytes(hV);
}
- writeMessage(outputMessage);
+ responseHeaderMessage.end();
+ outputBuffer.put(responseHeaderMessage.getBuffer(), 0,
responseHeaderMessage.getLen());
}
@@ -1074,10 +1146,12 @@
return;
finished = true;
- outputMessage.reset();
- outputMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
- outputMessage.appendByte(1);
- writeMessage(outputMessage);
+ if (outputBuffer.position() + endMessageArray.length >
outputBuffer.capacity()) {
+ flushOutputBuffer();
+ } else {
+ outputBuffer.put(endMessageArray);
+ }
+ flushOutputBuffer();
}
@@ -1135,16 +1209,11 @@
return false;
}
- // Why not use outBuf??
- readBodyMessage.reset();
- readBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
- readBodyMessage.appendInt(Constants.MAX_READ_SIZE);
- writeMessage(readBodyMessage);
-
- // In JNI mode, response will be in bodyMsg. In TCP mode, response
need to be
- // read
+ // Request more data immediately
+ Socket.sendb(socket, getBodyMessageBuffer, 0,
+ getBodyMessageBuffer.position());
- boolean moreData=receive();
+ boolean moreData = receive();
if( !moreData ) {
endOfStream = true;
}
@@ -1208,20 +1277,6 @@
/**
- * Send the specified AJP message.
- *
- * @param message to send
- * @throws IOException IO error when writing the message
- */
- protected void writeMessage(AjpMessage message)
- throws IOException {
- message.end();
- if (Socket.send(socket, message.getBuffer(), 0, message.getLen()) <
0)
- throw new IOException(sm.getString("iib.failedwrite"));
- }
-
-
- /**
* Recycle the processor.
*/
public void recycle() {
@@ -1235,11 +1290,26 @@
request.recycle();
response.recycle();
certificates.recycle();
- headerMessage.reset();
+ requestHeaderMessage.reset();
+ outputBuffer.clear();
}
+ /**
+ * Callback to write data from the buffer.
+ */
+ protected void flushOutputBuffer()
+ throws IOException {
+ if (outputBuffer.position() > 0) {
+ if (Socket.sendb(socket, outputBuffer, 0,
outputBuffer.position()) < 0) {
+ throw new IOException(sm.getString("iib.failedwrite"));
+ }
+ outputBuffer.clear();
+ }
+ }
+
+
// ------------------------------------- InputStreamInputBuffer Inner
Class
@@ -1306,25 +1376,28 @@
}
int len = chunk.getLength();
- byte buf[] = bodyMessage.getBuffer();
// 4 - hardcoded, byte[] marshalling overhead
- int chunkSize=buf.length - bodyMessage.getHeaderLength() - 4;
- int off=0;
- while( len > 0 ) {
- int thisTime=len;
- if( thisTime > chunkSize ) {
- thisTime=chunkSize;
+ int chunkSize = 8*1024 - 4 - 4;
+ int off = 0;
+ while (len > 0) {
+ int thisTime = len;
+ if (thisTime > chunkSize) {
+ thisTime = chunkSize;
}
len -= thisTime;
-
- // FIXME: Don't use a temp buffer
- bodyMessage.reset();
- bodyMessage.appendByte(
AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
- if (log.isTraceEnabled())
- log.trace("doWrite " + off + " " + thisTime + " " + len);
- bodyMessage.appendBytes(chunk.getBytes(), chunk.getOffset()
+ off, thisTime);
+ if (outputBuffer.position() + thisTime
+ + bodyMessage.getHeaderLength() + 4 >
outputBuffer.capacity()) {
+ flushOutputBuffer();
+ } else {
+ outputBuffer.put((byte) 0x41);
+ outputBuffer.put((byte) 0x42);
+ outputBuffer.putShort((short) (thisTime + 4));
+ outputBuffer.put(AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
+ outputBuffer.putShort((short) chunk.getLength());
+ outputBuffer.put(chunk.getBytes(), chunk.getOffset() +
off, thisTime);
+ outputBuffer.put((byte) 0x00);
+ }
off += thisTime;
- writeMessage(bodyMessage);
}
return chunk.getLength();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]