remm        2005/06/23 04:50:34

  Modified:    jk/java/org/apache/coyote/ajp AjpAprProcessor.java
  Log:
  - Add buffering for output.
  - This seems to work fine, but I didn't do that much testing yet.
  - Not done yet: input buffering is still missing, and it will likely be more
    significant (at least when there's a lot of upload). It's also harder,
    unfortunately :(
  
  Revision  Changes    Path
  1.6       +179 -106  
jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java
  
  Index: AjpAprProcessor.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- AjpAprProcessor.java      17 Jun 2005 09:43:35 -0000      1.5
  +++ AjpAprProcessor.java      23 Jun 2005 11:50:34 -0000      1.6
  @@ -20,6 +20,7 @@
   import java.io.IOException;
   import java.io.InterruptedIOException;
   import java.net.InetAddress;
  +import java.nio.ByteBuffer;
   import java.security.cert.CertificateFactory;
   import java.security.cert.X509Certificate;
   
  @@ -84,11 +85,45 @@
           response.setOutputBuffer(new SocketOutputBuffer());
           request.setResponse(response);
   
  -        readTimeout = endpoint.getFirstReadTimeout() * 1000;
  +        if (endpoint.getFirstReadTimeout() > 0) {
  +            readTimeout = endpoint.getFirstReadTimeout() * 1000;
  +        } else {
  +            readTimeout = 100 * 1000;
  +        }
   
           // Cause loading of HexUtils
           int foo = HexUtils.DEC[0];
   
  +        // Allocate input and output buffers
  +        inputBuffer = ByteBuffer.allocateDirect(16 * 1024);
  +        outputBuffer = ByteBuffer.allocateDirect(16 * 1024);
  +
  +        // Set the get body message buffer
  +        AjpMessage getBodyMessage = new AjpMessage();
  +        getBodyMessage.reset();
  +        getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
  +        getBodyMessage.appendInt(Constants.MAX_READ_SIZE);
  +        getBodyMessage.end();
  +        getBodyMessageBuffer = 
ByteBuffer.allocateDirect(getBodyMessage.getLen());
  +        getBodyMessageBuffer.put(getBodyMessage.getBuffer(), 0, 
getBodyMessage.getLen());
  +
  +        // Set the read body message buffer
  +        AjpMessage pongMessage = new AjpMessage();
  +        pongMessage.reset();
  +        pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
  +        pongMessage.end();
  +        pongMessageBuffer = ByteBuffer.allocateDirect(pongMessage.getLen());
  +        pongMessageBuffer.put(pongMessage.getBuffer(), 0, 
pongMessage.getLen());
  +
  +        // Allocate the end message array
  +        AjpMessage endMessage = new AjpMessage();
  +        endMessage.reset();
  +        endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
  +        endMessage.appendByte(1);
  +        endMessage.end();
  +        endMessageArray = new byte[endMessage.getLen()];
  +        System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0, 
endMessage.getLen());
  +
       }
   
   
  @@ -118,13 +153,13 @@
        * processing of the first message of a "request", so it might not be a 
request
        * header. It will stay unchanged during the processing of the whole 
