Author: violetagg Date: Wed Sep 17 10:37:10 2014 New Revision: 1625506 URL: http://svn.apache.org/r1625506 Log: Merged revision 1619738 from tomcat/trunk: Extend support for the WebSocket permessage-deflate extension to compression of outgoing messages on the server side.
Modified: tomcat/tc7.0.x/trunk/ (props changed) tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Propchange: tomcat/tc7.0.x/trunk/ ------------------------------------------------------------------------------ Merged /tomcat/trunk:r1619738 Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java?rev=1625506&r1=1625505&r2=1625506&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/MessagePart.java Wed Sep 17 10:37:10 2014 @@ -25,15 +25,17 @@ class MessagePart { private final int rsv; private final byte opCode; private final ByteBuffer payload; - private final SendHandler handler; + private final SendHandler intermediateHandler; + private volatile SendHandler endHandler; public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload, - SendHandler handler) { + SendHandler intermediateHandler, SendHandler endHandler) { this.fin = fin; this.rsv = rsv; this.opCode = opCode; this.payload = payload; - this.handler = handler; + this.intermediateHandler = intermediateHandler; + this.endHandler = endHandler; } @@ -57,8 +59,17 @@ class MessagePart { } - public SendHandler getHandler() { - return handler; + public SendHandler getIntermediateHandler() { + return intermediateHandler; + } + + + public SendHandler getEndHandler() { + return endHandler; + } + + public void setEndHandler(SendHandler endHandler) { + this.endHandler = endHandler; } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java?rev=1625506&r1=1625505&r2=1625506&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/PerMessageDeflate.java Wed Sep 17 10:37:10 2014 @@ -21,10 +21,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.zip.DataFormatException; +import java.util.zip.Deflater; import java.util.zip.Inflater; import javax.websocket.Extension; import javax.websocket.Extension.Parameter; +import javax.websocket.SendHandler; import org.apache.tomcat.util.res.StringManager; @@ -47,10 +49,15 @@ public class PerMessageDeflate implement private final boolean clientContextTakeover; private final int clientMaxWindowBits; private final Inflater inflater = new Inflater(true); - private final ByteBuffer readBuffer = ByteBuffer.allocate(8192); + private final ByteBuffer readBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); + private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); private volatile Transformation next; private volatile boolean skipDecompression = false; + private volatile ByteBuffer writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); + private volatile boolean deflaterResetRequired = true; + private volatile boolean firstCompressedFrameWritten = false; + private volatile byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1]; static PerMessageDeflate negotiate(List<List<Parameter>> preferences) { // Accept the first preference that the server is able to support @@ -288,25 +295,143 @@ public class PerMessageDeflate implement @Override - public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) { - List<MessagePart> compressedParts = new ArrayList<MessagePart>(messageParts.size()); + 
public List<MessagePart> sendMessagePart(List<MessagePart> uncompressedParts) { + List<MessagePart> allCompressedParts = new ArrayList<MessagePart>(); - for (MessagePart messagePart : messageParts) { - byte opCode = messagePart.getOpCode(); + for (MessagePart uncompressedPart : uncompressedParts) { + byte opCode = uncompressedPart.getOpCode(); if (Util.isControl(opCode)) { // Control messages can appear in the middle of other messages // and must not be compressed. Pass it straight through - compressedParts.add(messagePart); + allCompressedParts.add(uncompressedPart); } else { - // TODO: Implement compression of sent messages - compressedParts.add(messagePart); + List<MessagePart> compressedParts = new ArrayList<MessagePart>(); + ByteBuffer uncompressedPayload = uncompressedPart.getPayload(); + SendHandler uncompressedIntermediateHandler = + uncompressedPart.getIntermediateHandler(); + + // Need to reset the deflater at the start of every message + if (deflaterResetRequired) { + deflater.reset(); + deflaterResetRequired = false; + firstCompressedFrameWritten = false; + } + + deflater.setInput(uncompressedPayload.array(), + uncompressedPayload.arrayOffset() + uncompressedPayload.position(), + uncompressedPayload.remaining()); + + int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH); + boolean deflateRequired = true; + + while(deflateRequired) { + ByteBuffer compressedPayload = writeBuffer; + + int written = deflater.deflate(compressedPayload.array(), + compressedPayload.arrayOffset() + compressedPayload.position(), + compressedPayload.remaining(), flush); + compressedPayload.position(compressedPayload.position() + written); + + if (!uncompressedPart.isFin() && compressedPayload.hasRemaining() && deflater.needsInput()) { + // This message part has been fully processed by the + // deflater. Fire the send handler for this message part + // and move on to the next message part. 
+ break; + } + + // If this point is reached, a new compressed message part + // will be created... + MessagePart compressedPart; + + // .. and a new writeBuffer will be required. + writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); + + // Flip the compressed payload ready for writing + compressedPayload.flip(); + + boolean fin = uncompressedPart.isFin(); + boolean full = compressedPayload.limit() == compressedPayload.capacity(); + boolean needsInput = deflater.needsInput(); + + if (fin && !full && needsInput) { + // End of compressed message. Drop EOM bytes and output. + compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length); + compressedPart = new MessagePart(true, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler); + deflaterResetRequired = true; + deflateRequired = false; + } else if (full && !needsInput) { + // Write buffer full and input message not fully read. + // Output and start new compressed part. + compressedPart = new MessagePart(false, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler); + } else if (!fin && full && needsInput) { + // Write buffer full and input message not fully read. + // Output and get more data. + compressedPart = new MessagePart(false, getRsv(uncompressedPart), + opCode, compressedPayload, uncompressedIntermediateHandler, + uncompressedIntermediateHandler); + deflateRequired = false; + } else if (fin && full && needsInput) { + // Write buffer full. Input fully read. 
Deflater may be + // in one of four states: + // - output complete (just happened to align with end of + // buffer) + // - in middle of EOM bytes + // - about to write EOM bytes + // - more data to write + int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0, EOM_BUFFER.length, Deflater.SYNC_FLUSH); + if (eomBufferWritten < EOM_BUFFER.length) { + // EOM has just been completed + compressedPayload.limit(compressedPayload.limit() - EOM_BYTES.length + eomBufferWritten); + compressedPart = new MessagePart(true, + getRsv(uncompressedPart), opCode, compressedPayload, + uncompressedIntermediateHandler, uncompressedIntermediateHandler); + deflaterResetRequired = true; + deflateRequired = false; + } else { + // More data to write + // Copy bytes to new write buffer + writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten); + compressedPart = new MessagePart(false, + getRsv(uncompressedPart), opCode, compressedPayload, + uncompressedIntermediateHandler, uncompressedIntermediateHandler); + } + } else { + throw new IllegalStateException("Should never happen"); + } + + // Add the newly created compressed part to the set of parts + // to pass on to the next transformation. 
+ compressedParts.add(compressedPart); + } + + SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler(); + int size = compressedParts.size(); + if (size > 0) { + compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler); + } + + allCompressedParts.addAll(compressedParts); } } if (next == null) { - return compressedParts; + return allCompressedParts; } else { - return next.sendMessagePart(compressedParts); + return next.sendMessagePart(allCompressedParts); } } + + + private int getRsv(MessagePart uncompressedMessagePart) { + int result = uncompressedMessagePart.getRsv(); + if (!firstCompressedFrameWritten) { + result += RSV_BITMASK; + firstCompressedFrameWritten = true; + } + return result; + } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1625506&r1=1625505&r2=1625506&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Wed Sep 17 10:37:10 2014 @@ -61,6 +61,9 @@ public abstract class WsRemoteEndpointIm private final StateMachine stateMachine = new StateMachine(); + private final IntermediateMessageHandler intermediateMessageHandler = + new IntermediateMessageHandler(this); + private Transformation transformation = null; private boolean messagePartInProgress = false; private final Queue<MessagePart> messagePartQueue = new ArrayDeque<MessagePart>(); @@ -261,10 +264,19 @@ public abstract class WsRemoteEndpointIm List<MessagePart> messageParts = new ArrayList<MessagePart>(); messageParts.add(new MessagePart(last, 0, opCode, payload, + intermediateMessageHandler, new EndMessageHandler(this, handler))); messageParts = 
transformation.sendMessagePart(messageParts); + // Some extensions/transformations may buffer messages so it is possible + // that no message parts will be returned. If this is the case then + // trigger the supplied SendHandler + if (messageParts.size() == 0) { + handler.onResult(new SendResult()); + return; + } + MessagePart mp = messageParts.remove(0); boolean doWrite = false; @@ -332,12 +344,15 @@ public abstract class WsRemoteEndpointIm wsSession.updateLastActive(); - handler.onResult(result); + // Some handlers, such as the IntermediateMessageHandler, do not have a + // nested handler so handler may be null. + if (handler != null) { + handler.onResult(result); + } } void writeMessagePart(MessagePart mp) { - if (closed) { throw new IllegalStateException( sm.getString("wsRemoteEndpoint.closed")); @@ -346,7 +361,7 @@ public abstract class WsRemoteEndpointIm if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { nextFragmented = fragmented; nextText = text; - doWrite(mp.getHandler(), outputBuffer); + doWrite(mp.getEndHandler(), outputBuffer); return; } @@ -400,14 +415,13 @@ public abstract class WsRemoteEndpointIm if (getBatchingAllowed() || isMasked()) { // Need to write via output buffer OutputBufferSendHandler obsh = new OutputBufferSendHandler( - mp.getHandler(), headerBuffer, mp.getPayload(), mask, + mp.getEndHandler(), headerBuffer, mp.getPayload(), mask, outputBuffer, !getBatchingAllowed(), this); obsh.write(); } else { // Can write directly - doWrite(mp.getHandler(), headerBuffer, mp.getPayload()); + doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload()); } - } @@ -449,6 +463,31 @@ public abstract class WsRemoteEndpointIm } + /** + * If a transformation needs to split a {@link MessagePart} into multiple + * {@link MessagePart}s, it uses this handler as the end handler for each of + * the additional {@link MessagePart}s. 
This handler notifies this + * class that the {@link MessagePart} has been processed and that the next + * {@link MessagePart} in the queue should be started. The final + * {@link MessagePart} will use the {@link EndMessageHandler} provided with + * the original {@link MessagePart}. + */ + private static class IntermediateMessageHandler implements SendHandler { + + private final WsRemoteEndpointImplBase endpoint; + + public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) { + this.endpoint = endpoint; + } + + + @Override + public void onResult(SendResult result) { + endpoint.endMessage(null, result); + } + } + + public void sendObject(Object obj) throws IOException { Future<Void> f = sendObjectByFuture(obj); try { Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1625506&r1=1625505&r2=1625506&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Sep 17 10:37:10 2014 @@ -251,6 +251,10 @@ It is expected that support will be extended to outgoing messages and to the client side shortly. (markt) </add> + <add> + Extend support for the <code>permessage-deflate</code> extension to + compression of outgoing messages on the server side. (markt) + </add> </changelog> </subsection> <subsection name="Web applications"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org