(tomcat) 01/02: Improve handling of concurrent disconnects
This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git commit b779a10c6a45c1531690fe2f67e9e7722c35cade Author: Mark Thomas AuthorDate: Wed May 1 12:31:07 2024 +0100 Improve handling of concurrent disconnects --- webapps/docs/changelog.xml | 8 +++ .../classes/websocket/chat/ChatAnnotation.java | 71 -- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 5217123962..7eda587865 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -194,6 +194,14 @@ + + + +Examples: Improve performance of WebSocket chat application when +multiple clients disconnect at the same time. (markt) + + + diff --git a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java index d1d55234da..dee5c4e5ab 100644 --- a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java +++ b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java @@ -17,6 +17,8 @@ package websocket.chat; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; @@ -40,11 +42,16 @@ public class ChatAnnotation { private static final String GUEST_PREFIX = "Guest"; private static final AtomicInteger connectionIds = new AtomicInteger(0); -private static final Set connections = -new CopyOnWriteArraySet<>(); +private static final Set connections = new CopyOnWriteArraySet<>(); private final String nickname; private Session session; +/* + * The queue of messages that may build up while another message is being sent. The thread that sends a message is + * responsible for clearing any queue that builds up while that message is being sent. + */ +private Queue messageBacklog = new ArrayDeque<>(); +private boolean messageInProgress = false; public ChatAnnotation() { nickname = GUEST_PREFIX + connectionIds.getAndIncrement(); @@ -63,8 +70,7 @@ public class ChatAnnotation { @OnClose public void end() { connections.remove(this); -String message = String.format("* %s %s", -nickname, "has disconnected."); +String message = String.format("* %s %s", nickname, "has disconnected."); broadcast(message); } @@ -72,37 +78,64 @@ public class ChatAnnotation { @OnMessage public void incoming(String message) { // Never trust the client -String filteredMessage = String.format("%s: %s", -nickname, HTMLFilter.filter(message.toString())); +String filteredMessage = String.format("%s: %s", nickname, HTMLFilter.filter(message.toString())); broadcast(filteredMessage); } - - @OnError public void onError(Throwable t) throws Throwable { log.error("Chat Error: " + t.toString(), t); } +/* + * synchronized blocks are limited to operations that are expected to be quick. More specifically, messages are not + * sent from within a synchronized block. + */ +private void sendMessage(String msg) throws IOException { + +synchronized (this) { +if (messageInProgress) { +messageBacklog.add(msg); +return; +} else { +messageInProgress = true; +} +} + +boolean queueHasMessagesToBeSent = true; + +String messageToSend = msg; +do { +session.getBasicRemote().sendText(messageToSend); +synchronized (this) { +messageToSend = messageBacklog.poll(); +if (messageToSend == null) { +messageInProgress = false; +queueHasMessagesToBeSent = false; +} +} + +} while (queueHasMessagesToBeSent); + } + + private static void broadcast(String msg) { for (ChatAnnotation client : connections) { try { -synchronized (client) { -client.session.getBasicRemote().sendText(msg); -} +client.sendMessage(msg); } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); -connections.remove(client); -try { -client.session.close(); -} catch (IOException e1) { -// Ignore +if (connections.remove(client)) { +try { +client.session.close(); +
(tomcat) 01/02: Improve handling of concurrent disconnects
This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 10.1.x in repository https://gitbox.apache.org/repos/asf/tomcat.git commit 39f75945410ef5412e7f9a0804bb0b3ce5535640 Author: Mark Thomas AuthorDate: Wed May 1 12:31:07 2024 +0100 Improve handling of concurrent disconnects --- webapps/docs/changelog.xml | 8 +++ .../classes/websocket/chat/ChatAnnotation.java | 71 -- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 18667ad683..87b5b4bf5d 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -185,6 +185,14 @@ + + + +Examples: Improve performance of WebSocket chat application when +multiple clients disconnect at the same time. (markt) + + + diff --git a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java index c0a2f2b515..337e08024d 100644 --- a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java +++ b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java @@ -17,6 +17,8 @@ package websocket.chat; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; @@ -40,11 +42,16 @@ public class ChatAnnotation { private static final String GUEST_PREFIX = "Guest"; private static final AtomicInteger connectionIds = new AtomicInteger(0); -private static final Set connections = -new CopyOnWriteArraySet<>(); +private static final Set connections = new CopyOnWriteArraySet<>(); private final String nickname; private Session session; +/* + * The queue of messages that may build up while another message is being sent. The thread that sends a message is + * responsible for clearing any queue that builds up while that message is being sent. + */ +private Queue messageBacklog = new ArrayDeque<>(); +private boolean messageInProgress = false; public ChatAnnotation() { nickname = GUEST_PREFIX + connectionIds.getAndIncrement(); @@ -63,8 +70,7 @@ public class ChatAnnotation { @OnClose public void end() { connections.remove(this); -String message = String.format("* %s %s", -nickname, "has disconnected."); +String message = String.format("* %s %s", nickname, "has disconnected."); broadcast(message); } @@ -72,37 +78,64 @@ public class ChatAnnotation { @OnMessage public void incoming(String message) { // Never trust the client -String filteredMessage = String.format("%s: %s", -nickname, HTMLFilter.filter(message.toString())); +String filteredMessage = String.format("%s: %s", nickname, HTMLFilter.filter(message.toString())); broadcast(filteredMessage); } - - @OnError public void onError(Throwable t) throws Throwable { log.error("Chat Error: " + t.toString(), t); } +/* + * synchronized blocks are limited to operations that are expected to be quick. More specifically, messages are not + * sent from within a synchronized block. + */ +private void sendMessage(String msg) throws IOException { + +synchronized (this) { +if (messageInProgress) { +messageBacklog.add(msg); +return; +} else { +messageInProgress = true; +} +} + +boolean queueHasMessagesToBeSent = true; + +String messageToSend = msg; +do { +session.getBasicRemote().sendText(messageToSend); +synchronized (this) { +messageToSend = messageBacklog.poll(); +if (messageToSend == null) { +messageInProgress = false; +queueHasMessagesToBeSent = false; +} +} + +} while (queueHasMessagesToBeSent); + } + + private static void broadcast(String msg) { for (ChatAnnotation client : connections) { try { -synchronized (client) { -client.session.getBasicRemote().sendText(msg); -} +client.sendMessage(msg); } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); -connections.remove(client); -try { -client.session.close(); -} catch (IOException e1) { -// Ignore +if (connections.remove(client)) { +try { +client.session.close(); +
(tomcat) 01/02: Improve handling of concurrent disconnects
This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git commit 8f62c52a79c5c35ee8a22f9de43d09a2461e4a7d Author: Mark Thomas AuthorDate: Wed May 1 12:31:07 2024 +0100 Improve handling of concurrent disconnects --- webapps/docs/changelog.xml | 8 +++ .../classes/websocket/chat/ChatAnnotation.java | 71 -- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index aa190cbae4..0e929662fa 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -207,6 +207,14 @@ + + + +Examples: Improve performance of WebSocket chat application when +multiple clients disconnect at the same time. (markt) + + + diff --git a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java index c0a2f2b515..337e08024d 100644 --- a/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java +++ b/webapps/examples/WEB-INF/classes/websocket/chat/ChatAnnotation.java @@ -17,6 +17,8 @@ package websocket.chat; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; @@ -40,11 +42,16 @@ public class ChatAnnotation { private static final String GUEST_PREFIX = "Guest"; private static final AtomicInteger connectionIds = new AtomicInteger(0); -private static final Set connections = -new CopyOnWriteArraySet<>(); +private static final Set connections = new CopyOnWriteArraySet<>(); private final String nickname; private Session session; +/* + * The queue of messages that may build up while another message is being sent. The thread that sends a message is + * responsible for clearing any queue that builds up while that message is being sent. + */ +private Queue messageBacklog = new ArrayDeque<>(); +private boolean messageInProgress = false; public ChatAnnotation() { nickname = GUEST_PREFIX + connectionIds.getAndIncrement(); @@ -63,8 +70,7 @@ public class ChatAnnotation { @OnClose public void end() { connections.remove(this); -String message = String.format("* %s %s", -nickname, "has disconnected."); +String message = String.format("* %s %s", nickname, "has disconnected."); broadcast(message); } @@ -72,37 +78,64 @@ public class ChatAnnotation { @OnMessage public void incoming(String message) { // Never trust the client -String filteredMessage = String.format("%s: %s", -nickname, HTMLFilter.filter(message.toString())); +String filteredMessage = String.format("%s: %s", nickname, HTMLFilter.filter(message.toString())); broadcast(filteredMessage); } - - @OnError public void onError(Throwable t) throws Throwable { log.error("Chat Error: " + t.toString(), t); } +/* + * synchronized blocks are limited to operations that are expected to be quick. More specifically, messages are not + * sent from within a synchronized block. + */ +private void sendMessage(String msg) throws IOException { + +synchronized (this) { +if (messageInProgress) { +messageBacklog.add(msg); +return; +} else { +messageInProgress = true; +} +} + +boolean queueHasMessagesToBeSent = true; + +String messageToSend = msg; +do { +session.getBasicRemote().sendText(messageToSend); +synchronized (this) { +messageToSend = messageBacklog.poll(); +if (messageToSend == null) { +messageInProgress = false; +queueHasMessagesToBeSent = false; +} +} + +} while (queueHasMessagesToBeSent); + } + + private static void broadcast(String msg) { for (ChatAnnotation client : connections) { try { -synchronized (client) { -client.session.getBasicRemote().sendText(msg); -} +client.sendMessage(msg); } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); -connections.remove(client); -try { -client.session.close(); -} catch (IOException e1) { -// Ignore +if (connections.remove(client)) { +try { +client.session.close(); +