request. 
        */
  -    protected AjpMessage headerMessage = new AjpMessage(); 
  +    protected AjpMessage requestHeaderMessage = new AjpMessage(); 
   
   
       /**
  -     * Message used for output. 
  +     * Message used for response header composition. 
        */
  -    protected AjpMessage outputMessage = new AjpMessage(); 
  +    protected AjpMessage responseHeaderMessage = new AjpMessage(); 
   
   
       /**
  @@ -146,18 +181,6 @@
   
       
       /**
  -     * All purpose response message.
  -     */
  -    protected AjpMessage responseMessage = new AjpMessage();
  -    
  -    
  -    /**
  -     * Read body message.
  -     */
  -    protected AjpMessage readBodyMessage = new AjpMessage();
  -
  -    
  -    /**
        * State flag.
        */
       protected boolean started = false;
  @@ -296,6 +319,39 @@
       protected boolean finished = false;
       
       
  +    /**
  +     * Direct buffer used for output.
  +     */
  +    protected ByteBuffer outputBuffer = null;
  +    
  +    
  +    /**
  +     * Direct buffer used for input.
  +     */
  +    protected ByteBuffer inputBuffer = null;
  +    
  +    
  +    /**
  +     * Direct buffer used for sending right away a get body message.
  +     * FIXME: can probably be static
  +     */
  +    protected ByteBuffer getBodyMessageBuffer = null;
  +    
  +    
  +    /**
  +     * Direct buffer used for sending right away a pong message.
  +     * FIXME: can probably be static
  +     */
  +    protected ByteBuffer pongMessageBuffer = null;
  +    
  +    
  +    /**
  +     * End message array.
  +     * FIXME: can probably be static
  +     */
  +    protected byte[] endMessageArray = null;
  +    
  +    
       // ------------------------------------------------------------- 
