This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 8e539c3056a CAMEL-19692: Improve producer sendToAll handling of parameterized server paths 8e539c3056a is described below commit 8e539c3056a1cd5105e5bd12efb2f0b3ffdbc16f Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Tue Aug 1 10:36:01 2023 +0100 CAMEL-19692: Improve producer sendToAll handling of parameterized server paths --- .../vertx/websocket/VertxWebsocketEndpoint.java | 22 +- .../vertx/websocket/VertxWebsocketHelper.java | 20 +- .../vertx/websocket/VertxWebsocketHost.java | 40 +-- .../vertx/websocket/VertxWebsocketPeer.java | 70 ++++++ .../vertx/websocket/VertxWebsocketHelperTest.java | 21 -- .../vertx/websocket/VertxWebsocketTest.java | 279 ++++++++++++++++++++- 6 files changed, 392 insertions(+), 60 deletions(-) diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java index 3a345624138..e160b4204b4 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java @@ -202,10 +202,10 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint { protected ServerWebSocket findPeerForConnectionKey(String connectionKey) { Map<VertxWebsocketHostKey, VertxWebsocketHost> registry = getVertxHostRegistry(); for (VertxWebsocketHost host : registry.values()) { - Map<String, ServerWebSocket> hostPeers = host.getConnectedPeers(); - if (hostPeers.containsKey(connectionKey) && host.isManagedHost(getConfiguration().getWebsocketURI().getHost()) + VertxWebsocketPeer peer = host.getConnectedPeer(connectionKey); + if (peer != null && host.isManagedHost(getConfiguration().getWebsocketURI().getHost()) && host.isManagedPort(getConfiguration().getWebsocketURI().getPort())) { - return hostPeers.get(connectionKey); + return peer.getWebSocket(); } } return null; @@ -220,9 +220,17 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint { .stream() .filter(host -> host.isManagedHost(getConfiguration().getWebsocketURI().getHost())) .filter(host -> host.isManagedPort(getConfiguration().getWebsocketURI().getPort())) - .flatMap(host -> host.getConnectedPeers().entrySet().stream()) - .filter(entry -> VertxWebsocketHelper.webSocketHostPathMatches(entry.getValue().path(), - getConfiguration().getWebsocketURI().getPath())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .flatMap(host -> host.getConnectedPeers().stream()) + .filter(connectedPeer -> { + String producerPath = getConfiguration().getWebsocketURI().getPath(); + String peerConnectedPath; + if (producerPath.contains("{") || producerPath.contains("*")) { + peerConnectedPath = connectedPeer.getRawPath(); + } else { + peerConnectedPath = connectedPeer.getPath(); + } + return VertxWebsocketHelper.webSocketHostPathMatches(peerConnectedPath, producerPath); + }) + .collect(Collectors.toMap(VertxWebsocketPeer::getConnectionKey, VertxWebsocketPeer::getWebSocket)); } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java index 27f61c5b483..3e580902b83 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java @@ -84,24 +84,10 @@ public final class VertxWebsocketHelper { return false; } - if (normalizedHostPath.contains("{")) { - // For a parameterized paths verify the non-parameterized elements match - for (int i = 0; i < hostPathElements.length; i++) { - String hostPathElement = hostPathElements[i]; - String targetPathElement = targetPathElements[i]; - if (!hostPathElement.startsWith("{") && !hostPathElement.endsWith("}") - && !hostPathElement.equals(targetPathElement)) { - return false; - } - } + if (exactPathMatch) { + return normalizedHostPath.equals(normalizedTargetPath); } else { - if (exactPathMatch) { - return normalizedHostPath.equals(normalizedTargetPath); - } else { - return normalizedTargetPath.startsWith(normalizedHostPath); - } + return normalizedTargetPath.startsWith(normalizedHostPath); } - - return true; } } diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java index 3386bc7d1b5..c473e3325c8 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java @@ -17,11 +17,12 @@ package org.apache.camel.component.vertx.websocket; import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; @@ -52,7 +53,7 @@ public class VertxWebsocketHost { private final VertxWebsocketHostConfiguration hostConfiguration; private final VertxWebsocketHostKey hostKey; private final Map<String, Route> routeRegistry = new HashMap<>(); - private final Map<String, ServerWebSocket> connectedPeers = new ConcurrentHashMap<>(); + private final List<VertxWebsocketPeer> connectedPeers = Collections.synchronizedList(new ArrayList<>()); private final CamelContext camelContext; private HttpServer server; private int port = VertxWebsocketConstants.DEFAULT_VERTX_SERVER_PORT; @@ -110,39 +111,40 @@ public class VertxWebsocketHost { SocketAddress socketAddress = webSocket.localAddress(); SocketAddress remote = webSocket.remoteAddress(); - String connectionKey = UUID.randomUUID().toString(); - connectedPeers.put(connectionKey, webSocket); + VertxWebsocketPeer peer = new VertxWebsocketPeer(webSocket, websocketURI.getPath()); + connectedPeers.add(peer); if (LOG.isDebugEnabled()) { if (socketAddress != null) { - LOG.debug("WebSocket peer {} connected from {}", connectionKey, socketAddress.host()); + LOG.debug("WebSocket peer {} connected from {}", peer.getConnectionKey(), socketAddress.host()); } } webSocket.textMessageHandler( - message -> consumer.onMessage(connectionKey, message, remote, routingContext)); + message -> consumer.onMessage(peer.getConnectionKey(), message, remote, routingContext)); webSocket .binaryMessageHandler( - message -> consumer.onMessage(connectionKey, message.getBytes(), remote, + message -> consumer.onMessage(peer.getConnectionKey(), message.getBytes(), remote, routingContext)); webSocket.exceptionHandler( - exception -> consumer.onException(connectionKey, exception, remote, routingContext)); + exception -> consumer.onException(peer.getConnectionKey(), exception, remote, routingContext)); webSocket.closeHandler(closeEvent -> { if (LOG.isDebugEnabled()) { if (socketAddress != null) { - LOG.debug("WebSocket peer {} disconnected from {}", connectionKey, socketAddress.host()); + LOG.debug("WebSocket peer {} disconnected from {}", peer.getConnectionKey(), + socketAddress.host()); } } if (configuration.isFireWebSocketConnectionEvents()) { - consumer.onClose(connectionKey, remote, routingContext); + consumer.onClose(peer.getConnectionKey(), remote, routingContext); } - connectedPeers.remove(connectionKey); + connectedPeers.remove(peer); }); if (configuration.isFireWebSocketConnectionEvents()) { - consumer.onOpen(connectionKey, remote, routingContext, webSocket); + consumer.onOpen(peer.getConnectionKey(), remote, routingContext, webSocket); } } else { // the upgrade failed @@ -238,10 +240,20 @@ public class VertxWebsocketHost { /** * Gets all WebSocket peers connected to the Vert.x HTTP sever together with their associated connection key */ - public Map<String, ServerWebSocket> getConnectedPeers() { + public List<VertxWebsocketPeer> getConnectedPeers() { return connectedPeers; } + /** + * Gets a connected peer for the given connection key + */ + public VertxWebsocketPeer getConnectedPeer(String connectionKey) { + return getConnectedPeers().stream() + .filter(peer -> peer.getConnectionKey().equals(connectionKey)) + .findFirst() + .orElse(null); + } + /** * Gets the port that the server is bound to. This could be a random value if 0 was specified as the initial port * number diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java new file mode 100644 index 00000000000..5c401854ce7 --- /dev/null +++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.websocket; + +import java.util.Objects; +import java.util.UUID; + +import io.vertx.core.http.ServerWebSocket; + +/** + * Represents a WebSocket peer connection + */ +public class VertxWebsocketPeer { + private final ServerWebSocket webSocket; + private final String rawPath; + private final String path; + private final String connectionKey; + + public VertxWebsocketPeer(ServerWebSocket webSocket, String rawPath) { + this.webSocket = Objects.requireNonNull(webSocket); + this.rawPath = Objects.requireNonNull(rawPath); + this.path = webSocket.path(); + this.connectionKey = UUID.randomUUID().toString(); + } + + public ServerWebSocket getWebSocket() { + return webSocket; + } + + public String getRawPath() { + return rawPath; + } + + public String getPath() { + return path; + } + + public String getConnectionKey() { + return connectionKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + VertxWebsocketPeer that = (VertxWebsocketPeer) o; + return Objects.equals(connectionKey, that.connectionKey); + } + + @Override + public int hashCode() { + return Objects.hash(connectionKey); + } +} diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java index 4af16a6f68a..2d2f1d3d312 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java @@ -44,13 +44,6 @@ public class VertxWebsocketHelperTest { assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); } - @Test - void webSocketHostExactPathWithParamsMatches() { - String hostPath = "/foo/{bar}/cheese/{wine}"; - String targetPath = "/foo/bar/cheese/wine"; - assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); - } - @Test void webSocketHostExactPathWithParamsNotMatches() { String hostPath = "/foo/{bar}/cheese/{wine}"; @@ -72,13 +65,6 @@ public class VertxWebsocketHelperTest { assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); } - @Test - void webSocketHostWildcardPathWithParamsMatches() { - String hostPath = "/foo/{bar}/cheese/{wine}*"; - String targetPath = "/foo/bar/cheese/wine/beer/additional/path"; - assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); - } - @Test void webSocketHostWildcardPathWithParamsNotMatches() { String hostPath = "/foo/{bar}/cheese/{wine}*"; @@ -100,13 +86,6 @@ public class VertxWebsocketHelperTest { assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); } - @Test - void webSocketHostWildcardPathWithTrailingSlashStarMatches() { - String hostPath = "/foo/{bar}/cheese/{wine}/*"; - String targetPath = "/foo/bar/cheese/wine/beer/additional/path"; - assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath)); - } - @Test void webSocketHostDefaultPathMatches() { String hostPath = "/"; diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java index fd055af6263..fbae556d594 100644 --- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java +++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java @@ -119,7 +119,147 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { } @Test - public void testSendWithInvalidConnectionKey() throws Exception { + void testSendWithConnectionKeyForParameterizedPath() throws Exception { + int expectedResultCount = 1; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + } + + VertxWebsocketEndpoint endpoint + = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB", + VertxWebsocketEndpoint.class); + Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort(); + assertEquals(2, connectedPeers.size()); + + String connectionKey = connectedPeers.keySet().iterator().next(); + + template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB", + "Hello World", + VertxWebsocketConstants.CONNECTION_KEY, connectionKey); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + assertTrue(results.contains("Hello World")); + } + + @Test + void testSendWithConnectionKeyForRawParameterizedPath() throws Exception { + int expectedResultCount = 1; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + } + + VertxWebsocketEndpoint endpoint + = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB", + VertxWebsocketEndpoint.class); + Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort(); + assertEquals(2, connectedPeers.size()); + + String connectionKey = connectedPeers.keySet().iterator().next(); + + template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}", + "Hello World", + VertxWebsocketConstants.CONNECTION_KEY, connectionKey); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + assertTrue(results.contains("Hello World")); + } + + @Test + void testSendWithConnectionKeyForWildcardPath() throws Exception { + int expectedResultCount = 1; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + } + + openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + + VertxWebsocketEndpoint endpoint + = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path", + VertxWebsocketEndpoint.class); + Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort(); + assertEquals(2, connectedPeers.size()); + + String connectionKey = connectedPeers.keySet().iterator().next(); + + template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/path", "Hello World", + VertxWebsocketConstants.CONNECTION_KEY, connectionKey); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + assertTrue(results.contains("Hello World")); + } + + @Test + void testSendWithConnectionKeyForRawPath() throws Exception { + int expectedResultCount = 1; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + } + + openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> { + synchronized (latch) { + results.add(message); + latch.countDown(); + } + }); + + VertxWebsocketEndpoint endpoint + = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path", + VertxWebsocketEndpoint.class); + Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort(); + assertEquals(2, connectedPeers.size()); + + String connectionKey = connectedPeers.keySet().iterator().next(); + + template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/*", "Hello World", + VertxWebsocketConstants.CONNECTION_KEY, connectionKey); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + assertTrue(results.contains("Hello World")); + } + + @Test + void testSendWithInvalidConnectionKey() throws Exception { MockEndpoint mockEndpoint = getMockEndpoint("mock:result"); mockEndpoint.expectedBodiesReceived("Hello world"); mockEndpoint.setResultWaitTime(500); @@ -249,6 +389,137 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1)); } + @Test + void testSendToAllForExactParameterizedPath() throws Exception { + int expectedResultCount = 5; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < expectedResultCount; i++) { + openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + + // Below we produce to an explicit path so this peer should be ignored + openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + } + + template.sendBody("vertx-websocket:localhost:" + port + "/test/firstParam/other/secondParam?sendToAll=true", + "Hello World"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + + for (int i = 1; i <= expectedResultCount; i++) { + assertTrue(results.contains("Hello World " + i)); + } + } + + @Test + void testSendToAllForRawParameterizedPath() throws Exception { + int expectedResultCount = 10; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + + openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + } + + template.sendBody("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}?sendToAll=true", "Hello World"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + + for (int i = 1; i <= expectedResultCount; i++) { + assertTrue(results.contains("Hello World " + i)); + } + } + + @Test + void testSendToAllForWildcardPath() throws Exception { + int expectedResultCount = 5; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < expectedResultCount; i++) { + openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + + // Below we produce to an explicit path so this peer should be ignored + openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + } + + template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/path?sendToAll=true", "Hello World"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + + for (int i = 1; i <= expectedResultCount; i++) { + assertTrue(results.contains("Hello World " + i)); + } + } + + @Test + void testSendToAllForRawWildcardPath() throws Exception { + int expectedResultCount = 10; + CountDownLatch latch = new CountDownLatch(expectedResultCount); + List<String> results = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + + openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> { + synchronized (latch) { + results.add(message + " " + latch.getCount()); + latch.countDown(); + } + }); + } + + template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/*?sendToAll=true", "Hello World"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(expectedResultCount, results.size()); + + for (int i = 1; i <= expectedResultCount; i++) { + assertTrue(results.contains("Hello World " + i)); + } + } + @Test public void testEchoRoute() throws Exception { CountDownLatch latch = new CountDownLatch(1); @@ -422,6 +693,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { .setBody(simple("${body} ${header.firstParam} ${header.secondParam}")) .toF("vertx-websocket:localhost:%d/testA/echo/testB", port); + fromF("vertx-websocket:localhost:%d/test/{paramA}/other/{paramB}", port) + .setBody(simple("${header.firstParam} ${header.secondParam}")); + fromF("vertx-websocket:localhost:%d/query/params", port) .setBody(simple("${header.firstParam} ${header.secondParam}")) .to("mock:queryParamResult"); @@ -448,6 +722,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport { fromF("vertx-websocket:localhost:%d/wildcard/echo*", port) .setBody().simple("${body} World") .toF("vertx-websocket:localhost:%d/wildcard/echo/foo/bar", port); + + fromF("vertx-websocket:localhost:%d/test/wildcarded*", port) + .setBody().simple("Hello ${body} from the wildcard path"); } }; }