This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c609203d735db6a54cabc8e92d35005a9f334068
Author: Croway <[email protected]>
AuthorDate: Wed Feb 11 11:41:07 2026 +0100

    Use awaitility in camel-atmosphere and add a note about jdk25 streaming
---
 components/camel-atmosphere-websocket/pom.xml      |  6 ++++++
 .../main/docs/atmosphere-websocket-component.adoc  |  4 ++++
 .../atmosphere/websocket/WebsocketRoute1Test.java  | 18 +++++++++++-----
 .../atmosphere/websocket/WebsocketRoute2Test.java  | 18 +++++++++++-----
 .../WebsocketRoute2WithInitParamTest.java          | 24 ++++++++++++----------
 .../atmosphere/websocket/WebsocketRoute3Test.java  | 14 ++++++++-----
 .../WebsocketRoute3WithInitParamTest.java          | 22 ++++++++++++--------
 .../atmosphere/websocket/WebsocketRoute4Test.java  | 14 ++++++++-----
 8 files changed, 80 insertions(+), 40 deletions(-)

diff --git a/components/camel-atmosphere-websocket/pom.xml 
b/components/camel-atmosphere-websocket/pom.xml
index ee40caa8c6ca..2380679c85d4 100644
--- a/components/camel-atmosphere-websocket/pom.xml
+++ b/components/camel-atmosphere-websocket/pom.xml
@@ -71,6 +71,12 @@
             <version>${mockito-version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility-version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- test infra -->
         <dependency>
diff --git 
a/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc
 
b/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc
index 3de9702dc6da..cb008cfc3d00 100644
--- 
a/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc
+++ 
b/components/camel-atmosphere-websocket/src/main/docs/atmosphere-websocket-component.adoc
@@ -21,6 +21,10 @@ connections from external clients). This component uses
 the https://github.com/Atmosphere/atmosphere[Atmosphere] library to
 support the Websocket transport in various Servlet containers.
 
+NOTE: On JDK 25 or later, websocket messages are delivered in streaming mode 
(`Reader`/`InputStream`) even when the `useStreaming` option is not enabled.
+This is due to a change in the JDK's internal JSR 356 WebSocket implementation.
+The component handles both modes transparently, but if your route processors 
check the body type, they should account for this difference.
+
 Maven users will need to add the following dependency to
 their `pom.xml` for this component:
 
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java
index cc43d668152c..da0f7fd09a5e 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute1Test.java
@@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.io.StringReader;
+import java.lang.Runtime.Version;
 import java.util.List;
 
 import org.apache.camel.Exchange;
@@ -72,24 +74,30 @@ public class WebsocketRoute1Test extends 
WebsocketCamelRouterTestSupport {
                 // route for a single line
                 
from("atmosphere-websocket:///hola").to("log:info").process(new Processor() {
                     public void process(final Exchange exchange) {
-                        createResponse(exchange);
+                        // JDK 25+ delivers websocket messages in streaming 
mode (Reader/InputStream)
+                        // even without useStreaming=true, due to a change in 
the JSR 356 implementation
+                        boolean streaming = 
Runtime.version().compareTo(Version.parse("25")) >= 0;
+                        createResponse(exchange, streaming);
                     }
                 }).to("atmosphere-websocket:///hola");
             }
         };
     }
 
-    private static void createResponse(Exchange exchange) {
+    private static void createResponse(Exchange exchange, boolean streaming) {
         Object msg = exchange.getIn().getBody();
-        assertTrue(msg instanceof String || msg instanceof byte[] || msg 
instanceof Reader || msg instanceof InputStream,
-                "Expects String, byte[], Reader or InputStream");
+        if (streaming) {
+            assertTrue(msg instanceof Reader || msg instanceof InputStream, 
"Expects Reader or InputStream");
+        } else {
+            assertTrue(msg instanceof String || msg instanceof byte[], 
"Expects String or byte[]");
+        }
 
         if (msg instanceof String) {
             exchange.getIn().setBody(RESPONSE_GREETING + msg);
         } else if (msg instanceof byte[]) {
             exchange.getIn().setBody(createByteResponse((byte[]) msg));
         } else if (msg instanceof Reader) {
-            exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) 
msg));
+            exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + 
readAll((Reader) msg)));
         } else if (msg instanceof InputStream) {
             exchange.getIn().setBody(createByteResponse(readAll((InputStream) 
msg)));
         }
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java
index cf97b5d5cac4..1f27e08fecf1 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2Test.java
@@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.io.StringReader;
+import java.lang.Runtime.Version;
 import java.util.List;
 
 import org.apache.camel.Exchange;
