Repository: camel Updated Branches: refs/heads/master 6407d35c0 -> fedb9c609
CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fedb9c60 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fedb9c60 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fedb9c60 Branch: refs/heads/master Commit: fedb9c6097e248e26f9b589cc1428fbafe1131b5 Parents: 6407d35 Author: Claus Ibsen <[email protected]> Authored: Thu Mar 17 08:40:08 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Thu Mar 17 08:41:19 2016 +0100 ---------------------------------------------------------------------- .../camel/component/ahc/ws/WsConsumer.java | 11 ++- .../camel/component/ahc/ws/WsEndpoint.java | 79 +++++++++++++------- .../ahc/ws/WsProducerConsumerTest.java | 25 ++++++- 3 files changed, 88 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java index ab8b5a5..808d76d 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java @@ -51,6 +51,10 @@ public class WsConsumer extends DefaultConsumer { sendMessageInternal(message); } + public void sendMessage(Throwable throwable) { + sendMessageInternal(throwable); + } + public void sendMessage(byte[] message) { sendMessageInternal(message); } @@ -68,7 +72,12 @@ public class WsConsumer extends DefaultConsumer { //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 // set the body - exchange.getIn().setBody(message); + + if (message instanceof Throwable) { + exchange.setException((Throwable) message); + } else { + exchange.getIn().setBody(message); + } // send exchange using the async routing engine getAsyncProcessor().process(exchange, new AsyncCallback() { http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java index 5187673..b250c2c 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java @@ -23,9 +23,8 @@ import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.AsyncHttpClientConfig; import com.ning.http.client.AsyncHttpProvider; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; +import com.ning.http.client.ws.DefaultWebSocketListener; import com.ning.http.client.ws.WebSocket; -import com.ning.http.client.ws.WebSocketByteListener; -import com.ning.http.client.ws.WebSocketTextListener; import com.ning.http.client.ws.WebSocketUpgradeHandler; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -49,24 +48,18 @@ public class WsEndpoint extends AhcEndpoint { probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider"); private final Set<WsConsumer> consumers = new HashSet<WsConsumer>(); + private final WsListener listener = new WsListener(); + private transient WebSocket websocket; - private WebSocket websocket; - @UriParam + @UriParam(label = "producer") private boolean useStreaming; + @UriParam(label = "consumer") + private boolean sendMessageOnError; public WsEndpoint(String endpointUri, WsComponent component) { super(endpointUri, component, null); } - private static boolean probeClass(String name) { - try { - Class.forName(name, true, WsEndpoint.class.getClassLoader()); - return true; - } catch (Throwable t) { - return false; - } - } - @Override public WsComponent getComponent() { return (WsComponent) super.getComponent(); @@ -84,9 +77,8 @@ public class WsEndpoint extends AhcEndpoint { WebSocket getWebSocket() throws Exception { synchronized (this) { - if (websocket == null) { - connect(); - } + // ensure we are connected + reConnect(); } return websocket; } @@ -106,6 +98,17 @@ public class WsEndpoint extends AhcEndpoint { this.useStreaming = useStreaming; } + public boolean isSendMessageOnError() { + return sendMessageOnError; + } + + /** + * Whether to send an message if the web-socket listener received an error. + */ + public void setSendMessageOnError(boolean sendMessageOnError) { + this.sendMessageOnError = sendMessageOnError; + } + @Override protected AsyncHttpClient createClient(AsyncHttpClientConfig config) { AsyncHttpClient client; @@ -127,7 +130,7 @@ public class WsEndpoint extends AhcEndpoint { LOG.debug("Connecting to {}", uri); websocket = getClient().prepareGet(uri).execute( new WebSocketUpgradeHandler.Builder() - .addWebSocketListener(new WsListener()).build()).get(); + .addWebSocketListener(listener).build()).get(); } @Override @@ -136,6 +139,7 @@ public class WsEndpoint extends AhcEndpoint { if (LOG.isDebugEnabled()) { LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString()); } + websocket.removeWebSocketListener(listener); websocket.close(); websocket = null; } @@ -144,31 +148,46 @@ public class WsEndpoint extends AhcEndpoint { void connect(WsConsumer wsConsumer) throws Exception { consumers.add(wsConsumer); - - if (websocket == null || !websocket.isOpen()) { - connect(); - } + reConnect(); } void disconnect(WsConsumer wsConsumer) { consumers.remove(wsConsumer); } - class WsListener implements WebSocketTextListener, WebSocketByteListener { + void reConnect() throws Exception { + if (websocket == null || !websocket.isOpen()) { + String uri = getHttpUri().toASCIIString(); + LOG.info("Reconnecting websocket: {}", uri); + connect(); + } + } + + class WsListener extends DefaultWebSocketListener { @Override public void onOpen(WebSocket websocket) { - LOG.debug("websocket opened"); + LOG.debug("Websocket opened"); } @Override public void onClose(WebSocket websocket) { - LOG.debug("websocket closed"); + LOG.debug("websocket closed - reconnecting"); + try { + reConnect(); + } catch (Exception e) { + LOG.warn("Error re-connecting to websocket", e); + } } @Override public void onError(Throwable t) { - LOG.error("websocket on error", t); + LOG.debug("websocket on error", t); + if (isSendMessageOnError()) { + for (WsConsumer consumer : consumers) { + consumer.sendMessage(t); + } + } } @Override @@ -195,4 +214,14 @@ public class WsEndpoint extends AhcEndpoint { } return null; } + + private static boolean probeClass(String name) { + try { + Class.forName(name, true, WsEndpoint.class.getClassLoader()); + return true; + } catch (Throwable t) { + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java index cf7403d..ee7caf8 100644 --- a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java +++ b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java @@ -88,7 +88,7 @@ public class WsProducerConsumerTest extends CamelTestSupport { } @Test - public void testTwoRoutesRestart() throws Exception { + public void testTwoRoutesRestartConsumer() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived(TEST_MESSAGE); @@ -110,6 +110,29 @@ public class WsProducerConsumerTest extends CamelTestSupport { mock.assertIsSatisfied(); } + @Test + public void testTwoRoutesRestartProducer() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + + resetMocks(); + + log.info("Restarting foo route"); + context.stopRoute("foo"); + Thread.sleep(500); + context.startRoute("foo"); + + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + } + @Override protected RouteBuilder[] createRouteBuilders() throws Exception { RouteBuilder[] rbs = new RouteBuilder[2];
