This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/9.0.x by this push:
     new 0046da8747 Fix BZ 66574 - refactor close to avoid possible deadlock
0046da8747 is described below

commit 0046da8747927806c351e0032a7caaabf91c042e
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Tue Apr 25 21:09:30 2023 +0100

    Fix BZ 66574 - refactor close to avoid possible deadlock
    
    https://bz.apache.org/bugzilla/show_bug.cgi?id=66574
---
 java/org/apache/tomcat/websocket/WsSession.java | 122 +++++++++++++-----------
 webapps/docs/changelog.xml                      |   5 +
 2 files changed, 69 insertions(+), 58 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsSession.java 
b/java/org/apache/tomcat/websocket/WsSession.java
index 890f4f9d8a..18d7704f39 100644
--- a/java/org/apache/tomcat/websocket/WsSession.java
+++ b/java/org/apache/tomcat/websocket/WsSession.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.naming.NamingException;
 import javax.websocket.ClientEndpointConfig;
@@ -105,7 +106,7 @@ public class WsSession implements Session {
     // Expected to handle message types of <ByteBuffer> only
     private volatile MessageHandler binaryMessageHandler = null;
     private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = 
null;
-    private volatile State state = State.OPEN;
+    private AtomicReference<State> state = new AtomicReference<>(State.OPEN);
     private final Map<String, Object> userProperties = new 
ConcurrentHashMap<>();
     private volatile int maxBinaryMessageBufferSize = 
Constants.DEFAULT_BUFFER_SIZE;
     private volatile int maxTextMessageBufferSize = 
Constants.DEFAULT_BUFFER_SIZE;
@@ -541,12 +542,12 @@ public class WsSession implements Session {
 
     @Override
     public boolean isOpen() {
-        return state == State.OPEN;
+        return state.get() == State.OPEN;
     }
 
 
     public boolean isClosed() {
-        return state == State.CLOSED;
+        return state.get() == State.CLOSED;
     }
 
 
@@ -646,46 +647,38 @@ public class WsSession implements Session {
      * @param closeSocket        Should the socket be closed immediately 
rather than waiting for the server to respond
      */
     public void doClose(CloseReason closeReasonMessage, CloseReason 
closeReasonLocal, boolean closeSocket) {
-        // Double-checked locking. OK because state is volatile
-        if (state != State.OPEN) {
+
+        if (!state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING)) {
+            // Close process has already been started. Don't start it again.
             return;
         }
 
-        wsRemoteEndpoint.getLock().lock();
-        try {
-            if (state != State.OPEN) {
-                return;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("wsSession.doClose", id));
-            }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("wsSession.doClose", id));
+        }
 
-            // This will trigger a flush of any batched messages.
-            try {
-                wsRemoteEndpoint.setBatchingAllowed(false);
-            } catch (IOException e) {
-                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
-                fireEndpointOnError(e);
-            }
+        // Flush any batched messages not yet sent.
+        try {
+            wsRemoteEndpoint.setBatchingAllowed(false);
+        } catch (IOException e) {
+            log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+            fireEndpointOnError(e);
+        }
 
+        // Send the close message to the remote endpoint.
+        sendCloseMessage(closeReasonMessage);
+        fireEndpointOnClose(closeReasonLocal);
+        if (!state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED) || 
closeSocket) {
             /*
-             * If the flush above fails the error handling could call this 
method recursively. Without this check, the
-             * close message and notifications could be sent multiple times.
+             * A close message was received in another thread or this is 
handling an error condition. Either way, no
+             * further close message is expected to be received. Mark the 
session as fully closed...
              */
-            if (state != State.OUTPUT_CLOSED) {
-                state = State.OUTPUT_CLOSED;
-
-                sendCloseMessage(closeReasonMessage);
-                if (closeSocket) {
-                    wsRemoteEndpoint.close();
-                }
-                fireEndpointOnClose(closeReasonLocal);
-            }
-        } finally {
-            wsRemoteEndpoint.getLock().unlock();
+            state.set(State.CLOSED);
+            // ... and close the network connection.
+            wsRemoteEndpoint.close();
         }
 
+        // Fail any uncompleted messages.
         IOException ioe = new 
IOException(sm.getString("wsSession.messageFailed"));
         SendResult sr = new SendResult(ioe);
         for (FutureToSendHandler f2sh : futures.keySet()) {
@@ -701,29 +694,40 @@ public class WsSession implements Session {
      * @param closeReason The reason contained within the received close 
message.
      */
     public void onClose(CloseReason closeReason) {
+        if (state.compareAndSet(State.OPEN, State.CLOSING)) {
+            // Standard close.
 
-        wsRemoteEndpoint.getLock().lock();
-        try {
-            if (state != State.CLOSED) {
-                try {
-                    wsRemoteEndpoint.setBatchingAllowed(false);
-                } catch (IOException e) {
-                    log.warn(sm.getString("wsSession.flushFailOnClose"), e);
-                    fireEndpointOnError(e);
-                }
-                if (state == State.OPEN) {
-                    state = State.OUTPUT_CLOSED;
-                    sendCloseMessage(closeReason);
-                    fireEndpointOnClose(closeReason);
-                }
-                state = State.CLOSED;
-
-                // Close the socket
-                wsRemoteEndpoint.close();
+            // Flush any batched messages not yet sent.
+            try {
+                wsRemoteEndpoint.setBatchingAllowed(false);
+            } catch (IOException e) {
+                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                fireEndpointOnError(e);
             }
-        } finally {
-            wsRemoteEndpoint.getLock().unlock();
+
+            // Send the close message response to the remote endpoint.
+            sendCloseMessage(closeReason);
+            fireEndpointOnClose(closeReason);
+
+            // Mark the session as fully closed.
+            state.set(State.CLOSED);
+
+            // Close the network connection.
+            wsRemoteEndpoint.close();
+        } else if (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) {
+            /*
+             * The local endpoint sent a close message at the same time as 
the remote endpoint. The local close is
+             * still being processed. Update the state so that the local close 
process will also close the network
+             * connection once it has finished sending a close message.
+             */
+        } else if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) {
+            /*
+             * The local endpoint sent the first close message. The remote 
endpoint has now responded with its own close
+             * message so mark the session as fully closed and close the 
network connection.
+             */
+            wsRemoteEndpoint.close();
         }
+        // CLOSING and CLOSED are NO-OPs
     }
 
 
@@ -871,13 +875,13 @@ public class WsSession implements Session {
         // Always register the future.
         futures.put(f2sh, f2sh);
 
-        if (state == State.OPEN) {
+        if (isOpen()) {
             // The session is open. The future has been registered with the 
open
             // session. Normal processing continues.
             return;
         }
 
-        // The session is closed. The future may or may not have been 
registered
+        // The session is closing / closed. The future may or may not have 
been registered
         // in time for it to be processed during session closure.
 
         if (f2sh.isDone()) {
@@ -887,7 +891,7 @@ public class WsSession implements Session {
             return;
         }
 
-        // The session is closed. The Future had not completed when last 
checked.
+        // The session is closing / closed. The Future had not completed when 
last checked.
         // There is a small timing window that means the Future may have been
         // completed since the last check. There is also the possibility that
         // the Future was not registered in time to be cleaned up during 
session
@@ -1047,7 +1051,7 @@ public class WsSession implements Session {
 
 
     private void checkState() {
-        if (state == State.CLOSED) {
+        if (isClosed()) {
             /*
              * As per RFC 6455, a WebSocket connection is considered to be 
closed once a peer has sent and received a
              * WebSocket close frame.
@@ -1058,7 +1062,9 @@ public class WsSession implements Session {
 
     private enum State {
         OPEN,
+        OUTPUT_CLOSING,
         OUTPUT_CLOSED,
+        CLOSING,
         CLOSED
     }
 
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index c0c2383e6c..c9f7fa6ada 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -123,6 +123,11 @@
   </subsection>
   <subsection name="WebSocket">
     <changelog>
+      <fix>
+        <bug>66574</bug>: Refactor WebSocket session close to remove the lock 
on
+        the <code>SocketWrapper</code> which was a potential cause of deadlocks
+        if the application code used simulated blocking. (markt)
+      </fix>
       <fix>
         <bug>66575</bug>: Avoid unchecked use of the backing array of a
         buffer provided by the user in the compression transformation. (remm)


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to