This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch feature/reiern70/WICKET-6969
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit 021b63d8752da8d679a2f5f0ee0f89b74f8b03af
Author: reiern70 <reier...@gmail.com>
AuthorDate: Tue Apr 5 10:06:11 2022 -0600

    {WICKET-6969} allow asynchronous pushing of messages.
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 31 ++++++++---
 .../ws/api/AbstractWebSocketConnection.java        | 10 +++-
 .../ws/api/AbstractWebSocketProcessor.java         |  8 +--
 .../protocol/ws/api/IWebSocketConnection.java      | 21 ++++++--
 .../protocol/ws/api/IWebSocketRequestHandler.java  | 54 +++++++++++++++++++
 .../protocol/ws/api/WebSocketPushBroadcaster.java  |  9 +++-
 .../protocol/ws/api/WebSocketRequestHandler.java   | 61 ++++++++++++++++++++++
 .../wicket/protocol/ws/api/WebSocketResponse.java  | 23 ++++++--
 .../ws/util/tester/TestWebSocketConnection.java    |  5 +-
 .../ws/util/tester/TestWebSocketProcessor.java     |  6 +++
 10 files changed, 206 insertions(+), 22 deletions(-)

diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index df78a18bbf..e151b1f3d3 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -158,11 +158,18 @@ public class WebSocketSettings
         */
        private Function<Integer, Boolean> notifyOnCloseEvent = (code) -> true;
 
