Repository: camel Updated Branches: refs/heads/master e5c130b3e -> 36297987b
[CAMEL-9364] Add ability to receive onOpen/onClose/onError websocket events through camel rout. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36297987 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36297987 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36297987 Branch: refs/heads/master Commit: 36297987bb152beacffcfc5e9ae1ee4a050752a3 Parents: e5c130b Author: Pavlo Kletsko <pklet...@gmail.com> Authored: Thu Nov 26 10:30:04 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Nov 26 11:29:16 2015 +0100 ---------------------------------------------------------------------- .../websocket/CamelWebSocketServlet.java | 16 +++++- .../websocket/WebsocketConstants.java | 7 ++- .../atmosphere/websocket/WebsocketConsumer.java | 37 +++++++++---- .../atmosphere/websocket/WebsocketHandler.java | 11 +++- .../WebsocketCamelRouterTestSupport.java | 4 +- .../websocket/WebsocketRouteTest.java | 57 +++++++++++++++++++- 6 files changed, 115 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java index 07c9779..6c9be21 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java @@ -17,6 +17,7 @@ package org.apache.camel.component.atmosphere.websocket; import java.io.IOException; +import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -34,6 +35,19 @@ import org.apache.camel.http.common.HttpConsumer; */ public class CamelWebSocketServlet extends CamelHttpTransportServlet { private static final long serialVersionUID = 1764707448550670635L; + private static final String RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY = "events"; + private boolean enableEventsResending; + + @Override + public void init(ServletConfig config) throws ServletException { + super.init(config); + + String eventsResendingParameter = config.getInitParameter(RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY); + if ("true".equals(eventsResendingParameter)) { + log.debug("Events resending enabled"); + enableEventsResending = true; + } + } @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { @@ -72,7 +86,7 @@ public class CamelWebSocketServlet extends CamelHttpTransportServlet { } log.debug("Dispatching to Websocket Consumer at {}", consumer.getPath()); - ((WebsocketConsumer)consumer).service(request, response); + ((WebsocketConsumer)consumer).service(request, response, enableEventsResending); } } http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java index 9b21354..b85b039 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java @@ -23,7 +23,12 @@ public final class WebsocketConstants { public static final String CONNECTION_KEY = "websocket.connectionKey"; public static final String SEND_TO_ALL = "websocket.sendToAll"; - + public static final String EVENT_TYPE = "websocket.eventType"; + + public static final int ONOPEN_EVENT_TYPE = 1; + public static final int ONCLOSE_EVENT_TYPE = 0; + public static final int ONERROR_EVENT_TYPE = -1; + private WebsocketConstants() { //helper class } http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java index 22beae5..934fa5f 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java @@ -17,7 +17,6 @@ package org.apache.camel.component.atmosphere.websocket; import java.io.IOException; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -28,22 +27,17 @@ import org.apache.camel.Processor; import org.apache.camel.component.servlet.ServletConsumer; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; -import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereRequestImpl; -import org.atmosphere.cpr.AtmosphereResponse; import org.atmosphere.cpr.AtmosphereResponseImpl; import org.atmosphere.websocket.WebSocketProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * */ public class WebsocketConsumer extends ServletConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketConsumer.class); - private AtmosphereFramework framework; - + private boolean enableEventsResending; + public WebsocketConsumer(WebsocketEndpoint endpoint, Processor processor) { super(endpoint, processor); this.framework = new AtmosphereFramework(false, true); @@ -58,8 +52,7 @@ public class WebsocketConsumer extends ServletConsumer { if (wsp instanceof WebsocketHandler) { ((WebsocketHandler)wsp).setConsumer(this); } else { - // this should not normally happen - LOG.error("unexpected WebSocketHandler: {}", wsp); + throw new IllegalArgumentException("Unexpected WebSocketHandler: " + wsp); } } @@ -68,8 +61,9 @@ public class WebsocketConsumer extends ServletConsumer { return (WebsocketEndpoint)super.getEndpoint(); } - void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + void service(HttpServletRequest request, HttpServletResponse response, boolean enableEventsResending) throws IOException, ServletException { framework.doCometSupport(AtmosphereRequestImpl.wrap(request), AtmosphereResponseImpl.wrap(response)); + this.enableEventsResending = enableEventsResending; } public void sendMessage(final String connectionKey, Object message) { @@ -88,4 +82,25 @@ public class WebsocketConsumer extends ServletConsumer { } }); } + + public void sendEventNotification(String connectionKey, int eventType) { + final Exchange exchange = getEndpoint().createExchange(); + + // set header + exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); + exchange.getIn().setHeader(WebsocketConstants.EVENT_TYPE, eventType); + + // send exchange using the async routing engine + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } + }); + } + + public boolean isEnableEventsResending() { + return enableEventsResending; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java index 09d803d..fd5f836 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java @@ -41,14 +41,17 @@ public class WebsocketHandler implements WebSocketProtocol { @Override public void onClose(WebSocket webSocket) { LOG.debug("closing websocket"); + String connectionKey = store.getConnectionKey(webSocket); + sendEventNotification(connectionKey, WebsocketConstants.ONCLOSE_EVENT_TYPE); store.removeWebSocket(webSocket); - LOG.debug("websocket closed"); } @Override public void onError(WebSocket webSocket, WebSocketException t) { LOG.error("websocket on error", t); + String connectionKey = store.getConnectionKey(webSocket); + sendEventNotification(connectionKey, WebsocketConstants.ONERROR_EVENT_TYPE); } @Override @@ -56,6 +59,7 @@ public class WebsocketHandler implements WebSocketProtocol { LOG.debug("opening websocket"); String connectionKey = UUID.randomUUID().toString(); store.addWebSocket(connectionKey, webSocket); + sendEventNotification(connectionKey, WebsocketConstants.ONOPEN_EVENT_TYPE); LOG.debug("websocket opened"); } @@ -89,4 +93,9 @@ public class WebsocketHandler implements WebSocketProtocol { this.store = consumer.getEndpoint().getWebSocketStore(); } + private void sendEventNotification(final String connectionKey, final int eventType) { + if (consumer.isEnableEventsResending()) { + consumer.sendEventNotification(connectionKey, eventType); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java index 569354f..95a23bc 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java @@ -34,6 +34,8 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport { protected Server server; + protected ServletHolder servletHolder; + @Before public void setUp() throws Exception { server = new Server(); @@ -46,7 +48,7 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport { context.setContextPath("/"); server.setHandler(context); - ServletHolder servletHolder = new ServletHolder(new CamelWebSocketServlet()); + servletHolder = new ServletHolder(new CamelWebSocketServlet()); servletHolder.setName("CamelWsServlet"); context.addServlet(servletHolder, "/*"); http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java index f48c651..dd4063b 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java @@ -116,6 +116,23 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { wsclient2.close(); } + @Test + public void testWebsocketEventsResendingEnabled() throws Exception { + servletHolder.setInitParameter("events", "true"); + + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola4"); + wsclient.connect(); + wsclient.close(); + } + + @Test + public void testWebsocketEventsResendingDisabled() throws Exception { + TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola5"); + wsclient.connect(); + assertFalse(wsclient.await(10)); + wsclient.close(); + } + // START SNIPPET: payload protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @@ -140,7 +157,20 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { createResponse(exchange, true); } }).to("atmosphere-websocket:///hola3"); - + + // route for events resending enabled + from("atmosphere-websocket:///hola4").to("log:info").process(new Processor() { + public void process(final Exchange exchange) throws Exception { + checkEventsResendingEnabled(exchange); + } + }); + + // route for events resending disabled + from("atmosphere-websocket:///hola5").to("log:info").process(new Processor() { + public void process(final Exchange exchange) throws Exception { + checkEventsResendingDisabled(exchange); + } + }).to("atmosphere-websocket:///hola5"); } }; } @@ -163,7 +193,30 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport { exchange.getIn().setBody(createByteResponse(readAll((InputStream)msg))); } } - + + private static void checkEventsResendingEnabled(Exchange exchange) { + Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY); + Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE); + Object msg = exchange.getIn().getBody(); + + assertEquals(null, msg); + assertTrue(connectionKey != null); + + if (eventType instanceof Integer) { + assertTrue(eventType.equals(1) || eventType.equals(0) || eventType.equals(-1)); + } + } + + private static void checkEventsResendingDisabled(Exchange exchange) { + Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE); + + if (eventType instanceof Integer) { + if (eventType.equals(1) || eventType.equals(0) || eventType.equals(-1)) { + exchange.getIn().setBody("Error. This place should never be reached."); + } + } + } + private static byte[] createByteResponse(byte[] req) { byte[] resp = new byte[req.length + RESPONSE_GREETING_BYTES.length]; System.arraycopy(RESPONSE_GREETING_BYTES, 0, resp, 0, RESPONSE_GREETING_BYTES.length);