@@ -72,24 +74,30 @@ public class WebsocketRoute2Test extends 
WebsocketCamelRouterTestSupport {
                 // route for a broadcast line
                 
from("atmosphere-websocket:///broadcast").to("log:info").process(new 
Processor() {
                     public void process(final Exchange exchange) {
-                        createResponse(exchange);
+                        // JDK 25+ delivers websocket messages in streaming 
mode (Reader/InputStream)
+                        // even without useStreaming=true, due to a change in 
the JSR 356 implementation
+                        boolean streaming = 
Runtime.version().compareTo(Version.parse("25")) >= 0;
+                        createResponse(exchange, streaming);
                     }
                 }).to("atmosphere-websocket:///broadcast?sendToAll=true");
             }
         };
     }
 
-    private static void createResponse(Exchange exchange) {
+    private static void createResponse(Exchange exchange, boolean streaming) {
         Object msg = exchange.getIn().getBody();
-        assertTrue(msg instanceof String || msg instanceof byte[] || msg 
instanceof Reader || msg instanceof InputStream,
-                "Expects String, byte[], Reader or InputStream");
+        if (streaming) {
+            assertTrue(msg instanceof Reader || msg instanceof InputStream, 
"Expects Reader or InputStream");
+        } else {
+            assertTrue(msg instanceof String || msg instanceof byte[], 
"Expects String or byte[]");
+        }
 
         if (msg instanceof String) {
             exchange.getIn().setBody(RESPONSE_GREETING + msg);
         } else if (msg instanceof byte[]) {
             exchange.getIn().setBody(createByteResponse((byte[]) msg));
         } else if (msg instanceof Reader) {
-            exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) 
msg));
+            exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + 
readAll((Reader) msg)));
         } else if (msg instanceof InputStream) {
             exchange.getIn().setBody(createByteResponse(readAll((InputStream) 
msg)));
         }
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java
index 030d1031fc8d..d0b134e2cf54 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute2WithInitParamTest.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.infra.common.http.WebsocketTestClient;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +40,6 @@ public class WebsocketRoute2WithInitParamTest extends 
WebsocketCamelRouterWithIn
 
     @Test
     void testWebsocketSingleClientBroadcastMultipleClients() throws Exception {
-        final int awaitTime = 2;
         connectionKeyUserMap.clear();
 
         WebsocketTestClient wsclient1 = new 
WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2);
@@ -46,31 +47,32 @@ public class WebsocketRoute2WithInitParamTest extends 
WebsocketCamelRouterWithIn
         WebsocketTestClient wsclient3 = new 
WebsocketTestClient("ws://localhost:" + PORT + "/broadcast", 2);
 
         wsclient1.connect();
-        wsclient1.await(awaitTime);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, 
connectionKeyUserMap.size()));
 
         wsclient2.connect();
-        wsclient2.await(awaitTime);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(2, 
connectionKeyUserMap.size()));
 
         wsclient3.connect();
-        wsclient3.await(awaitTime);
-
-        //all connections were registered in external store
-        assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(EXISTED_USERS.length, 
connectionKeyUserMap.size()));
 
         broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] 
};
 
         wsclient1.sendTextMessage("Gambas");
-        wsclient1.await(awaitTime);
 
-        List<String> received1 = wsclient1.getReceived(String.class);
-        assertEquals(1, received1.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, 
wsclient1.getReceived(String.class).size()));
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, 
wsclient2.getReceived(String.class).size()));
 
+        List<String> received1 = wsclient1.getReceived(String.class);
         for (String element : broadcastMessageTo) {
             assertTrue(received1.get(0).contains(element));
         }
 
         List<String> received2 = wsclient2.getReceived(String.class);
-        assertEquals(1, received2.size());
         for (String element : broadcastMessageTo) {
             assertTrue(received2.get(0).contains(element));
         }
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java
index b79e135ec090..2dc9664ce94d 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3Test.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.io.StringReader;
 import java.util.List;
 
 import org.apache.camel.Exchange;
@@ -72,24 +73,27 @@ public class WebsocketRoute3Test extends 
WebsocketCamelRouterTestSupport {
                 // route for a single stream line
                 
from("atmosphere-websocket:///hola3?useStreaming=true").to("log:info").process(new
 Processor() {
                     public void process(final Exchange exchange) {
-                        createResponse(exchange);
+                        createResponse(exchange, true);
                     }
                 }).to("atmosphere-websocket:///hola3");
             }
         };
     }
 