Properties
   
   
  @@ -375,14 +431,20 @@
   
           long soTimeout = endpoint.getSoTimeout();
   
  +        int limit = 0;
  +        if (endpoint.getFirstReadTimeout() > 0) {
  +            limit = endpoint.getMaxThreads() / 2;
  +        }
  +
           boolean openSocket = true;
  +        boolean keptAlive = false;
   
           while (started && !error) {
   
               // Parsing the request header
               try {
                   // Get first message of the request
  -                if (!readMessage(headerMessage, true)) {
  +                if (!readMessage(requestHeaderMessage, true)) {
                       // This means that no data is available right now
                       // (long keepalive), so that the processor should be 
recycled
                       // and the method should return true
  @@ -391,15 +453,17 @@
                   }
                   // Check message type, process right away and break if 
                   // not regular request processing
  -                int type = headerMessage.getByte();
  +                int type = requestHeaderMessage.getByte();
                   // FIXME: Any other types which should be checked ?
                   if (type == Constants.JK_AJP13_CPING_REQUEST) {
  -                    headerMessage.reset();
  -                    headerMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
  -                    writeMessage(headerMessage);
  +                    if (Socket.sendb(socket, pongMessageBuffer, 0, 
  +                            pongMessageBuffer.position()) < 0) {
  +                        error = true;
  +                    }
                       continue;
                   }
   
  +                keptAlive = true;
                   request.setStartTime(System.currentTimeMillis());
               } catch (IOException e) {
                   error = true;
  @@ -515,12 +579,19 @@
   
           } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {
   
  -            if (response.isCommitted())
  -                return;
  +            if (!response.isCommitted()) {
  +                // Validate and write response headers
  +                try {
  +                    prepareResponse();
  +                } catch (IOException e) {
  +                    // Set error flag
  +                    error = true;
  +                    return;
  +                }
  +            }
   
  -            // Validate and write response headers
               try {
  -                prepareResponse();
  +                flushOutputBuffer();
               } catch (IOException e) {
                   // Set error flag
                   error = true;
  @@ -721,21 +792,21 @@
       protected void prepareRequest() {
   
           // Translate the HTTP method code to a String.
  -        byte methodCode = headerMessage.getByte();
  +        byte methodCode = requestHeaderMessage.getByte();
           if (methodCode != Constants.SC_M_JK_STORED) {
               String methodName = Constants.methodTransArray[(int)methodCode - 
1];
               request.method().setString(methodName);
           }
   
  -        headerMessage.getBytes(request.protocol()); 
  -        headerMessage.getBytes(request.requestURI());
  +        requestHeaderMessage.getBytes(request.protocol()); 
  +        requestHeaderMessage.getBytes(request.requestURI());
   
  -        headerMessage.getBytes(request.remoteAddr());
  -        headerMessage.getBytes(request.remoteHost());
  -        headerMessage.getBytes(request.localName());
  -        request.setLocalPort(headerMessage.getInt());
  +        requestHeaderMessage.getBytes(request.remoteAddr());
  +        requestHeaderMessage.getBytes(request.remoteHost());
  +        requestHeaderMessage.getBytes(request.localName());
  +        request.setLocalPort(requestHeaderMessage.getInt());
   
  -        boolean isSSL = headerMessage.getByte() != 0;
  +        boolean isSSL = requestHeaderMessage.getByte() != 0;
           if (isSSL) {
               // XXX req.setSecure( true );
               request.scheme().setString("https");
  @@ -744,20 +815,20 @@
           // Decode headers
           MimeHeaders headers = request.getMimeHeaders();
   
  -        int hCount = headerMessage.getInt();
  +        int hCount = requestHeaderMessage.getInt();
           for(int i = 0 ; i < hCount ; i++) {
               String hName = null;
               
               // Header names are encoded as either an integer code starting
               // with 0xA0, or as a normal string (in which case the first
               // two bytes are the length).
  -            int isc = headerMessage.peekInt();
  +            int isc = requestHeaderMessage.peekInt();
               int hId = isc & 0xFF;
               
               MessageBytes vMB = null;
               isc &= 0xFF00;
               if(0xA000 == isc) {
  -                headerMessage.getInt(); // To advance the read position
  +                requestHeaderMessage.getInt(); // To advance the read 
position
                   hName = Constants.headerTransArray[hId - 1];
                   vMB = headers.addValue( hName );
               } else {
  @@ -768,13 +839,13 @@
                   // SC_REQ_CONTENT_LENGTH=8 - leading to unexpected
                   // behaviour.  see bug 5861 for more information.
                   hId = -1;
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   ByteChunk bc = tmpMB.getByteChunk();
                   vMB = headers.addValue(bc.getBuffer(),
                           bc.getStart(), bc.getLength());
               }
               
  -            headerMessage.getBytes(vMB);
  +            requestHeaderMessage.getBytes(vMB);
               
               if (hId == Constants.SC_REQ_CONTENT_LENGTH ||
                       (hId == -1 && tmpMB.equalsIgnoreCase("Content-Length"))) 
{
  @@ -794,7 +865,7 @@
           boolean moreAttr = true;
   
           while (moreAttr) {
  -            byte attributeCode = headerMessage.getByte();
  +            byte attributeCode = requestHeaderMessage.getByte();
               if (attributeCode == Constants.SC_A_ARE_DONE)
                   break;
   
  @@ -803,15 +874,15 @@
               if (attributeCode == Constants.SC_A_SSL_KEY_SIZE) {
                   // Bug 1326: it's an Integer.
                   request.setAttribute(SSLSupport.KEY_SIZE_KEY,
  -                                 new Integer(headerMessage.getInt()));
  +                                 new Integer(requestHeaderMessage.getInt()));
                  //Integer.toString(msg.getInt()));
               }
   
               if (attributeCode == Constants.SC_A_REQ_ATTRIBUTE ) {
                   // 2 strings ???...
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   String n = tmpMB.toString();
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   String v = tmpMB.toString();
                   request.setAttribute(n, v);
                   if (log.isTraceEnabled())
  @@ -821,63 +892,63 @@
               // 1 string attributes
               switch (attributeCode) {
               case Constants.SC_A_CONTEXT :
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   // nothing
                   break;
                   
               case Constants.SC_A_SERVLET_PATH :
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   // nothing 
                   break;
                   
               case Constants.SC_A_REMOTE_USER :
                   if (tomcatAuthentication) {
                       // ignore server
  -                    headerMessage.getBytes(tmpMB);
  +                    requestHeaderMessage.getBytes(tmpMB);
                   } else {
  -                    headerMessage.getBytes(request.getRemoteUser());
  +                    requestHeaderMessage.getBytes(request.getRemoteUser());
                   }
                   break;
                   
               case Constants.SC_A_AUTH_TYPE :
                   if (tomcatAuthentication) {
                       // ignore server
  -                    headerMessage.getBytes(tmpMB);
  +                    requestHeaderMessage.getBytes(tmpMB);
                   } else {
  -                    headerMessage.getBytes(request.getAuthType());
  +                    requestHeaderMessage.getBytes(request.getAuthType());
                   }
                   break;
                   
               case Constants.SC_A_QUERY_STRING :
  -                headerMessage.getBytes(request.queryString());
  +                requestHeaderMessage.getBytes(request.queryString());
                   break;
                   
               case Constants.SC_A_JVM_ROUTE :
  -                headerMessage.getBytes(request.instanceId());
  +                requestHeaderMessage.getBytes(request.instanceId());
                   break;
                   
               case Constants.SC_A_SSL_CERT :
                   request.scheme().setString("https");
                   // SSL certificate extraction is costy, moved to 
JkCoyoteHandler
  -                headerMessage.getBytes(certificates);
  +                requestHeaderMessage.getBytes(certificates);
                   break;
                   
               case Constants.SC_A_SSL_CIPHER   :
                   request.scheme().setString( "https" );
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   request.setAttribute(SSLSupport.CIPHER_SUITE_KEY,
                                    tmpMB.toString());
                   break;
                   
               case Constants.SC_A_SSL_SESSION  :
                   request.scheme().setString( "https" );
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   request.setAttribute(SSLSupport.SESSION_ID_KEY, 
                                     tmpMB.toString());
                   break;
                   
               case Constants.SC_A_SECRET  :
  -                headerMessage.getBytes(tmpMB);
  +                requestHeaderMessage.getBytes(tmpMB);
                   String secret = tmpMB.toString();
                   if(log.isInfoEnabled())
                       log.info("Secret: " + secret);
  @@ -886,7 +957,7 @@
                   break;
                   
               case Constants.SC_A_STORED_METHOD:
  -                headerMessage.getBytes(request.method()); 
  +                requestHeaderMessage.getBytes(request.method()); 
                   break;
                   
               default:
  @@ -1013,9 +1084,9 @@
   
           response.setCommitted(true);
           
  -        outputMessage.reset();
  -        outputMessage.appendByte(Constants.JK_AJP13_SEND_HEADERS);
  -        outputMessage.appendInt(response.getStatus());
  +        responseHeaderMessage.reset();
  +        responseHeaderMessage.appendByte(Constants.JK_AJP13_SEND_HEADERS);
  +        responseHeaderMessage.appendInt(response.getStatus());
           
           String message = response.getMessage();
           if (message == null){
  @@ -1024,7 +1095,7 @@
               message = message.replace('\n', ' ').replace('\r', ' ');
           }
           tmpMB.setString(message);
  -        outputMessage.appendBytes(tmpMB);
  +        responseHeaderMessage.appendBytes(tmpMB);
   
           // XXX add headers
           
  @@ -1042,14 +1113,15 @@
               headers.setValue("Content-Length").setInt(contentLength);
           }
           int numHeaders = headers.size();
  -        outputMessage.appendInt(numHeaders);
  +        responseHeaderMessage.appendInt(numHeaders);
           for (int i = 0; i < numHeaders; i++) {
               MessageBytes hN = headers.getName(i);
  -            outputMessage.appendBytes(hN);
  +            responseHeaderMessage.appendBytes(hN);
               MessageBytes hV=headers.getValue(i);
  -            outputMessage.appendBytes(hV);
  +            responseHeaderMessage.appendBytes(hV);
           }
  -        writeMessage(outputMessage);
  +        responseHeaderMessage.end();
  +        outputBuffer.put(responseHeaderMessage.getBuffer(), 0, 
responseHeaderMessage.getLen());
   
       }
   
  @@ -1074,10 +1146,12 @@
               return;
           
           finished = true;
  -        outputMessage.reset();
  -        outputMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
  -        outputMessage.appendByte(1);
  -        writeMessage(outputMessage);
  +        if (outputBuffer.position() + endMessageArray.length > 
outputBuffer.capacity()) {
  +            flushOutputBuffer();
  +        } else {
  +            outputBuffer.put(endMessageArray);
  +        }
  +        flushOutputBuffer();
       }
       
       
  @@ -1135,16 +1209,11 @@
               return false;
           }
   
  -        // Why not use outBuf??
  -        readBodyMessage.reset();
  -        readBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
  -        readBodyMessage.appendInt(Constants.MAX_READ_SIZE);
  -        writeMessage(readBodyMessage);
  -
  -        // In JNI mode, response will be in bodyMsg. In TCP mode, response 
need to be
  -        // read
  +        // Request more data immediately
  +        Socket.sendb(socket, getBodyMessageBuffer, 0, 
  +                getBodyMessageBuffer.position());
   
  -        boolean moreData=receive();
  +        boolean moreData = receive();
           if( !moreData ) {
               endOfStream = true;
           }
  @@ -1208,20 +1277,6 @@
       
       
       /**
  -     * Send the specified AJP message.
  -     *   
  -     * @param message to send
  -     * @throws IOException IO error when writing the message
  -     */
  -    protected void writeMessage(AjpMessage message) 
  -        throws IOException {
  -        message.end();
  -        if (Socket.send(socket, message.getBuffer(), 0, message.getLen()) < 
0)
  -            throw new IOException(sm.getString("iib.failedwrite"));
  -    }
  -
  -
  -    /**
        * Recycle the processor.
        */
       public void recycle() {
  @@ -1235,11 +1290,26 @@
           request.recycle();
           response.recycle();
           certificates.recycle();
  -        headerMessage.reset();
  +        requestHeaderMessage.reset();
  +        outputBuffer.clear();
   
       }
   
   
  +    /**
  +     * Callback to write data from the buffer.
  +     */
  +    protected void flushOutputBuffer()
  +        throws IOException {
  +        if (outputBuffer.position() > 0) {
  +            if (Socket.sendb(socket, outputBuffer, 0, 
outputBuffer.position()) < 0) {
  +                throw new IOException(sm.getString("iib.failedwrite"));
  +            }
  +            outputBuffer.clear();
  +        }
  +    }
  +
  +
       // ------------------------------------- InputStreamInputBuffer Inner 
Class
   
   
  @@ -1306,25 +1376,28 @@
               }
   
               int len = chunk.getLength();
  -            byte buf[] = bodyMessage.getBuffer();
               // 4 - hardcoded, byte[] marshalling overhead 
  -            int chunkSize=buf.length - bodyMessage.getHeaderLength() - 4;
  -            int off=0;
  -            while( len > 0 ) {
  -                int thisTime=len;
  -                if( thisTime > chunkSize ) {
  -                    thisTime=chunkSize;
  +            int chunkSize = 8*1024 - 4 - 4;
  +            int off = 0;
  +            while (len > 0) {
  +                int thisTime = len;
  +                if (thisTime > chunkSize) {
  +                    thisTime = chunkSize;
                   }
                   len -= thisTime;
  -                
  -                // FIXME: Don't use a temp buffer
  -                bodyMessage.reset();
  -                bodyMessage.appendByte( 
AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
  -                if (log.isTraceEnabled()) 
  -                    log.trace("doWrite " + off + " " + thisTime + " " + len);
  -                bodyMessage.appendBytes(chunk.getBytes(), chunk.getOffset() 
+ off, thisTime);
  +                if (outputBuffer.position() + thisTime 
  +                        + bodyMessage.getHeaderLength() + 4 > 
outputBuffer.capacity()) {
  +                    flushOutputBuffer();
  +                } else {
  +                    outputBuffer.put((byte) 0x41);
  +                    outputBuffer.put((byte) 0x42);
  +                    outputBuffer.putShort((short) (thisTime + 4));
  +                    outputBuffer.put(AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
  +                    outputBuffer.putShort((short) chunk.getLength());
  +                    outputBuffer.put(chunk.getBytes(), chunk.getOffset() + 
off, thisTime);
  +                    outputBuffer.put((byte) 0x00);
  +                }
                   off += thisTime;
  -                writeMessage(bodyMessage);
               }
               
               return chunk.getLength();
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to