On 12/09/2014 13:04, Violeta Georgieva wrote: > Hi, > > 2014-08-22 14:02 GMT+03:00 <ma...@apache.org>: >> >> Author: markt >> Date: Fri Aug 22 11:02:19 2014 >> New Revision: 1619738 >> >> URL: http://svn.apache.org/r1619738 >> Log: >> Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side. > > I would like to back-port this to 7.0.x. > What do you think?
No objections here. You might want to check the rest of the websocket package for changes that haven't been back-ported as well. Mark > > Regards, > Violeta > > >> Modified: >> tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java >> tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java >> > tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java >> tomcat/trunk/webapps/docs/changelog.xml >> >> Modified: tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java >> URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1619738&r1=1619737&r2=1619738&view=diff >> > ============================================================================== >> --- tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java > (original) >> +++ tomcat/trunk/java/org/apache/tomcat/websocket/MessagePart.java Fri > Aug 22 11:02:19 2014 >> @@ -25,15 +25,17 @@ class MessagePart { >> private final int rsv; >> private final byte opCode; >> private final ByteBuffer payload; >> - private final SendHandler handler; >> + private final SendHandler intermediateHandler; >> + private volatile SendHandler endHandler; >> >> public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer > payload, >> - SendHandler handler) { >> + SendHandler intermediateHandler, SendHandler endHandler) { >> this.fin = fin; >> this.rsv = rsv; >> this.opCode = opCode; >> this.payload = payload; >> - this.handler = handler; >> + this.intermediateHandler = intermediateHandler; >> + this.endHandler = endHandler; >> } >> >> >> @@ -57,8 +59,17 @@ class MessagePart { >> } >> >> >> - public SendHandler getHandler() { >> - return handler; >> + public SendHandler getIntermediateHandler() { >> + return intermediateHandler; >> + } >> + >> + >> + public SendHandler getEndHandler() { >> + return endHandler; >> + } >> + >> + public void setEndHandler(SendHandler endHandler) { >> + this.endHandler = endHandler; >> } >> } >> >> >> 
Modified: > tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java >> URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1619738&r1=1619737&r2=1619738&view=diff >> > ============================================================================== >> --- tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java > (original) >> +++ tomcat/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java > Fri Aug 22 11:02:19 2014 >> @@ -21,10 +21,12 @@ import java.nio.ByteBuffer; >> import java.util.ArrayList; >> import java.util.List; >> import java.util.zip.DataFormatException; >> +import java.util.zip.Deflater; >> import java.util.zip.Inflater; >> >> import javax.websocket.Extension; >> import javax.websocket.Extension.Parameter; >> +import javax.websocket.SendHandler; >> >> import org.apache.tomcat.util.res.StringManager; >> >> @@ -47,10 +49,15 @@ public class PerMessageDeflate implement >> private final boolean clientContextTakeover; >> private final int clientMaxWindowBits; >> private final Inflater inflater = new Inflater(true); >> - private final ByteBuffer readBuffer = ByteBuffer.allocate(8192); >> + private final ByteBuffer readBuffer = > ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); >> + private final Deflater deflater = new > Deflater(Deflater.DEFAULT_COMPRESSION, true); >> >> private volatile Transformation next; >> private volatile boolean skipDecompression = false; >> + private volatile ByteBuffer writeBuffer = > ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); >> + private volatile boolean deflaterResetRequired = true; >> + private volatile boolean firstCompressedFrameWritten = false; >> + private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1]; >> >> static PerMessageDeflate negotiate(List<List<Parameter>> > preferences) { >> // Accept the first preference that the server is able to support >> @@ -288,25 +295,143 @@ public class PerMessageDeflate 
implement >> >> >> @Override >> - public List<MessagePart> sendMessagePart(List<MessagePart> > messageParts) { >> - List<MessagePart> compressedParts = new > ArrayList<>(messageParts.size()); >> + public List<MessagePart> sendMessagePart(List<MessagePart> > uncompressedParts) { >> + List<MessagePart> allCompressedParts = new ArrayList<>(); >> >> - for (MessagePart messagePart : messageParts) { >> - byte opCode = messagePart.getOpCode(); >> + for (MessagePart uncompressedPart : uncompressedParts) { >> + byte opCode = uncompressedPart.getOpCode(); >> if (Util.isControl(opCode)) { >> // Control messages can appear in the middle of other > messages >> // and must not be compressed. Pass it straight through >> - compressedParts.add(messagePart); >> + allCompressedParts.add(uncompressedPart); >> } else { >> - // TODO: Implement compression of sent messages >> - compressedParts.add(messagePart); >> + List<MessagePart> compressedParts = new ArrayList<>(); >> + ByteBuffer uncompressedPayload = > uncompressedPart.getPayload(); >> + SendHandler uncompressedIntermediateHandler = >> + uncompressedPart.getIntermediateHandler(); >> + >> + // Need to reset the deflater at the start of every > message >> + if (deflaterResetRequired) { >> + deflater.reset(); >> + deflaterResetRequired = false; >> + firstCompressedFrameWritten = false; >> + } >> + >> + deflater.setInput(uncompressedPayload.array(), >> + uncompressedPayload.arrayOffset() + > uncompressedPayload.position(), >> + uncompressedPayload.remaining()); >> + >> + int flush = (uncompressedPart.isFin() ? 
> Deflater.SYNC_FLUSH : Deflater.NO_FLUSH); >> + boolean deflateRequired = true; >> + >> + while(deflateRequired) { >> + ByteBuffer compressedPayload = writeBuffer; >> + >> + int written = > deflater.deflate(compressedPayload.array(), >> + compressedPayload.arrayOffset() + > compressedPayload.position(), >> + compressedPayload.remaining(), flush); >> + > compressedPayload.position(compressedPayload.position() + written); >> + >> + if (!uncompressedPart.isFin() && > compressedPayload.hasRemaining() && deflater.needsInput()) { >> + // This message part has been fully processed by > the >> + // deflater. Fire the send handler for this > message part >> + // and move on to the next message part. >> + break; >> + } >> + >> + // If this point is reached, a new compressed > message part >> + // will be created... >> + MessagePart compressedPart; >> + >> + // .. and a new writeBuffer will be required. >> + writeBuffer = > ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); >> + >> + // Flip the compressed payload ready for writing >> + compressedPayload.flip(); >> + >> + boolean fin = uncompressedPart.isFin(); >> + boolean full = compressedPayload.limit() == > compressedPayload.capacity(); >> + boolean needsInput = deflater.needsInput(); >> + >> + if (fin && !full && needsInput) { >> + // End of compressed message. Drop EOM bytes and > output. >> + > compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length); >> + compressedPart = new MessagePart(true, > getRsv(uncompressedPart), >> + opCode, compressedPayload, > uncompressedIntermediateHandler, >> + uncompressedIntermediateHandler); >> + deflaterResetRequired = true; >> + deflateRequired = false; >> + } else if (full && !needsInput) { >> + // Write buffer full and input message not fully > read. >> + // Output and start new compressed part. 
>> + compressedPart = new MessagePart(false, > getRsv(uncompressedPart), >> + opCode, compressedPayload, > uncompressedIntermediateHandler, >> + uncompressedIntermediateHandler); >> + } else if (!fin && full && needsInput) { >> + // Write buffer full and input message not fully > read. >> + // Output and get more data. >> + compressedPart = new MessagePart(false, > getRsv(uncompressedPart), >> + opCode, compressedPayload, > uncompressedIntermediateHandler, >> + uncompressedIntermediateHandler); >> + deflateRequired = false; >> + } else if (fin && full && needsInput) { >> + // Write buffer full. Input fully read. Deflater > may be >> + // in one of four states: >> + // - output complete (just happened to align > with end of >> + // buffer >> + // - in middle of EOM bytes >> + // - about to write EOM bytes >> + // - more data to write >> + int eomBufferWritten = > deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH); >> + if (eomBufferWritten < EOM_BUFFER.length) { >> + // EOM has just been completed >> + > compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + > eomBufferWritten); >> + compressedPart = new MessagePart(true, >> + getRsv(uncompressedPart), opCode, > compressedPayload, >> + uncompressedIntermediateHandler, > uncompressedIntermediateHandler); >> + deflaterResetRequired = true; >> + deflateRequired = false; >> + } else { >> + // More data to write >> + // Copy bytes to new write buffer >> + writeBuffer.put(EOM_BUFFER, 0, > eomBufferWritten); >> + compressedPart = new MessagePart(false, >> + getRsv(uncompressedPart), opCode, > compressedPayload, >> + uncompressedIntermediateHandler, > uncompressedIntermediateHandler); >> + } >> + } else { >> + throw new IllegalStateException("Should never > happen"); >> + } >> + >> + // Add the newly created compressed part to the set > of parts >> + // to pass on to the next transformation. 
>> + compressedParts.add(compressedPart); >> + } >> + >> + SendHandler uncompressedEndHandler = > uncompressedPart.getEndHandler(); >> + int size = compressedParts.size(); >> + if (size > 0) { >> + compressedParts.get(size - > 1).setEndHandler(uncompressedEndHandler); >> + } >> + >> + allCompressedParts.addAll(compressedParts); >> } >> } >> >> if (next == null) { >> - return compressedParts; >> + return allCompressedParts; >> } else { >> - return next.sendMessagePart(compressedParts); >> + return next.sendMessagePart(allCompressedParts); >> } >> } >> + >> + >> + private int getRsv(MessagePart uncompressedMessagePart) { >> + int result = uncompressedMessagePart.getRsv(); >> + if (!firstCompressedFrameWritten) { >> + result += RSV_BITMASK; >> + firstCompressedFrameWritten = true; >> + } >> + return result; >> + } >> } >> >> Modified: > tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java >> URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1619738&r1=1619737&r2=1619738&view=diff >> > ============================================================================== >> --- > tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java > (original) >> +++ > tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java > Fri Aug 22 11:02:19 2014 >> @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm >> >> private final StateMachine stateMachine = new StateMachine(); >> >> + private final IntermediateMessageHandler intermediateMessageHandler = >> + new IntermediateMessageHandler(this); >> + >> private Transformation transformation = null; >> private boolean messagePartInProgress = false; >> private final Queue<MessagePart> messagePartQueue = new > ArrayDeque<>(); >> @@ -258,10 +261,19 @@ public abstract class WsRemoteEndpointIm >> >> List<MessagePart> messageParts = new ArrayList<>(); >> messageParts.add(new MessagePart(last, 0, opCode, payload, >> + 
intermediateMessageHandler, >> new EndMessageHandler(this, handler))); >> >> messageParts = transformation.sendMessagePart(messageParts); >> >> + // Some extensions/transformations may buffer messages so it is > possible >> + // that no message parts will be returned. If this is the case > the >> + // trigger the suppler SendHandler >> + if (messageParts.size() == 0) { >> + handler.onResult(new SendResult()); >> + return; >> + } >> + >> MessagePart mp = messageParts.remove(0); >> >> boolean doWrite = false; >> @@ -329,12 +341,15 @@ public abstract class WsRemoteEndpointIm >> >> wsSession.updateLastActive(); >> >> - handler.onResult(result); >> + // Some handlers, such as the IntermediateMessageHandler, do not > have a >> + // nested handler so handler may be null. >> + if (handler != null) { >> + handler.onResult(result); >> + } >> } >> >> >> void writeMessagePart(MessagePart mp) { >> - >> if (closed) { >> throw new IllegalStateException( >> sm.getString("wsRemoteEndpoint.closed")); >> @@ -343,7 +358,7 @@ public abstract class WsRemoteEndpointIm >> if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { >> nextFragmented = fragmented; >> nextText = text; >> - doWrite(mp.getHandler(), outputBuffer); >> + doWrite(mp.getEndHandler(), outputBuffer); >> return; >> } >> >> @@ -397,14 +412,13 @@ public abstract class WsRemoteEndpointIm >> if (getBatchingAllowed() || isMasked()) { >> // Need to write via output buffer >> OutputBufferSendHandler obsh = new OutputBufferSendHandler( >> - mp.getHandler(), headerBuffer, mp.getPayload(), mask, >> + mp.getEndHandler(), headerBuffer, mp.getPayload(), > mask, >> outputBuffer, !getBatchingAllowed(), this); >> obsh.write(); >> } else { >> // Can write directly >> - doWrite(mp.getHandler(), headerBuffer, mp.getPayload()); >> + doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload()); >> } >> - >> } >> >> >> @@ -446,6 +460,31 @@ public abstract class WsRemoteEndpointIm >> } >> >> >> + /** >> + * If a transformation needs to split a 
{@link MessagePart} into > multiple >> + * {@link MessagePart}s, it uses this handler as the end handler for > each of >> + * the additional {@link MessagePart}s. This handler notifies this > this >> + * class that the {@link MessagePart} has been processed and that > the next >> + * {@link MessagePart} in the queue should be started. The final >> + * {@link MessagePart} will use the {@link EndMessageHandler} > provided with >> + * the original {@link MessagePart}. >> + */ >> + private static class IntermediateMessageHandler implements > SendHandler { >> + >> + private final WsRemoteEndpointImplBase endpoint; >> + >> + public IntermediateMessageHandler(WsRemoteEndpointImplBase > endpoint) { >> + this.endpoint = endpoint; >> + } >> + >> + >> + @Override >> + public void onResult(SendResult result) { >> + endpoint.endMessage(null, result); >> + } >> + } >> + >> + >> public void sendObject(Object obj) throws IOException { >> Future<Void> f = sendObjectByFuture(obj); >> try { >> >> Modified: tomcat/trunk/webapps/docs/changelog.xml >> URL: > http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1619738&r1=1619737&r2=1619738&view=diff >> > ============================================================================== >> --- tomcat/trunk/webapps/docs/changelog.xml (original) >> +++ tomcat/trunk/webapps/docs/changelog.xml Fri Aug 22 11:02:19 2014 >> @@ -54,6 +54,14 @@ >> </fix> >> </changelog> >> </subsection> >> + <subsection name="WebSocket"> >> + <changelog> >> + <add> >> + Extend support for the <code>permessage-deflate</code> extension > to >> + compression of outgoing messages on the server side. 
(markt) >> + </add> >> + </changelog> >> + </subsection> >> <subsection name="Other"> >> <changelog> >> <add> >> >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org >> For additional commands, e-mail: dev-h...@tomcat.apache.org >> > --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org