This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 39f75945410ef5412e7f9a0804bb0b3ce5535640
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed May 1 12:31:07 2024 +0100

    Improve handling of concurrent disconnects
---
 webapps/docs/changelog.xml                         |  8 +++
 .../classes/websocket/chat/ChatAnnotation.java     | 71 ++++++++++++++++------
 2 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 18667ad683..87b5b4bf5d 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -185,6 +185,14 @@
       </add>
     </changelog>
   </subsection>
+  <subsection name="Web applications">
+    <changelog>
+      <fix>
+        Examples: Improve performance of WebSocket chat application when
+        multiple clients disconnect at the same time. (markt)
+      </fix>
+    </changelog>
+  </subsection>
   <subsection name="Other">
     <changelog>
       <update>
diff --git 
a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java 
b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java
index c0a2f2b515..337e08024d 100644
--- a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java
+++ b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java
@@ -17,6 +17,8 @@
 package websocket.chat;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,11 +42,16 @@ public class ChatAnnotation {
 
     private static final String GUEST_PREFIX = "Guest";
     private static final AtomicInteger connectionIds = new AtomicInteger(0);
-    private static final Set<ChatAnnotation> connections =
-            new CopyOnWriteArraySet<>();
+    private static final Set<ChatAnnotation> connections = new 
CopyOnWriteArraySet<>();
 
     private final String nickname;
     private Session session;
+    /*
+     * The queue of messages that may build up while another message is being 
sent. The thread that sends a message is
+     * responsible for clearing any queue that builds up while that message is 
being sent.
+     */
+    private Queue<String> messageBacklog = new ArrayDeque<>();
+    private boolean messageInProgress = false;
 
     public ChatAnnotation() {
         nickname = GUEST_PREFIX + connectionIds.getAndIncrement();
@@ -63,8 +70,7 @@ public class ChatAnnotation {
     @OnClose
     public void end() {
         connections.remove(this);
-        String message = String.format("* %s %s",
-                nickname, "has disconnected.");
+        String message = String.format("* %s %s", nickname, "has 
disconnected.");
         broadcast(message);
     }
 
@@ -72,37 +78,64 @@ public class ChatAnnotation {
     @OnMessage
     public void incoming(String message) {
         // Never trust the client
-        String filteredMessage = String.format("%s: %s",
-                nickname, HTMLFilter.filter(message.toString()));
+        String filteredMessage = String.format("%s: %s", nickname, 
HTMLFilter.filter(message.toString()));
         broadcast(filteredMessage);
     }
 
 
-
-
     @OnError
     public void onError(Throwable t) throws Throwable {
         log.error("Chat Error: " + t.toString(), t);
     }
 
 
+    /*
+     * synchronized blocks are limited to operations that are expected to be 
quick. More specifically, messages are not
+     * sent from within a synchronized block.
+     */
+    private void sendMessage(String msg) throws IOException {
+
+        synchronized (this) {
+            if (messageInProgress) {
+                messageBacklog.add(msg);
+                return;
+            } else {
+                messageInProgress = true;
+            }
+        }
+
+        boolean queueHasMessagesToBeSent = true;
+
+        String messageToSend = msg;
+        do {
+            session.getBasicRemote().sendText(messageToSend);
+            synchronized (this) {
+                messageToSend = messageBacklog.poll();
+                if (messageToSend == null) {
+                    messageInProgress = false;
+                    queueHasMessagesToBeSent = false;
+                }
+            }
+
+        } while (queueHasMessagesToBeSent);
+     }
+
+
     private static void broadcast(String msg) {
         for (ChatAnnotation client : connections) {
             try {
-                synchronized (client) {
-                    client.session.getBasicRemote().sendText(msg);
-                }
+                client.sendMessage(msg);
             } catch (IOException e) {
                 log.debug("Chat Error: Failed to send message to client", e);
-                connections.remove(client);
-                try {
-                    client.session.close();
-                } catch (IOException e1) {
-                    // Ignore
+                if (connections.remove(client)) {
+                    try {
+                        client.session.close();
+                    } catch (IOException e1) {
+                        // Ignore
+                    }
+                    String message = String.format("* %s %s", client.nickname, 
"has been disconnected.");
+                    broadcast(message);
                 }
-                String message = String.format("* %s %s",
-                        client.nickname, "has been disconnected.");
-                broadcast(message);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to