-       public boolean shouldNotifyOnCloseEvent(int closeCode) {
+       /**
+        * Flag that enables asynchronous push. By default, it is set to 
false.
+        */
+       private boolean asynchronousPush = false;
+
+       public boolean shouldNotifyOnCloseEvent(int closeCode)
+       {
                return notifyOnCloseEvent == null || 
notifyOnCloseEvent.apply(closeCode);
        }
 
-       public void setNotifyOnCloseEvent(Function<Integer, Boolean> 
notifyOnCloseEvent) {
+       public void setNotifyOnCloseEvent(Function<Integer, Boolean> 
notifyOnCloseEvent)
+       {
                this.notifyOnCloseEvent = notifyOnCloseEvent;
        }
 
@@ -174,11 +181,13 @@ public class WebSocketSettings
         */
        private Function<Throwable, Boolean> notifyOnErrorEvent = (throwable) 
-> true;
 
-       public boolean shouldNotifyOnErrorEvent(Throwable throwable) {
+       public boolean shouldNotifyOnErrorEvent(Throwable throwable)
+       {
                return notifyOnErrorEvent == null || 
notifyOnErrorEvent.apply(throwable);
        }
 
-       public void setNotifyOnErrorEvent(Function<Throwable, Boolean> 
notifyOnErrorEvent) {
+       public void setNotifyOnErrorEvent(Function<Throwable, Boolean> 
notifyOnErrorEvent)
+       {
                this.notifyOnErrorEvent = notifyOnErrorEvent;
        }
 
@@ -303,9 +312,9 @@ public class WebSocketSettings
         *              The active web socket connection
         * @return the response object that should be used to write the 
response back to the client
         */
-       public WebResponse newWebSocketResponse(IWebSocketConnection connection)
+       public WebResponse newWebSocketResponse(IWebSocketConnection 
connection, boolean asynchronousPush)
        {
-               return new WebSocketResponse(connection);
+               return new WebSocketResponse(connection, asynchronousPush);
        }
 
        /**
@@ -497,4 +506,14 @@ public class WebSocketSettings
                        return new Thread(r, 
"Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
                }
        }
+
+       public void setAsynchronousPush(boolean asynchronousPush)
+       {
+               this.asynchronousPush = asynchronousPush;
+       }
+
+       public boolean isAsynchronousPush()
+       {
+               return asynchronousPush;
+       }
 }
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
index 3cd0f627d4..c9b64029b7 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
@@ -21,6 +21,8 @@ import 
org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 import org.apache.wicket.util.lang.Args;
 
+import java.util.concurrent.Future;
+
 /**
  * Abstract class handling the Web Socket broadcast messages.
  */
@@ -50,7 +52,13 @@ public abstract class AbstractWebSocketConnection implements 
IWebSocketConnectio
        @Override
        public void sendMessage(IWebSocketPushMessage message)
        {
-               webSocketProcessor.broadcastMessage(message, this);
+               webSocketProcessor.broadcastMessage(message, this, false);
+       }
+
+       @Override
+       public void sendMessageAsync(IWebSocketPushMessage message)
+       {
+               webSocketProcessor.broadcastMessage(message, this, true);
        }
 
        @Override
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index bf5a6a812d..4159465451 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -184,7 +184,7 @@ public abstract class AbstractWebSocketProcessor implements 
IWebSocketProcessor
                        }
                }
 
-               broadcastMessage(new ConnectedMessage(getApplication(), 
getSessionId(), key), connection);
+               broadcastMessage(new ConnectedMessage(getApplication(), 
getSessionId(), key), connection, webSocketSettings.isAsynchronousPush());
        }
 
        @Override
@@ -210,7 +210,7 @@ public abstract class AbstractWebSocketProcessor implements 
IWebSocketProcessor
        {
                IKey key = getRegistryKey();
                IWebSocketConnection connection = 
connectionRegistry.getConnection(application, sessionId, key);
-               broadcastMessage(message, connection);
+               broadcastMessage(message, connection, 
webSocketSettings.isAsynchronousPush());
        }
 
        /**
@@ -225,7 +225,7 @@ public abstract class AbstractWebSocketProcessor implements 
IWebSocketProcessor
         * @param message
         *      the message to broadcast
         */
-       public final void broadcastMessage(final IWebSocketMessage message, 
IWebSocketConnection connection)
+       public final void broadcastMessage(final IWebSocketMessage message, 
IWebSocketConnection connection, boolean asynchronousPush)
        {
                if (connection != null && (connection.isOpen() || 
isSpecialMessage(message)))
                {
@@ -233,7 +233,7 @@ public abstract class AbstractWebSocketProcessor implements 
IWebSocketProcessor
                        Session oldSession = ThreadContext.getSession();
                        RequestCycle oldRequestCycle = 
ThreadContext.getRequestCycle();
 
-                       WebResponse webResponse = 
webSocketSettings.newWebSocketResponse(connection);
+                       WebResponse webResponse = 
webSocketSettings.newWebSocketResponse(connection, asynchronousPush);
                        try
                        {
                                WebSocketRequestMapper requestMapper = new 
WebSocketRequestMapper(application.getRootRequestMapper());
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 7275559bd5..762025602c 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -71,11 +71,11 @@ public interface IWebSocketConnection
      *
      * @param message
      *      the text message
-     * @param timeOut
+     * @param timeout
      *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send 
operation
      */
-    Future<Void> sendMessageAsync(String message, long timeOut);
+    Future<Void> sendMessageAsync(String message, long timeout);
 
        /**
         * Sends a binary message to the client.
@@ -113,11 +113,11 @@ public interface IWebSocketConnection
      *      the offset to read from
      * @param length
      *      how much data to read
-     * @param timeOut
-     *      *      the timeout for operation
+     * @param timeout
+     *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send 
operation
      */
-    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long 
timeOut);
+    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long 
timeout);
 
        /**
         * Broadcasts a push message to the wicket page (and it's components) 
associated with this
@@ -130,6 +130,17 @@ public interface IWebSocketConnection
         */
        void sendMessage(IWebSocketPushMessage message);
 
+       /**
+        * Broadcasts a push message to the wicket page (and its components) 
associated with this
+        * connection. The components can then send messages or component 
updates to the client by adding
+        * them to the target. Pushing to the client is done asynchronously.
+        *
+        * @param message
+        *     the push message to send
+        *
+        */
+       void sendMessageAsync(IWebSocketPushMessage message);
+
        /**
         * @return The application for which this WebSocket connection is 
registered
         */
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
index 8d49155c28..eae4504d04 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
@@ -19,6 +19,8 @@ package org.apache.wicket.protocol.ws.api;
 import org.apache.wicket.core.request.handler.IPartialPageRequestHandler;
 import org.apache.wicket.request.ILoggableRequestHandler;
 
+import java.util.concurrent.Future;
+
 /**
  * An interface for outbound communication with web socket clients
  *
@@ -34,6 +36,28 @@ public interface IWebSocketRequestHandler extends 
IPartialPageRequestHandler, IL
         */
        void push(CharSequence message);
 
+       /**
+        * Pushes a text message to the client in an asynchronous way.
+        *
+        * @param message
+        *      the text message to push to the client if the web socket 
connection is open
+        * @return
+        *      a {@link java.util.concurrent.Future} representing the send 
operation. Or null if connection is closed.
+        */
+       Future<Void> pushAsync(CharSequence message);
+
+       /**
+        * Pushes a text message to the client in an asynchronous way.
+        *
+        * @param message
+        *      the text message to push to the client if the web socket 
connection is open
+        * @param timeout
+        *      the timeout for operation
+        * @return
+        *              a {@link java.util.concurrent.Future} representing the 
send operation. Or null if connection is closed.
+        */
+       Future<Void> pushAsync(CharSequence message, long timeout);
+
        /**
         * Pushes a binary message to the client.
         *
@@ -45,4 +69,34 @@ public interface IWebSocketRequestHandler extends 
IPartialPageRequestHandler, IL
         *      how many bytes to read from the message
         */
        void push(byte[] message, int offset, int length);
+
+       /**
+        * Pushes a binary message to the client in an asynchronous way.
+        *
+        * @param message
+        *      the binary message to push to the client if the web socket 
connection is open
+        * @param offset
+        *      the offset to start to read from the message
+        * @param length
+        *      how many bytes to read from the message
+        * @return
+        *              a {@link java.util.concurrent.Future} representing the 
send operation. Or null if connection is closed.
+        */
+       Future<Void> pushAsync(byte[] message, int offset, int length);
+
+       /**
+        * Pushes a binary message to the client in an asynchronous way.
+        *
+        * @param message
+        *      the binary message to push to the client if the web socket 
connection is open
+        * @param offset
+        *      the offset to start to read from the message
+        * @param length
+        *      how many bytes to read from the message
+        * @param timeout
+        *      the timeout for operation
+        * @return
+        *              a {@link java.util.concurrent.Future} representing the 
send operation. Or null if connection is closed.
+        */
+       Future<Void> pushAsync(byte[] message, int offset, int length, long 
timeout);
 }
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
index c5f6d78e02..23a71476f9 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
@@ -189,7 +189,14 @@ public class WebSocketPushBroadcaster
                                @Override
                                public void run()
                                {
-                                       wsConnection.sendMessage(message);
+                                       if 
(webSocketSettings.isAsynchronousPush())
+                                       {
+                                               
wsConnection.sendMessageAsync(message);
+                                       }
+                                       else
+                                       {
+                                               
wsConnection.sendMessage(message);
+                                       }
                                }
                        });
                }
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
index 7ce1f620b5..58d9f4d280 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.wicket.protocol.ws.api;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Future;
 
 import org.apache.wicket.Component;
 import org.apache.wicket.Page;
