remm        2005/06/23 09:31:55

  Modified:    jk/java/org/apache/coyote/ajp AjpAprProcessor.java
  Log:
  - Redo input, but somehow with no additional performance increase :(
  - My measurements seem to indicate that this is faster than regular AJP (which
    could probably use a more aggressive output buffering like I did), but with
    everything running on localhost, it's not exactly that great a benchmark.
    Poller only keepalives (firstReadTimeout="0" pollTime="2000") seems to be
    about as fast as regular AJP, which will mean minimum thread usage in Tomcat
    at no cost (provided previous performance levels were enough). Another
    fiding is that about 50% of the CPU usage is in Apache for some reason.
  - Hopefully this is reasonably bug free (don't expect that much though).
  
  Revision  Changes    Path
  1.7       +143 -87   
jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java
  
  Index: AjpAprProcessor.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/coyote/ajp/AjpAprProcessor.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- AjpAprProcessor.java      23 Jun 2005 11:50:34 -0000      1.6
  +++ AjpAprProcessor.java      23 Jun 2005 16:31:55 -0000      1.7
  @@ -96,34 +96,9 @@
   
           // Allocate input and output buffers
           inputBuffer = ByteBuffer.allocateDirect(16 * 1024);
  +        inputBuffer.limit(0);
           outputBuffer = ByteBuffer.allocateDirect(16 * 1024);
   
  -        // Set the get body message buffer
  -        AjpMessage getBodyMessage = new AjpMessage();
  -        getBodyMessage.reset();
  -        getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
  -        getBodyMessage.appendInt(Constants.MAX_READ_SIZE);
  -        getBodyMessage.end();
  -        getBodyMessageBuffer = 
ByteBuffer.allocateDirect(getBodyMessage.getLen());
  -        getBodyMessageBuffer.put(getBodyMessage.getBuffer(), 0, 
getBodyMessage.getLen());
  -
  -        // Set the read body message buffer
  -        AjpMessage pongMessage = new AjpMessage();
  -        pongMessage.reset();
  -        pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
  -        pongMessage.end();
  -        pongMessageBuffer = ByteBuffer.allocateDirect(pongMessage.getLen());
  -        pongMessageBuffer.put(pongMessage.getBuffer(), 0, 
pongMessage.getLen());
  -
  -        // Allocate the end message array
  -        AjpMessage endMessage = new AjpMessage();
  -        endMessage.reset();
  -        endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
  -        endMessage.appendByte(1);
  -        endMessage.end();
  -        endMessageArray = new byte[endMessage.getLen()];
  -        System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0, 
endMessage.getLen());
  -
       }
   
   
  @@ -333,23 +308,58 @@
       
       /**
        * Direct buffer used for sending right away a get body message.
  -     * FIXME: can probably be static
        */
  -    protected ByteBuffer getBodyMessageBuffer = null;
  +    protected static final ByteBuffer getBodyMessageBuffer;
       
       
       /**
        * Direct buffer used for sending right away a pong message.
  -     * FIXME: can probably be static
        */
  -    protected ByteBuffer pongMessageBuffer = null;
  +    protected static final ByteBuffer pongMessageBuffer;
       
       
       /**
        * End message array.
  -     * FIXME: can probably be static
        */
  -    protected byte[] endMessageArray = null;
  +    protected static final byte[] endMessageArray;
  +    
  +    
  +    // ----------------------------------------------------- Static 
Initializer
  +
  +
  +    static {
  +
  +        // Set the get body message buffer
  +        AjpMessage getBodyMessage = new AjpMessage();
  +        getBodyMessage.reset();
  +        getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
  +        getBodyMessage.appendInt(Constants.MAX_READ_SIZE);
  +        getBodyMessage.end();
  +        getBodyMessageBuffer = 
  +            ByteBuffer.allocateDirect(getBodyMessage.getLen());
  +        getBodyMessageBuffer.put(getBodyMessage.getBuffer(), 0, 
  +                getBodyMessage.getLen());
  +
  +        // Set the read body message buffer
  +        AjpMessage pongMessage = new AjpMessage();
  +        pongMessage.reset();
  +        pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
  +        pongMessage.end();
  +        pongMessageBuffer = ByteBuffer.allocateDirect(pongMessage.getLen());
  +        pongMessageBuffer.put(pongMessage.getBuffer(), 0, 
  +                pongMessage.getLen());
  +
  +        // Allocate the end message array
  +        AjpMessage endMessage = new AjpMessage();
  +        endMessage.reset();
  +        endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
  +        endMessage.appendByte(1);
  +        endMessage.end();
  +        endMessageArray = new byte[endMessage.getLen()];
  +        System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0, 
  +                endMessage.getLen());
  +
  +    }
       
       
       // ------------------------------------------------------------- 
