Repository: camel Updated Branches: refs/heads/camel-2.16.x 1cc925e76 -> a3e4949d3
CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3e4949d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3e4949d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3e4949d Branch: refs/heads/camel-2.16.x Commit: a3e4949d3a2bc220784be15a45a20a7b9a1393a5 Parents: 1cc925e Author: Claus Ibsen <[email protected]> Authored: Thu Mar 17 08:40:08 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Thu Mar 17 18:02:56 2016 +0100 ---------------------------------------------------------------------- .../camel/component/ahc/ws/WsConsumer.java | 11 ++- .../camel/component/ahc/ws/WsEndpoint.java | 81 +++++++++++++------- .../ahc/ws/WsProducerConsumerTest.java | 47 +++++++++++- 3 files changed, 111 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java index ab8b5a5..808d76d 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java @@ -51,6 +51,10 @@ public class WsConsumer extends DefaultConsumer { sendMessageInternal(message); } + public void sendMessage(Throwable throwable) { + sendMessageInternal(throwable); + } + public void sendMessage(byte[] message) { sendMessageInternal(message); } @@ -68,7 +72,12 @@ public class WsConsumer extends DefaultConsumer { //TODO may set some headers with some meta info (e.g., socket 
info, unique-id for correlation purpose, etc.) // set the body - exchange.getIn().setBody(message); + + if (message instanceof Throwable) { + exchange.setException((Throwable) message); + } else { + exchange.getIn().setBody(message); + } // send exchange using the async routing engine getAsyncProcessor().process(exchange, new AsyncCallback() { http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java index 86c1fd0..88e1760 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java @@ -23,9 +23,8 @@ import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.AsyncHttpClientConfig; import com.ning.http.client.AsyncHttpProvider; import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; +import com.ning.http.client.ws.DefaultWebSocketListener; import com.ning.http.client.ws.WebSocket; -import com.ning.http.client.ws.WebSocketByteListener; -import com.ning.http.client.ws.WebSocketTextListener; import com.ning.http.client.ws.WebSocketUpgradeHandler; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -45,25 +44,19 @@ public class WsEndpoint extends AhcEndpoint { private static final boolean GRIZZLY_AVAILABLE = probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider"); - private final Set<WsConsumer> consumers = new HashSet<WsConsumer>(); + private final Set<WsConsumer> consumers = new HashSet<WsConsumer>(); + private final WsListener listener = new WsListener(); + private transient WebSocket websocket; - private WebSocket 
websocket; - @UriParam + @UriParam(label = "producer") private boolean useStreaming; + @UriParam(label = "consumer") + private boolean sendMessageOnError; public WsEndpoint(String endpointUri, WsComponent component) { super(endpointUri, component, null); } - private static boolean probeClass(String name) { - try { - Class.forName(name, true, WsEndpoint.class.getClassLoader()); - return true; - } catch (Throwable t) { - return false; - } - } - @Override public WsComponent getComponent() { return (WsComponent) super.getComponent(); @@ -81,9 +74,8 @@ public class WsEndpoint extends AhcEndpoint { WebSocket getWebSocket() throws Exception { synchronized (this) { - if (websocket == null) { - connect(); - } + // ensure we are connected + reConnect(); } return websocket; } @@ -103,6 +95,17 @@ public class WsEndpoint extends AhcEndpoint { this.useStreaming = useStreaming; } + public boolean isSendMessageOnError() { + return sendMessageOnError; + } + + /** + * Whether to send a message if the web-socket listener received an error. 
+ */ + public void setSendMessageOnError(boolean sendMessageOnError) { + this.sendMessageOnError = sendMessageOnError; + } + @Override protected AsyncHttpClient createClient(AsyncHttpClientConfig config) { AsyncHttpClient client; @@ -124,7 +127,7 @@ public class WsEndpoint extends AhcEndpoint { LOG.debug("Connecting to {}", uri); websocket = getClient().prepareGet(uri).execute( new WebSocketUpgradeHandler.Builder() - .addWebSocketListener(new WsListener()).build()).get(); + .addWebSocketListener(listener).build()).get(); } @Override @@ -133,6 +136,7 @@ public class WsEndpoint extends AhcEndpoint { if (LOG.isDebugEnabled()) { LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString()); } + websocket.removeWebSocketListener(listener); websocket.close(); websocket = null; } @@ -141,31 +145,46 @@ public class WsEndpoint extends AhcEndpoint { void connect(WsConsumer wsConsumer) throws Exception { consumers.add(wsConsumer); - - if (websocket == null || !websocket.isOpen()) { - connect(); - } + reConnect(); } void disconnect(WsConsumer wsConsumer) { consumers.remove(wsConsumer); } - class WsListener implements WebSocketTextListener, WebSocketByteListener { + void reConnect() throws Exception { + if (websocket == null || !websocket.isOpen()) { + String uri = getHttpUri().toASCIIString(); + LOG.info("Reconnecting websocket: {}", uri); + connect(); + } + } + + class WsListener extends DefaultWebSocketListener { @Override public void onOpen(WebSocket websocket) { - LOG.debug("websocket opened"); + LOG.debug("Websocket opened"); } @Override public void onClose(WebSocket websocket) { - LOG.debug("websocket closed"); + LOG.debug("websocket closed - reconnecting"); + try { + reConnect(); + } catch (Exception e) { + LOG.warn("Error re-connecting to websocket", e); + } } @Override public void onError(Throwable t) { - LOG.error("websocket on error", t); + LOG.debug("websocket on error", t); + if (isSendMessageOnError()) { + for (WsConsumer consumer : consumers) { + 
consumer.sendMessage(t); + } + } } @Override @@ -192,4 +211,14 @@ public class WsEndpoint extends AhcEndpoint { } return null; } + + private static boolean probeClass(String name) { + try { + Class.forName(name, true, WsEndpoint.class.getClassLoader()); + return true; + } catch (Throwable t) { + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/a3e4949d/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java index 54d8b09..e3d35d6 100644 --- a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java +++ b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java @@ -87,7 +87,52 @@ public class WsProducerConsumerTest extends CamelTestSupport { mock.assertIsSatisfied(); } - + @Test + public void testTwoRoutesRestartConsumer() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + + resetMocks(); + + log.info("Restarting bar route"); + context.stopRoute("bar"); + Thread.sleep(500); + context.startRoute("bar"); + + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + } + + @Test + public void testTwoRoutesRestartProducer() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + + resetMocks(); + + log.info("Restarting foo route"); + context.stopRoute("foo"); + 
Thread.sleep(500); + context.startRoute("foo"); + + mock.expectedBodiesReceived(TEST_MESSAGE); + + template.sendBody("direct:input", TEST_MESSAGE); + + mock.assertIsSatisfied(); + } + @Override protected RouteBuilder[] createRouteBuilders() throws Exception { RouteBuilder[] rbs = new RouteBuilder[2];
