This is an automated email from the ASF dual-hosted git repository. reiern70 pushed a commit to branch feature/reiern70/WICKET-6969 in repository https://gitbox.apache.org/repos/asf/wicket.git
commit 021b63d8752da8d679a2f5f0ee0f89b74f8b03af Author: reiern70 <reier...@gmail.com> AuthorDate: Tue Apr 5 10:06:11 2022 -0600 {WICKET-6969} allow asynchronous pushing of messages. --- .../wicket/protocol/ws/WebSocketSettings.java | 31 ++++++++--- .../ws/api/AbstractWebSocketConnection.java | 10 +++- .../ws/api/AbstractWebSocketProcessor.java | 8 +-- .../protocol/ws/api/IWebSocketConnection.java | 21 ++++++-- .../protocol/ws/api/IWebSocketRequestHandler.java | 54 +++++++++++++++++++ .../protocol/ws/api/WebSocketPushBroadcaster.java | 9 +++- .../protocol/ws/api/WebSocketRequestHandler.java | 61 ++++++++++++++++++++++ .../wicket/protocol/ws/api/WebSocketResponse.java | 23 ++++++-- .../ws/util/tester/TestWebSocketConnection.java | 5 +- .../ws/util/tester/TestWebSocketProcessor.java | 6 +++ 10 files changed, 206 insertions(+), 22 deletions(-) diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java index df78a18bbf..e151b1f3d3 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java @@ -158,11 +158,18 @@ public class WebSocketSettings */ private Function<Integer, Boolean> notifyOnCloseEvent = (code) -> true; - public boolean shouldNotifyOnCloseEvent(int closeCode) { + /** + * Flag that allows to use asynchronous push. By default, it is set to false. 
+ */ + private boolean asynchronousPush = false; + + public boolean shouldNotifyOnCloseEvent(int closeCode) + { return notifyOnCloseEvent == null || notifyOnCloseEvent.apply(closeCode); } - public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent) { + public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent) + { this.notifyOnCloseEvent = notifyOnCloseEvent; } @@ -174,11 +181,13 @@ public class WebSocketSettings */ private Function<Throwable, Boolean> notifyOnErrorEvent = (throwable) -> true; - public boolean shouldNotifyOnErrorEvent(Throwable throwable) { + public boolean shouldNotifyOnErrorEvent(Throwable throwable) + { return notifyOnErrorEvent == null || notifyOnErrorEvent.apply(throwable); } - public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent) { + public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent) + { this.notifyOnErrorEvent = notifyOnErrorEvent; } @@ -303,9 +312,9 @@ public class WebSocketSettings * The active web socket connection * @return the response object that should be used to write the response back to the client */ - public WebResponse newWebSocketResponse(IWebSocketConnection connection) + public WebResponse newWebSocketResponse(IWebSocketConnection connection, boolean asynchronousPush) { - return new WebSocketResponse(connection); + return new WebSocketResponse(connection, asynchronousPush); } /** @@ -497,4 +506,14 @@ public class WebSocketSettings return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement()); } } + + public void setAsynchronousPush(boolean asynchronousPush) + { + this.asynchronousPush = asynchronousPush; + } + + public boolean isAsynchronousPush() + { + return asynchronousPush; + } } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java index 3cd0f627d4..c9b64029b7 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java @@ -21,6 +21,8 @@ import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.protocol.ws.api.registry.IKey; import org.apache.wicket.util.lang.Args; +import java.util.concurrent.Future; + /** * Abstract class handling the Web Socket broadcast messages. */ @@ -50,7 +52,13 @@ public abstract class AbstractWebSocketConnection implements IWebSocketConnectio @Override public void sendMessage(IWebSocketPushMessage message) { - webSocketProcessor.broadcastMessage(message, this); + webSocketProcessor.broadcastMessage(message, this, false); + } + + @Override + public void sendMessageAsync(IWebSocketPushMessage message) + { + webSocketProcessor.broadcastMessage(message, this, true); } @Override diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java index bf5a6a812d..4159465451 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java @@ -184,7 +184,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor } } - broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection); + broadcastMessage(new 
ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush()); } @Override @@ -210,7 +210,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor { IKey key = getRegistryKey(); IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key); - broadcastMessage(message, connection); + broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush()); } /** @@ -225,7 +225,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor * @param message * the message to broadcast */ - public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection) + public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush) { if (connection != null && (connection.isOpen() || isSpecialMessage(message))) { @@ -233,7 +233,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor Session oldSession = ThreadContext.getSession(); RequestCycle oldRequestCycle = ThreadContext.getRequestCycle(); - WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection); + WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush); try { WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper()); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java index 7275559bd5..762025602c 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java +++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java @@ -71,11 +71,11 @@ public interface IWebSocketConnection * * @param message * the text message - * @param timeOut + * @param timeout * the timeout for operation * @return a {@link java.util.concurrent.Future} representing the send operation */ - Future<Void> sendMessageAsync(String message, long timeOut); + Future<Void> sendMessageAsync(String message, long timeout); /** * Sends a binary message to the client. @@ -113,11 +113,11 @@ public interface IWebSocketConnection * the offset to read from * @param length * how much data to read - * @param timeOut - * * the timeout for operation + * @param timeout + * the timeout for operation * @return a {@link java.util.concurrent.Future} representing the send operation */ - Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut); + Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout); /** * Broadcasts a push message to the wicket page (and its components) associated with this @@ -130,6 +130,17 @@ public interface IWebSocketConnection */ void sendMessage(IWebSocketPushMessage message); + /** + * Broadcasts a push message to the wicket page (and its components) associated with this + * connection. The components can then send messages or component updates to the client by adding + * them to the target. Pushing to the client is done asynchronously. 
+ * + * @param message + * the push message to send + * + */ + void sendMessageAsync(IWebSocketPushMessage message); + /** * @return The application for which this WebSocket connection is registered */ diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java index 8d49155c28..eae4504d04 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java @@ -19,6 +19,8 @@ package org.apache.wicket.protocol.ws.api; import org.apache.wicket.core.request.handler.IPartialPageRequestHandler; import org.apache.wicket.request.ILoggableRequestHandler; +import java.util.concurrent.Future; + /** * An interface for outbound communication with web socket clients * @@ -34,6 +36,28 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL */ void push(CharSequence message); + /** + * Pushes a text message to the client in an asynchronous way. + * + * @param message + * the text message to push to the client if the web socket connection is open + * @return + * a {@link java.util.concurrent.Future} representing the send operation, or {@code null} if the connection is closed. + */ + Future<Void> pushAsync(CharSequence message); + + /** + * Pushes a text message to the client in an asynchronous way. + * + * @param message + * the text message to push to the client if the web socket connection is open + * @param timeout + * the timeout for operation + * @return + * a {@link java.util.concurrent.Future} representing the send operation, or {@code null} if the connection is closed. 
+ */ + Future<Void> pushAsync(CharSequence message, long timeout); + /** * Pushes a binary message to the client. * @@ -45,4 +69,34 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL * how many bytes to read from the message */ void push(byte[] message, int offset, int length); + + /** + * Pushes a binary message to the client in an asynchronous way. + * + * @param message + * the binary message to push to the client if the web socket connection is open + * @param offset + * the offset to start to read from the message + * @param length + * how many bytes to read from the message + * @return + * a {@link java.util.concurrent.Future} representing the send operation, or {@code null} if the connection is closed. + */ + Future<Void> pushAsync(byte[] message, int offset, int length); + + /** + * Pushes a binary message to the client in an asynchronous way. + * + * @param message + * the binary message to push to the client if the web socket connection is open + * @param offset + * the offset to start to read from the message + * @param length + * how many bytes to read from the message + * @param timeout + * the timeout for operation + * @return + * a {@link java.util.concurrent.Future} representing the send operation, or {@code null} if the connection is closed. 
+ */ + Future<Void> pushAsync(byte[] message, int offset, int length, long timeout); } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java index c5f6d78e02..23a71476f9 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java @@ -189,7 +189,14 @@ public class WebSocketPushBroadcaster @Override public void run() { - wsConnection.sendMessage(message); + if (webSocketSettings.isAsynchronousPush()) + { + wsConnection.sendMessageAsync(message); + } + else + { + wsConnection.sendMessage(message); + } } }); } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java index 7ce1f620b5..58d9f4d280 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java @@ -19,6 +19,7 @@ package org.apache.wicket.protocol.ws.api; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.Future; import org.apache.wicket.Component; import org.apache.wicket.Page; @@ -77,6 +78,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i } } + @Override + public Future<Void> pushAsync(CharSequence message, long timeout) + { + if 
(connection.isOpen()) + { + Args.notNull(message, "message"); + return connection.sendMessageAsync(message.toString(), timeout); + } + else + { + LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message); + } + return null; + } + + @Override + public Future<Void> pushAsync(CharSequence message) + { + if (connection.isOpen()) + { + Args.notNull(message, "message"); + return connection.sendMessageAsync(message.toString()); + } + else + { + LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message); + } + return null; + } + @Override public void push(byte[] message, int offset, int length) { @@ -97,6 +128,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i } } + @Override + public Future<Void> pushAsync(byte[] message, int offset, int length) + { + if (connection.isOpen()) + { + Args.notNull(message, "message"); + return connection.sendMessageAsync(message, offset, length); + } + else + { + LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message); + } + return null; + } + + @Override + public Future<Void> pushAsync(byte[] message, int offset, int length, long timeout) + { + if (connection.isOpen()) + { + Args.notNull(message, "message"); + return connection.sendMessageAsync(message, offset, length); + } + else + { + LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message); + } + return null; + } + /** * @return if <code>true</code> then EMPTY partial updates will se send. If <code>false</code> then EMPTY * partial updates will be skipped. 
A possible use case is: a page receives and a push event but no one is diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java index af1c938afc..9cb330355b 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java @@ -47,9 +47,12 @@ public class WebSocketResponse extends WebResponse private boolean isRedirect = false; - public WebSocketResponse(final IWebSocketConnection conn) + private final boolean asynchronous; + + public WebSocketResponse(final IWebSocketConnection conn, boolean asynchronous) { this.connection = conn; + this.asynchronous = asynchronous; } @Override @@ -87,13 +90,27 @@ public class WebSocketResponse extends WebResponse { if (text != null) { - connection.sendMessage(text.toString()); + if (asynchronous) + { + connection.sendMessageAsync(text.toString()); + } + else + { + connection.sendMessage(text.toString()); + } text = null; } else if (binary != null) { byte[] bytes = binary.toByteArray(); - connection.sendMessage(bytes, 0, bytes.length); + if (asynchronous) + { + connection.sendMessageAsync(bytes, 0, bytes.length); + } + else + { + connection.sendMessage(bytes, 0, bytes.length); + } binary.close(); binary = null; } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java index 28dce651a5..ed4bd3748e 100644 --- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java @@ -22,6 +22,7 @@ import java.util.concurrent.Future; import org.apache.wicket.Application; import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.api.IWebSocketConnection; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.protocol.ws.api.registry.IKey; /** @@ -92,14 +93,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection } @Override - public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut) + public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout) { checkOpenness(); onOutMessage(message, offset, length); return null; } - /** + /** * A callback method that is called when a text message should be send to the client * * @param message diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java index cc121e6b7a..013ecefc6c 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java @@ -158,6 +158,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor { TestWebSocketProcessor.this.broadcastMessage(message); } + + @Override + public void sendMessageAsync(IWebSocketPushMessage message) + { + 
TestWebSocketProcessor.this.broadcastMessage(message); + } }); }