Author: markt
Date: Thu Nov 20 06:30:08 2014
New Revision: 1640688

URL: http://svn.apache.org/r1640688
Log:
Fix various problems identified with flushing batched messages:
- Flush triggered by disabling batching failed to flip buffer before writing 
and also failed to clear the buffer after writing was complete. This resulted 
in duplicated and/or corrupted messages.
- The flush triggered by session close was too late since no writes are 
permitted once the close process starts. This resulted in an exception being 
thrown.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu 
Nov 20 06:30:08 2014
@@ -70,7 +70,7 @@ wsRemoteEndpoint.closedOutputStream=This
 wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has 
been closed
 wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments 
bust be of the same type
 wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently 
even when using the asynchronous send messages. The client must wait for the 
previous message to complete before sending the next.
-wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing 
the session failed
+wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after 
session has been closed. Unable to flush remaining batched message.
 wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not 
be instantiated
 wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
 wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an 
invalid state for called method
@@ -88,6 +88,7 @@ wsSession.duplicateHandlerBinary=A binar
 wsSession.duplicateHandlerPong=A pong message handler has already been 
configured
 wsSession.duplicateHandlerText=A text message handler has already been 
configured
 wsSession.invalidHandlerTypePong=A pong message handler must implement 
MessageHandler.Basic
+wsSession.flushFailOnClose=Failed to flush batched messages on session close
 wsSession.messageFailed=Unable to write the complete message as the WebSocket 
connection has been closed
 wsSession.sendCloseFail=Failed to send close message to remote endpoint
 wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not 
registered with this session

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 
Thu Nov 20 06:30:08 2014
@@ -302,13 +302,10 @@ public abstract class WsRemoteEndpointIm
 
         boolean doWrite = false;
         synchronized (messagePartLock) {
-            if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
-                try {
-                    setBatchingAllowed(false);
-                } catch (IOException e) {
-                    log.warn(sm.getString(
-                            "wsRemoteEndpoint.flushOnCloseFailed"), e);
-                }
+            if (Constants.OPCODE_CLOSE == mp.getOpCode() && 
getBatchingAllowed()) {
+                // Should not happen. To late to send batched messages now 
since
+                // the session has been closed. Complain loudly.
+                log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
             }
             if (messagePartInProgress) {
                 // When a control message is sent while another message is 
being
@@ -382,7 +379,10 @@ public abstract class WsRemoteEndpointIm
         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
             nextFragmented = fragmented;
             nextText = text;
-            doWrite(mp.getEndHandler(), outputBuffer);
+            outputBuffer.flip();
+            SendHandler flushHandler = new OutputBufferFlushSendHandler(
+                    outputBuffer, mp.getEndHandler());
+            doWrite(flushHandler, outputBuffer);
             return;
         }
 
@@ -836,6 +836,30 @@ public abstract class WsRemoteEndpointIm
         }
     }
 
+
+    /**
+     * Ensures that tne output buffer is cleared after it has been flushed.
+     */
+    private static class OutputBufferFlushSendHandler implements SendHandler {
+
+        private final ByteBuffer outputBuffer;
+        private final SendHandler handler;
+
+        public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, 
SendHandler handler) {
+            this.outputBuffer = outputBuffer;
+            this.handler = handler;
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            if (result.isOK()) {
+                outputBuffer.clear();
+            }
+            handler.onResult(result);
+        }
+    }
+
+
     private class WsOutputStream extends OutputStream {
 
         private final WsRemoteEndpointImplBase endpoint;

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20 
06:30:08 2014
@@ -459,6 +459,13 @@ public class WsSession implements Sessio
                 return;
             }
 
+            try {
+                wsRemoteEndpoint.setBatchingAllowed(false);
+            } catch (IOException e) {
+                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                fireEndpointOnError(e);
+            }
+
             state = State.CLOSING;
 
             sendCloseMessage(closeReasonMessage);
@@ -487,6 +494,12 @@ public class WsSession implements Sessio
 
         synchronized (stateLock) {
             if (state == State.OPEN) {
+                try {
+                    wsRemoteEndpoint.setBatchingAllowed(false);
+                } catch (IOException e) {
+                    log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                    fireEndpointOnError(e);
+                }
                 sendCloseMessage(closeReason);
                 fireEndpointOnClose(closeReason);
                 state = State.CLOSED;
@@ -497,7 +510,6 @@ public class WsSession implements Sessio
         }
     }
 
-
     private void fireEndpointOnClose(CloseReason closeReason) {
 
         // Fire the onClose event
@@ -515,6 +527,21 @@ public class WsSession implements Sessio
     }
 
 
+
+    private void fireEndpointOnError(Throwable throwable) {
+
+        // Fire the onError event
+        Thread t = Thread.currentThread();
+        ClassLoader cl = t.getContextClassLoader();
+        t.setContextClassLoader(applicationClassLoader);
+        try {
+            localEndpoint.onError(this, throwable);
+        } finally {
+            t.setContextClassLoader(cl);
+        }
+    }
+
+
     private void sendCloseMessage(CloseReason closeReason) {
         // 125 is maximum size for the payload of a control message
         ByteBuffer msg = ByteBuffer.allocate(125);

Modified: 
tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java 
(original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu 
Nov 20 06:30:08 2014
@@ -119,10 +119,14 @@ public class TesterFirehoseServer {
 
             for (int i = 0; i < MESSAGE_COUNT; i++) {
                 remote.sendText(MESSAGE);
+                if (i % (MESSAGE_COUNT * 0.4) == 0) {
+                    remote.setBatchingAllowed(false);
+                    remote.setBatchingAllowed(true);
+                }
             }
 
-            // Ensure remaining messages are flushed
-            remote.setBatchingAllowed(false);
+            // Flushing should happen automatically on session close
+            session.close();
         }
 
         @OnError



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to