-    private static void createResponse(Exchange exchange) {
+    private static void createResponse(Exchange exchange, boolean streaming) {
         Object msg = exchange.getIn().getBody();
-        assertTrue(msg instanceof String || msg instanceof byte[] || msg 
instanceof Reader || msg instanceof InputStream,
-                "Expects String, byte[], Reader or InputStream");
+        if (streaming) {
+            assertTrue(msg instanceof Reader || msg instanceof InputStream, 
"Expects Reader or InputStream");
+        } else {
+            assertTrue(msg instanceof String || msg instanceof byte[], 
"Expects String or byte[]");
+        }
 
         if (msg instanceof String) {
             exchange.getIn().setBody(RESPONSE_GREETING + msg);
         } else if (msg instanceof byte[]) {
             exchange.getIn().setBody(createByteResponse((byte[]) msg));
         } else if (msg instanceof Reader) {
-            exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) 
msg));
+            exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + 
readAll((Reader) msg)));
         } else if (msg instanceof InputStream) {
             exchange.getIn().setBody(createByteResponse(readAll((InputStream) 
msg)));
         }
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java
index 311bc64f40e4..9aa2dcef7cdd 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute3WithInitParamTest.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.infra.common.http.WebsocketTestClient;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +40,6 @@ public class WebsocketRoute3WithInitParamTest extends 
WebsocketCamelRouterWithIn
 
     @Test
     void testWebsocketSingleClientBroadcastMultipleClientsGuaranteeDelivery() 
throws Exception {
-        final int awaitTime = 2;
         connectionKeyUserMap.clear();
 
         WebsocketTestClient wsclient1 = new 
WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2);
@@ -46,27 +47,30 @@ public class WebsocketRoute3WithInitParamTest extends 
WebsocketCamelRouterWithIn
         WebsocketTestClient wsclient3 = new 
WebsocketTestClient("ws://localhost:" + PORT + "/guarantee", 2);
 
         wsclient1.connect();
-        wsclient1.await(awaitTime);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, 
connectionKeyUserMap.size()));
 
         wsclient2.connect();
-        wsclient2.await(awaitTime);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(2, 
connectionKeyUserMap.size()));
 
         wsclient3.connect();
-        wsclient3.await(awaitTime);
-
         //all connections were registered in external store
-        assertEquals(EXISTED_USERS.length, connectionKeyUserMap.size());
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(EXISTED_USERS.length, 
connectionKeyUserMap.size()));
 
         wsclient2.close();
-        wsclient2.await(awaitTime);
+        // brief wait for the close event to be processed server-side
+        Thread.sleep(500);
 
         broadcastMessageTo = new String[] { EXISTED_USERS[0], EXISTED_USERS[1] 
};
 
         wsclient1.sendTextMessage("Gambas");
-        wsclient1.await(awaitTime);
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, 
wsclient1.getReceived(String.class).size()));
 
         List<String> received1 = wsclient1.getReceived(String.class);
-        assertEquals(1, received1.size());
 
         for (String element : broadcastMessageTo) {
             assertTrue(received1.get(0).contains(element));
diff --git 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java
 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java
index a5a549d88007..1331d18708ff 100644
--- 
a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java
+++ 
b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRoute4Test.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.io.StringReader;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -39,7 +40,7 @@ public class WebsocketRoute4Test extends 
WebsocketCamelRouterTestSupport {
     void testWebsocketEventsResendingDisabled() throws Exception {
         WebsocketTestClient wsclient = new 
WebsocketTestClient("ws://localhost:" + PORT + "/hola4");
         wsclient.connect();
-        assertFalse(wsclient.await(10));
+        assertFalse(wsclient.await(2));
         wsclient.close();
     }
 
@@ -58,17 +59,20 @@ public class WebsocketRoute4Test extends 
WebsocketCamelRouterTestSupport {
         };
     }
 
-    private static void createResponse(Exchange exchange) {
+    private static void createResponse(Exchange exchange, boolean streaming) {
         Object msg = exchange.getIn().getBody();
-        assertTrue(msg instanceof String || msg instanceof byte[] || msg 
instanceof Reader || msg instanceof InputStream,
-                "Expects String, byte[], Reader or InputStream");
+        if (streaming) {
+            assertTrue(msg instanceof Reader || msg instanceof InputStream, 
"Expects Reader or InputStream");
+        } else {
+            assertTrue(msg instanceof String || msg instanceof byte[], 
"Expects String or byte[]");
+        }
 
         if (msg instanceof String) {
             exchange.getIn().setBody(RESPONSE_GREETING + msg);
         } else if (msg instanceof byte[]) {
             exchange.getIn().setBody(createByteResponse((byte[]) msg));
         } else if (msg instanceof Reader) {
-            exchange.getIn().setBody(RESPONSE_GREETING + readAll((Reader) 
msg));
+            exchange.getIn().setBody(new StringReader(RESPONSE_GREETING + 
readAll((Reader) msg)));
         } else if (msg instanceof InputStream) {
             exchange.getIn().setBody(createByteResponse(readAll((InputStream) 
msg)));
         }

Reply via email to