Properties
  @@ -444,7 +454,8 @@
               // Parsing the request header
               try {
                   // Get first message of the request
  -                if (!readMessage(requestHeaderMessage, true)) {
  +                if (!readMessage(requestHeaderMessage, true, 
  +                        keptAlive && (endpoint.getCurrentThreadsBusy() > 
limit))) {
                       // This means that no data is available right now
                       // (long keepalive), so that the processor should be 
recycled
                       // and the method should return true
  @@ -507,7 +518,7 @@
               // Finish the response if not done yet
               if (!finished) {
                   try {
  -                    endRequest();
  +                    finish();
                   } catch (Throwable t) {
                       error = true;
                   }
  @@ -591,7 +602,7 @@
               }
   
               try {
  -                flushOutputBuffer();
  +                flush();
               } catch (IOException e) {
                   // Set error flag
                   error = true;
  @@ -604,7 +615,7 @@
               // transactions with the client
   
               try {
  -                endRequest();
  +                finish();
               } catch (IOException e) {
                   // Set error flag
                   error = true;
  @@ -1129,7 +1140,7 @@
       /**
        * Finish AJP response.
        */
  -    protected void endRequest()
  +    protected void finish()
           throws IOException {
   
           if (!response.isCommitted()) {
  @@ -1147,11 +1158,75 @@
           
           finished = true;
           if (outputBuffer.position() + endMessageArray.length > 
outputBuffer.capacity()) {
  -            flushOutputBuffer();
  -        } else {
  -            outputBuffer.put(endMessageArray);
  +            flush();
           }
  -        flushOutputBuffer();
  +        outputBuffer.put(endMessageArray);
  +        flush();
  +    }
  +    
  +    
  +    /**
  +     * Read at least the specified amount of bytes, and place them 
  +     * in the input buffer.
  +     */
  +    protected boolean read(int n)
  +        throws IOException {
  +
  +        if (inputBuffer.capacity() - inputBuffer.limit() <= 
  +                n - inputBuffer.remaining()) {
  +            inputBuffer.compact();
  +            inputBuffer.limit(inputBuffer.position());
  +            inputBuffer.position(0);
  +        }
  +        while (inputBuffer.remaining() < n) {
  +            int nRead = Socket.recvb
  +                (socket, inputBuffer, inputBuffer.limit(), 
  +                        inputBuffer.capacity() - inputBuffer.limit());
  +            if (nRead > 0) {
  +                inputBuffer.limit(inputBuffer.limit() + nRead);
  +            } else {
  +                throw new IOException(sm.getString("iib.failedread"));
  +            }
  +        }
  +        
  +        return true;
  +        
  +    }
  +    
  +    
  +    /**
  +     * Read at least the specified amount of bytes, and place them 
  +     * in the input buffer.
  +     */
  +    protected boolean readt(int n, boolean useAvailableData)
  +        throws IOException {
  +        
  +        if (useAvailableData && inputBuffer.remaining() == 0) {
  +            return false;
  +        }
  +        if (inputBuffer.capacity() - inputBuffer.limit() <= 
  +                n - inputBuffer.remaining()) {
  +            inputBuffer.compact();
  +            inputBuffer.limit(inputBuffer.position());
  +            inputBuffer.position(0);
  +        }
  +        while (inputBuffer.remaining() < n) {
  +            int nRead = Socket.recvbt
  +                (socket, inputBuffer, inputBuffer.limit(), 
  +                        inputBuffer.capacity() - inputBuffer.limit(), 
readTimeout);
  +            if (nRead > 0) {
  +                inputBuffer.limit(inputBuffer.limit() + nRead);
  +            } else {
  +                if ((-nRead) == Status.ETIMEDOUT || (-nRead) == 
Status.TIMEUP) {
  +                    return false;
  +                } else {
  +                    throw new IOException(sm.getString("iib.failedread"));
  +                }
  +            }
  +        }
  +        
  +        return true;
  +        
       }
       
       
  @@ -1162,7 +1237,7 @@
       public boolean receive() throws IOException {
           first = false;
           bodyMessage.reset();
  -        readMessage(bodyMessage, false);
  +        readMessage(bodyMessage, false, false);
           if( log.isDebugEnabled() )
               log.info( "Receiving: getting request body chunk " + 
bodyMessage.getLen() );
           
  @@ -1173,7 +1248,6 @@
               return false;
           }
           int blen = bodyMessage.peekInt();
  -
           if( blen == 0 ) {
               return false;
           }
  @@ -1188,15 +1262,14 @@
           empty = false;
           return true;
       }
  -    
  +
       /**
        * Get more request body data from the web server and store it in the 
        * internal buffer.
        *
        * @return true if there is more data, false if not.    
        */
  -    private boolean refillReadBuffer() throws IOException 
  -    {
  +    private boolean refillReadBuffer() throws IOException {
           // If the server returns an empty packet, assume that that end of
           // the stream has been reached (yuck -- fix protocol??).
           // FORM support
  @@ -1230,52 +1303,30 @@
        *         didn't return anything
        * @throws IOException any other failure, including incomplete reads
        */
  -    protected boolean readMessage(AjpMessage message, boolean first)
  +    protected boolean readMessage(AjpMessage message, boolean first, 
  +            boolean useAvailableData)
           throws IOException {
           
           byte[] buf = message.getBuffer();
           int headerLength = message.getHeaderLength();
   
  -        // Read the message header
  -        // FIXME: better buffering to avoid doing two reads !!!!
           if (first) {
  -            int nRead = Socket.recvt
  -                (socket, buf, 0, headerLength, readTimeout);
  -            // Note: Unlike before, I assume it is not acceptable to do more 
than
  -            // one read for the first four bytes
  -            if (nRead == headerLength) {
  -                message.processHeader();
  -            } else {
  -                if ((-nRead) == Status.ETIMEDOUT || (-nRead) == 
Status.TIMEUP) {
  -                    return false;
  -                } else {
  -                    throw new IOException(sm.getString("iib.failedread"));
  -                }
  +            if (!readt(headerLength, useAvailableData)) {
  +                return false;
               }
           } else {
  -            int nRead = Socket.recv(socket, buf, 0, headerLength);
  -            if (nRead == headerLength) {
  -                message.processHeader();
  -            } else {
  -                throw new IOException(sm.getString("iib.failedread"));
  -            }
  +            read(headerLength);
           }
  +        inputBuffer.get(message.getBuffer(), 0, headerLength);
  +        message.processHeader();
  +        read(message.getLen());
  +        inputBuffer.get(message.getBuffer(), headerLength, message.getLen());
           
  -        // Read the full message body; incomplete reads is a protocol error
  -        int messageLength = message.getLen();
  -        int pos = headerLength;
  -        while (pos < headerLength + messageLength) {
  -            int nRead = Socket.recv(socket, buf, pos, headerLength + 
messageLength - pos);
  -            if (nRead > 0) {
  -                pos += nRead;
  -            } else {
  -                throw new IOException(sm.getString("iib.failedread"));
  -            }
  -        }
           return true;
  +        
       }
  -    
  -    
  +
  +
       /**
        * Recycle the processor.
        */
  @@ -1290,7 +1341,9 @@
           request.recycle();
           response.recycle();
           certificates.recycle();
  -        requestHeaderMessage.reset();
  +
  +        inputBuffer.clear();
  +        inputBuffer.limit(0);
           outputBuffer.clear();
   
       }
  @@ -1299,7 +1352,7 @@
       /**
        * Callback to write data from the buffer.
        */
  -    protected void flushOutputBuffer()
  +    protected void flush()
           throws IOException {
           if (outputBuffer.position() > 0) {
               if (Socket.sendb(socket, outputBuffer, 0, 
outputBuffer.position()) < 0) {
  @@ -1369,10 +1422,13 @@
               throws IOException {
   
               if (!response.isCommitted()) {
  -                // Send the connector a request for commit. The connector 
should
  -                // then validate the headers, send them (using sendHeader) 
and 
  -                // set the filters accordingly.
  -                response.action(ActionCode.ACTION_COMMIT, null);
  +                // Validate and write response headers
  +                try {
  +                    prepareResponse();
  +                } catch (IOException e) {
  +                    // Set error flag
  +                    error = true;
  +                }
               }
   
               int len = chunk.getLength();
  @@ -1387,7 +1443,7 @@
                   len -= thisTime;
                   if (outputBuffer.position() + thisTime 
                           + bodyMessage.getHeaderLength() + 4 > 
outputBuffer.capacity()) {
  -                    flushOutputBuffer();
  +                    flush();
                   } else {
                       outputBuffer.put((byte) 0x41);
                       outputBuffer.put((byte) 0x42);
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to