Repository: camel Updated Branches: refs/heads/master 8e07c18c3 -> c08a60503
undertow async producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d03012bb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d03012bb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d03012bb Branch: refs/heads/master Commit: d03012bbbfc87bf84ceb71cacf2d53efdee208f0 Parents: 8e07c18 Author: Claus Ibsen <[email protected]> Authored: Wed Jul 15 14:31:51 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Wed Jul 15 14:31:51 2015 +0200 ---------------------------------------------------------------------- .../component/undertow/UndertowProducer.java | 80 ++++++++++++-------- .../undertow/UndertowComponentTest.java | 15 ++-- 2 files changed, 54 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d03012bb/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index 23e21a3..e1330d7 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -26,10 +26,11 @@ import io.undertow.client.ClientRequest; import io.undertow.client.UndertowClient; import io.undertow.util.Headers; import io.undertow.util.Protocols; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.TypeConverter; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +43,12 @@ import org.xnio.XnioWorker; /** * The Undertow producer. - * + * <p/> * The implementation of Producer is considered as experimental. The Undertow client classes are not thread safe, * their purpose is for the reverse proxy usage inside Undertow itself. This may change in the future versions and * general purpose HTTP client wrapper will be added. Therefore this Producer may be changed too. */ -public class UndertowProducer extends DefaultProducer { +public class UndertowProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(UndertowProducer.class); private UndertowEndpoint endpoint; @@ -61,31 +62,36 @@ public class UndertowProducer extends DefaultProducer { return endpoint; } - public void setEndpoint(UndertowEndpoint endpoint) { - this.endpoint = endpoint; - } + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + final UndertowClient client = UndertowClient.getInstance(); + XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY); - // TODO: use async routing engine + IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192), OptionMap.EMPTY); - @Override - public void process(Exchange exchange) throws Exception { - final UndertowClient client = UndertowClient.getInstance(); - XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY); - IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192), OptionMap.EMPTY); + ClientRequest request = new ClientRequest(); + request.setProtocol(Protocols.HTTP_1_1); - ClientRequest request = new ClientRequest(); - request.setProtocol(Protocols.HTTP_1_1); + Object body = getRequestBody(request, exchange); - Object body = getRequestBody(request, exchange); + TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); + ByteBuffer bodyAsByte = tc.convertTo(ByteBuffer.class, body); - TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); - ByteBuffer bodyAsByte = tc.convertTo(ByteBuffer.class, body); + if (body != null) { + request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.array().length); + } - if (body != null) { - request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.array().length); + connect.get().sendRequest(request, new UndertowProducerCallback(bodyAsByte, exchange, callback)); + + } catch (IOException e) { + exchange.setException(e); + callback.done(true); + return true; } - connect.get().sendRequest(request, new UndertowProducerCallback(bodyAsByte, exchange)); + // use async routing engine + return false; } private Object getRequestBody(ClientRequest request, Exchange camelExchange) { @@ -99,38 +105,46 @@ public class UndertowProducer extends DefaultProducer { */ private class UndertowProducerCallback implements ClientCallback<ClientExchange> { - private ByteBuffer body; - private Exchange camelExchange; + private final ByteBuffer body; + private final Exchange camelExchange; + private final AsyncCallback callback; - public UndertowProducerCallback(ByteBuffer body, Exchange camelExchange) { + public UndertowProducerCallback(ByteBuffer body, Exchange camelExchange, AsyncCallback callback) { this.body = body; this.camelExchange = camelExchange; + this.callback = callback; } + // TODO: Add some logging of those events at trace or debug level + @Override public void completed(ClientExchange clientExchange) { clientExchange.setResponseListener(new ClientCallback<ClientExchange>() { @Override public void completed(ClientExchange clientExchange) { - Message message = null; try { - message = endpoint.getUndertowHttpBinding().toCamelMessage(clientExchange, camelExchange); + Message message = endpoint.getUndertowHttpBinding().toCamelMessage(clientExchange, camelExchange); + if (ExchangeHelper.isOutCapable(camelExchange)) { + camelExchange.setOut(message); + } else { + camelExchange.setIn(message); + } } catch (Exception e) { camelExchange.setException(e); + } finally { + // make sure to call callback + callback.done(false); } - if (ExchangeHelper.isOutCapable(camelExchange)) { - camelExchange.setOut(message); - } else { - camelExchange.setIn(message); - } - } @Override public void failed(IOException e) { camelExchange.setException(e); + // make sure to call callback + callback.done(false); } }); + try { //send body if exists if (body != null) { @@ -138,12 +152,16 @@ public class UndertowProducer extends DefaultProducer { } } catch (IOException e) { camelExchange.setException(e); + // make sure to call callback + callback.done(false); } } @Override public void failed(IOException e) { camelExchange.setException(e); + // make sure to call callback + callback.done(false); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d03012bb/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java index 7eac71e..91b526d 100644 --- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java @@ -28,24 +28,19 @@ public class UndertowComponentTest extends BaseUndertowTest { @Test public void testUndertow() throws Exception { + MockEndpoint mockEndpoint = getMockEndpoint("mock:myapp"); + mockEndpoint.expectedHeaderReceived(Exchange.HTTP_METHOD, "GET"); + LOG.debug("Number of exchanges in mock:myapp" + mockEndpoint.getExchanges().size()); - - String response = template.requestBody("undertow://http://localhost:{{port}}/myapp", "Hello Camel!", String.class); - + String response = template.requestBody("undertow:http://localhost:{{port}}/myapp", "Hello Camel!", String.class); assertNotNull(response); - assertEquals("Hello Camel!", response); - MockEndpoint mockEndpoint = getMockEndpoint("mock:myapp"); - mockEndpoint.expectedHeaderReceived(Exchange.HTTP_METHOD, "GET"); - LOG.debug("Number of exchanges in mock:myapp" + mockEndpoint.getExchanges().size()); + mockEndpoint.assertIsSatisfied(); for (Exchange exchange : mockEndpoint.getExchanges()) { assertEquals("Hello Camel! Bye Camel!", exchange.getIn().getBody(String.class)); } - - mockEndpoint.assertIsSatisfied(); - } @Override