@@ -77,6 +78,36 @@ public class WebSocketRequestHandler extends 
AbstractPartialPageRequestHandler i
                }
        }
 
+       @Override
+       public Future<Void> pushAsync(CharSequence message, long timeout)
+       {
+               if (connection.isOpen())
+               {
+                       Args.notNull(message, "message");
+                       return connection.sendMessageAsync(message.toString(), 
timeout);
+               }
+               else
+               {
+                       LOG.warn("The websocket connection is already closed. 
Cannot push the text message '{}'", message);
+               }
+               return null;
+       }
+
+       @Override
+       public Future<Void> pushAsync(CharSequence message)
+       {
+               if (connection.isOpen())
+               {
+                       Args.notNull(message, "message");
+                       return connection.sendMessageAsync(message.toString());
+               }
+               else
+               {
+                       LOG.warn("The websocket connection is already closed. 
Cannot push the text message '{}'", message);
+               }
+               return null;
+       }
+
        @Override
        public void push(byte[] message, int offset, int length)
        {
@@ -97,6 +128,36 @@ public class WebSocketRequestHandler extends 
AbstractPartialPageRequestHandler i
                }
        }
 
+       @Override
+       public Future<Void> pushAsync(byte[] message, int offset, int length)
+       {
+               if (connection.isOpen())
+               {
+                       Args.notNull(message, "message");
+                       return connection.sendMessageAsync(message, offset, 
length);
+               }
+               else
+               {
+                       LOG.warn("The websocket connection is already closed. 
Cannot push the binary message '{}'", message);
+               }
+               return null;
+       }
+
+       @Override
+       public Future<Void> pushAsync(byte[] message, int offset, int length, 
long timeout)
+       {
+               if (connection.isOpen())
+               {
+                       Args.notNull(message, "message");
+                       return connection.sendMessageAsync(message, offset, 
length, timeout); // forward the caller-supplied timeout instead of dropping it
+               }
+               else
+               {
+                       LOG.warn("The websocket connection is already closed. 
Cannot push the binary message '{}'", message);
+               }
+               return null;
+       }
+
        /**
         * @return if <code>true</code> then EMPTY partial updates will se 
send. If <code>false</code> then EMPTY
         *    partial updates will be skipped. A possible use case is: a page 
receives and a push event but no one is
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
index af1c938afc..9cb330355b 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
@@ -47,9 +47,12 @@ public class WebSocketResponse extends WebResponse
 
        private boolean isRedirect = false;
 
-       public WebSocketResponse(final IWebSocketConnection conn)
+       private final boolean asynchronous;
+
+       public WebSocketResponse(final IWebSocketConnection conn, boolean 
asynchronous)
        {
                this.connection = conn;
+               this.asynchronous = asynchronous;
        }
 
        @Override
@@ -87,13 +90,27 @@ public class WebSocketResponse extends WebResponse
                        {
                                if (text != null)
                                {
-                                       connection.sendMessage(text.toString());
+                                       if (asynchronous)
+                                       {
+                                               
connection.sendMessageAsync(text.toString());
+                                       }
+                                       else
+                                       {
+                                               
connection.sendMessage(text.toString());
+                                       }
                                        text = null;
                                }
                                else if (binary != null)
                                {
                                        byte[] bytes = binary.toByteArray();
-                                       connection.sendMessage(bytes, 0, 
bytes.length);
+                                       if (asynchronous)
+                                       {
+                                               
connection.sendMessageAsync(bytes, 0, bytes.length);
+                                       }
+                                       else
+                                       {
+                                               connection.sendMessage(bytes, 
0, bytes.length);
+                                       }
                                        binary.close();
                                        binary = null;
                                }
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
index 28dce651a5..ed4bd3748e 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
@@ -92,14 +93,14 @@ abstract class TestWebSocketConnection implements 
IWebSocketConnection
     }
 
     @Override
-    public Future<Void> sendMessageAsync(byte[] message, int offset, int 
length, long timeOut)
+    public Future<Void> sendMessageAsync(byte[] message, int offset, int 
length, long timeout)
     {
         checkOpenness();
         onOutMessage(message, offset, length);
         return null;
     }
 
-    /**
+       /**
         * A callback method that is called when a text message should be send 
to the client
         *
         * @param message
diff --git 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
index cc121e6b7a..013ecefc6c 100644
--- 
a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
+++ 
b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
@@ -158,6 +158,12 @@ abstract class TestWebSocketProcessor extends 
AbstractWebSocketProcessor
                        {
                                
TestWebSocketProcessor.this.broadcastMessage(message);
                        }
+
+                       @Override
+                       public void sendMessageAsync(IWebSocketPushMessage 
message)
+                       {
+                               
TestWebSocketProcessor.this.broadcastMessage(message);
+                       }
                });
        }
 

Reply via email to