Author: markt Date: Thu Nov 20 06:30:08 2014 New Revision: 1640688 URL: http://svn.apache.org/r1640688 Log: Fix various problems identified with flushing batched messages: - Flush triggered by disabling batching failed to flip buffer before writing and also failed to clear the buffer after writing was complete. This resulted in duplicated and/or corrupted messages. - The flush triggered by session close was too late since no writes are permitted once the close process starts. This resulted in an exception being thrown.
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640688&r1=1640687&r2=1640688&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu Nov 20 06:30:08 2014 @@ -70,7 +70,7 @@ wsRemoteEndpoint.closedOutputStream=This wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has been closed wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the same type wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next. -wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing the session failed +wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after session has been closed. Unable to flush remaining batched message. wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}] wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an invalid state for called method @@ -88,6 +88,7 @@ wsSession.duplicateHandlerBinary=A binar wsSession.duplicateHandlerPong=A pong message handler has already been configured wsSession.duplicateHandlerText=A text message handler has already been configured wsSession.invalidHandlerTypePong=A pong message handler must implement MessageHandler.Basic +wsSession.flushFailOnClose=Failed to flush batched messages on session close wsSession.messageFailed=Unable to write the complete message as the WebSocket connection has been closed wsSession.sendCloseFail=Failed to send close message to remote endpoint wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not registered with this session Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640688&r1=1640687&r2=1640688&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Thu Nov 20 06:30:08 2014 @@ -302,13 +302,10 @@ public abstract class WsRemoteEndpointIm boolean doWrite = false; synchronized (messagePartLock) { - if (Constants.OPCODE_CLOSE == mp.getOpCode()) { - try { - setBatchingAllowed(false); - } catch (IOException e) { - log.warn(sm.getString( - "wsRemoteEndpoint.flushOnCloseFailed"), e); - } + if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) { + // Should not happen. To late to send batched messages now since + // the session has been closed. Complain loudly. + log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed")); } if (messagePartInProgress) { // When a control message is sent while another message is being @@ -382,7 +379,10 @@ public abstract class WsRemoteEndpointIm if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { nextFragmented = fragmented; nextText = text; - doWrite(mp.getEndHandler(), outputBuffer); + outputBuffer.flip(); + SendHandler flushHandler = new OutputBufferFlushSendHandler( + outputBuffer, mp.getEndHandler()); + doWrite(flushHandler, outputBuffer); return; } @@ -836,6 +836,30 @@ public abstract class WsRemoteEndpointIm } } + + /** + * Ensures that tne output buffer is cleared after it has been flushed. + */ + private static class OutputBufferFlushSendHandler implements SendHandler { + + private final ByteBuffer outputBuffer; + private final SendHandler handler; + + public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) { + this.outputBuffer = outputBuffer; + this.handler = handler; + } + + @Override + public void onResult(SendResult result) { + if (result.isOK()) { + outputBuffer.clear(); + } + handler.onResult(result); + } + } + + private class WsOutputStream extends OutputStream { private final WsRemoteEndpointImplBase endpoint; Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640688&r1=1640687&r2=1640688&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20 06:30:08 2014 @@ -459,6 +459,13 @@ public class WsSession implements Sessio return; } + try { + wsRemoteEndpoint.setBatchingAllowed(false); + } catch (IOException e) { + log.warn(sm.getString("wsSession.flushFailOnClose"), e); + fireEndpointOnError(e); + } + state = State.CLOSING; sendCloseMessage(closeReasonMessage); @@ -487,6 +494,12 @@ public class WsSession implements Sessio synchronized (stateLock) { if (state == State.OPEN) { + try { + wsRemoteEndpoint.setBatchingAllowed(false); + } catch (IOException e) { + log.warn(sm.getString("wsSession.flushFailOnClose"), e); + fireEndpointOnError(e); + } sendCloseMessage(closeReason); fireEndpointOnClose(closeReason); state = State.CLOSED; @@ -497,7 +510,6 @@ public class WsSession implements Sessio } } - private void fireEndpointOnClose(CloseReason closeReason) { // Fire the onClose event @@ -515,6 +527,21 @@ public class WsSession implements Sessio } + + private void fireEndpointOnError(Throwable throwable) { + + // Fire the onError event + Thread t = Thread.currentThread(); + ClassLoader cl = t.getContextClassLoader(); + t.setContextClassLoader(applicationClassLoader); + try { + localEndpoint.onError(this, throwable); + } finally { + t.setContextClassLoader(cl); + } + } + + private void sendCloseMessage(CloseReason closeReason) { // 125 is maximum size for the payload of a control message ByteBuffer msg = ByteBuffer.allocate(125); Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640688&r1=1640687&r2=1640688&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java (original) +++ tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu Nov 20 06:30:08 2014 @@ -119,10 +119,14 @@ public class TesterFirehoseServer { for (int i = 0; i < MESSAGE_COUNT; i++) { remote.sendText(MESSAGE); + if (i % (MESSAGE_COUNT * 0.4) == 0) { + remote.setBatchingAllowed(false); + remote.setBatchingAllowed(true); + } } - // Ensure remaining messages are flushed - remote.setBatchingAllowed(false); + // Flushing should happen automatically on session close + session.close(); } @OnError --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org