On 12/09/2014 13:04, Violeta Georgieva wrote:
> Hi,
> 
> 2014-08-22 14:02 GMT+03:00 <ma...@apache.org>:
>>
>> Author: markt
>> Date: Fri Aug 22 11:02:19 2014
>> New Revision: 1619738
>>
>> URL: http://svn.apache.org/r1619738
>> Log:
>> Extend support for the WebSocket permessage-deflate extension to
> compression of outgoing messages on the server side.
> 
> I would like to back-port this to 7.0.x.
> Wdyt?

No objections here. You might want to check the rest of the websocket
package for changes that haven't been back-ported as well.

Mark


> 
> Regards,
> Violeta
> 
> 
>> Modified:
>>     tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
>>     tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
>>
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
>>     tomcat/trunk/webapps/docs/changelog.xml
>>
>> Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java
> (original)
>> +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri
> Aug 22 11:02:19 2014
>> @@ -25,15 +25,17 @@ class MessagePart {
>>      private final int rsv;
>>      private final byte opCode;
>>      private final ByteBuffer payload;
>> -    private final SendHandler handler;
>> +    private final SendHandler intermediateHandler;
>> +    private volatile SendHandler endHandler;
>>
>>      public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer
> payload,
>> -            SendHandler handler) {
>> +            SendHandler intermediateHandler, SendHandler endHandler) {
>>          this.fin = fin;
>>          this.rsv = rsv;
>>          this.opCode = opCode;
>>          this.payload = payload;
>> -        this.handler = handler;
>> +        this.intermediateHandler = intermediateHandler;
>> +        this.endHandler = endHandler;
>>      }
>>
>>
>> @@ -57,8 +59,17 @@ class MessagePart {
>>      }
>>
>>
>> -    public SendHandler getHandler() {
>> -        return handler;
>> +    public SendHandler getIntermediateHandler() {
>> +        return intermediateHandler;
>> +    }
>> +
>> +
>> +    public SendHandler getEndHandler() {
>> +        return endHandler;
>> +    }
>> +
>> +    public void setEndHandler(SendHandler endHandler) {
>> +        this.endHandler = endHandler;
>>      }
>>  }
>>
>>
>> Modified:
> tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
> (original)
>> +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java
> Fri Aug 22 11:02:19 2014
>> @@ -21,10 +21,12 @@ import java.nio.ByteBuffer;
>>  import java.util.ArrayList;
>>  import java.util.List;
>>  import java.util.zip.DataFormatException;
>> +import java.util.zip.Deflater;
>>  import java.util.zip.Inflater;
>>
>>  import javax.websocket.Extension;
>>  import javax.websocket.Extension.Parameter;
>> +import javax.websocket.SendHandler;
>>
>>  import org.apache.tomcat.util.res.StringManager;
>>
>> @@ -47,10 +49,15 @@ public class PerMessageDeflate implement
>>      private final boolean clientContextTakeover;
>>      private final int clientMaxWindowBits;
>>      private final Inflater inflater = new Inflater(true);
>> -    private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
>> +    private final ByteBuffer readBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +    private final Deflater deflater = new
> Deflater(Deflater.DEFAULT_COMPRESSION, true);
>>
>>      private volatile Transformation next;
>>      private volatile boolean skipDecompression = false;
>> +    private volatile ByteBuffer writeBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +    private volatile boolean deflaterResetRequired = true;
>> +    private volatile boolean firstCompressedFrameWritten = false;
>> +    private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
>>
>>      static PerMessageDeflate negotiate(List<List<Parameter>>
> preferences) {
>>          // Accept the first preference that the server is able to support
>> @@ -288,25 +295,143 @@ public class PerMessageDeflate implement
>>
>>
>>      @Override
>> -    public List<MessagePart> sendMessagePart(List<MessagePart>
> messageParts) {
>> -        List<MessagePart> compressedParts = new
> ArrayList<>(messageParts.size());
>> +    public List<MessagePart> sendMessagePart(List<MessagePart>
> uncompressedParts) {
>> +        List<MessagePart> allCompressedParts = new ArrayList<>();
>>
>> -        for (MessagePart messagePart : messageParts) {
>> -            byte opCode = messagePart.getOpCode();
>> +        for (MessagePart uncompressedPart : uncompressedParts) {
>> +            byte opCode = uncompressedPart.getOpCode();
>>              if (Util.isControl(opCode)) {
>>                  // Control messages can appear in the middle of other
> messages
>>                  // and must not be compressed. Pass it straight through
>> -                compressedParts.add(messagePart);
>> +                allCompressedParts.add(uncompressedPart);
>>              } else {
>> -                // TODO: Implement compression of sent messages
>> -                compressedParts.add(messagePart);
>> +                List<MessagePart> compressedParts = new ArrayList<>();
>> +                ByteBuffer uncompressedPayload =
> uncompressedPart.getPayload();
>> +                SendHandler uncompressedIntermediateHandler =
>> +                        uncompressedPart.getIntermediateHandler();
>> +
>> +                // Need to reset the deflater at the start of every
> message
>> +                if (deflaterResetRequired) {
>> +                    deflater.reset();
>> +                    deflaterResetRequired = false;
>> +                    firstCompressedFrameWritten = false;
>> +                }
>> +
>> +                deflater.setInput(uncompressedPayload.array(),
>> +                        uncompressedPayload.arrayOffset() +
> uncompressedPayload.position(),
>> +                        uncompressedPayload.remaining());
>> +
>> +                int flush = (uncompressedPart.isFin() ?
> Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
>> +                boolean deflateRequired = true;
>> +
>> +                while(deflateRequired) {
>> +                    ByteBuffer compressedPayload = writeBuffer;
>> +
>> +                    int written =
> deflater.deflate(compressedPayload.array(),
>> +                            compressedPayload.arrayOffset() +
> compressedPayload.position(),
>> +                            compressedPayload.remaining(), flush);
>> +
>  compressedPayload.position(compressedPayload.position() + written);
>> +
>> +                    if (!uncompressedPart.isFin() &&
> compressedPayload.hasRemaining() && deflater.needsInput()) {
>> +                        // This message part has been fully processed by
> the
>> +                        // deflater. Fire the send handler for this
> message part
>> +                        // and move on to the next message part.
>> +                        break;
>> +                    }
>> +
>> +                    // If this point is reached, a new compressed
> message part
>> +                    // will be created...
>> +                    MessagePart compressedPart;
>> +
>> +                    // .. and a new writeBuffer will be required.
>> +                    writeBuffer =
> ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
>> +
>> +                    // Flip the compressed payload ready for writing
>> +                    compressedPayload.flip();
>> +
>> +                    boolean fin = uncompressedPart.isFin();
>> +                    boolean full = compressedPayload.limit() ==
> compressedPayload.capacity();
>> +                    boolean needsInput = deflater.needsInput();
>> +
>> +                    if (fin && !full && needsInput) {
>> +                        // End of compressed message. Drop EOM bytes and
> output.
>> +
>  compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length);
>> +                        compressedPart = new MessagePart(true,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                        deflaterResetRequired = true;
>> +                        deflateRequired = false;
>> +                    } else if (full && !needsInput) {
>> +                        // Write buffer full and input message not fully
> read.
>> +                        // Output and start new compressed part.
>> +                        compressedPart = new MessagePart(false,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                    } else if (!fin && full && needsInput) {
>> +                        // Write buffer full and input message not fully
> read.
>> +                        // Output and get more data.
>> +                        compressedPart = new MessagePart(false,
> getRsv(uncompressedPart),
>> +                                opCode, compressedPayload,
> uncompressedIntermediateHandler,
>> +                                uncompressedIntermediateHandler);
>> +                        deflateRequired = false;
>> +                    } else if (fin && full && needsInput) {
>> +                        // Write buffer full. Input fully read. Deflater
> may be
>> +                        // in one of four states:
>> +                        // - output complete (just happened to align
> with end of
>> +                        //   buffer)
>> +                        // - in middle of EOM bytes
>> +                        // - about to write EOM bytes
>> +                        // - more data to write
>> +                        int eomBufferWritten =
> deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH);
>> +                        if (eomBufferWritten < EOM_BUFFER.length) {
>> +                            // EOM has just been completed
>> +
>  compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length +
> eomBufferWritten);
>> +                            compressedPart = new MessagePart(true,
>> +                                    getRsv(uncompressedPart), opCode,
> compressedPayload,
>> +                                    uncompressedIntermediateHandler,
> uncompressedIntermediateHandler);
>> +                            deflaterResetRequired = true;
>> +                            deflateRequired = false;
>> +                        } else {
>> +                            // More data to write
>> +                            // Copy bytes to new write buffer
>> +                            writeBuffer.put(EOM_BUFFER, 0,
> eomBufferWritten);
>> +                            compressedPart = new MessagePart(false,
>> +                                    getRsv(uncompressedPart), opCode,
> compressedPayload,
>> +                                    uncompressedIntermediateHandler,
> uncompressedIntermediateHandler);
>> +                        }
>> +                    } else {
>> +                        throw new IllegalStateException("Should never
> happen");
>> +                    }
>> +
>> +                    // Add the newly created compressed part to the set
> of parts
>> +                    // to pass on to the next transformation.
>> +                    compressedParts.add(compressedPart);
>> +                }
>> +
>> +                SendHandler uncompressedEndHandler =
> uncompressedPart.getEndHandler();
>> +                int size = compressedParts.size();
>> +                if (size > 0) {
>> +                    compressedParts.get(size -
> 1).setEndHandler(uncompressedEndHandler);
>> +                }
>> +
>> +                allCompressedParts.addAll(compressedParts);
>>              }
>>          }
>>
>>          if (next == null) {
>> -            return compressedParts;
>> +            return allCompressedParts;
>>          } else {
>> -            return next.sendMessagePart(compressedParts);
>> +            return next.sendMessagePart(allCompressedParts);
>>          }
>>      }
>> +
>> +
>> +    private int getRsv(MessagePart uncompressedMessagePart) {
>> +        int result = uncompressedMessagePart.getRsv();
>> +        if (!firstCompressedFrameWritten) {
>> +            result += RSV_BITMASK;
>> +            firstCompressedFrameWritten = true;
>> +        }
>> +        return result;
>> +    }
>>  }
>>
>> Modified:
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> ---
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
> (original)
>> +++
> tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
> Fri Aug 22 11:02:19 2014
>> @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm
>>
>>      private final StateMachine stateMachine = new StateMachine();
>>
>> +    private final IntermediateMessageHandler intermediateMessageHandler =
>> +            new IntermediateMessageHandler(this);
>> +
>>      private Transformation transformation = null;
>>      private boolean messagePartInProgress = false;
>>      private final Queue<MessagePart> messagePartQueue = new
> ArrayDeque<>();
>> @@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm
>>
>>          List<MessagePart> messageParts = new ArrayList<>();
>>          messageParts.add(new MessagePart(last, 0, opCode, payload,
>> +                intermediateMessageHandler,
>>                  new EndMessageHandler(this, handler)));
>>
>>          messageParts = transformation.sendMessagePart(messageParts);
>>
>> +        // Some extensions/transformations may buffer messages so it is
> possible
>> +        // that no message parts will be returned. If this is the case
> then
>> +        // trigger the supplied SendHandler
>> +        if (messageParts.size() == 0) {
>> +            handler.onResult(new SendResult());
>> +            return;
>> +        }
>> +
>>          MessagePart mp = messageParts.remove(0);
>>
>>          boolean doWrite = false;
>> @@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm
>>
>>          wsSession.updateLastActive();
>>
>> -        handler.onResult(result);
>> +        // Some handlers, such as the IntermediateMessageHandler, do not
> have a
>> +        // nested handler so handler may be null.
>> +        if (handler != null) {
>> +            handler.onResult(result);
>> +        }
>>      }
>>
>>
>>      void writeMessagePart(MessagePart mp) {
>> -
>>          if (closed) {
>>              throw new IllegalStateException(
>>                      sm.getString("wsRemoteEndpoint.closed"));
>> @@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm
>>          if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
>>              nextFragmented = fragmented;
>>              nextText = text;
>> -            doWrite(mp.getHandler(), outputBuffer);
>> +            doWrite(mp.getEndHandler(), outputBuffer);
>>              return;
>>          }
>>
>> @@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm
>>          if (getBatchingAllowed() || isMasked()) {
>>              // Need to write via output buffer
>>              OutputBufferSendHandler obsh = new OutputBufferSendHandler(
>> -                    mp.getHandler(), headerBuffer, mp.getPayload(), mask,
>> +                    mp.getEndHandler(), headerBuffer, mp.getPayload(),
> mask,
>>                      outputBuffer, !getBatchingAllowed(), this);
>>              obsh.write();
>>          } else {
>>              // Can write directly
>> -            doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
>> +            doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
>>          }
>> -
>>      }
>>
>>
>> @@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm
>>      }
>>
>>
>> +    /**
>> +     * If a transformation needs to split a {@link MessagePart} into
> multiple
>> +     * {@link MessagePart}s, it uses this handler as the end handler for
> each of
>> +     * the additional {@link MessagePart}s. This handler notifies this
>> +     * class that the {@link MessagePart} has been processed and that
> the next
>> +     * {@link MessagePart} in the queue should be started. The final
>> +     * {@link MessagePart} will use the {@link EndMessageHandler}
> provided with
>> +     * the original {@link MessagePart}.
>> +     */
>> +    private static class IntermediateMessageHandler implements
> SendHandler {
>> +
>> +        private final WsRemoteEndpointImplBase endpoint;
>> +
>> +        public IntermediateMessageHandler(WsRemoteEndpointImplBase
> endpoint) {
>> +            this.endpoint = endpoint;
>> +        }
>> +
>> +
>> +        @Override
>> +        public void onResult(SendResult result) {
>> +            endpoint.endMessage(null, result);
>> +        }
>> +    }
>> +
>> +
>>      public void sendObject(Object obj) throws IOException {
>>          Future<Void> f = sendObjectByFuture(obj);
>>          try {
>>
>> Modified: tomcat/trunk/webapps/docs/changelog.xml
>> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff
>>
> ==============================================================================
>> --- tomcat/trunk/webapps/docs/changelog.xml (original)
>> +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014
>> @@ -54,6 +54,14 @@
>>        </fix>
>>      </changelog>
>>    </subsection>
>> +  <subsection name="WebSocket">
>> +    <changelog>
>> +      <add>
>> +        Extend support for the <code>permessage-deflate</code> extension
> to
>> +        compression of outgoing messages on the server side. (markt)
>> +      </add>
>> +    </changelog>
>> +  </subsection>
>>    <subsection name="Other">
>>      <changelog>
>>        <add>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
>> For additional commands, e-mail: dev-h...@tomcat.apache.org
>>
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to