Tuomas Kiviaho created CAMEL-9370: ------------------------------------- Summary: Support for async/deferred content Key: CAMEL-9370 URL: https://issues.apache.org/jira/browse/CAMEL-9370 Project: Camel Issue Type: Improvement Components: camel-jetty Affects Versions: 2.16.1 Reporter: Tuomas Kiviaho
I'm receiving {{text/event-stream}} and to my surprise there was no support of any kind for {{AsyncContentProvider}}. Hence here are my dealings with the issue in the form of a patch. Some of the bits would work out of the box as-is, but others (such as {{jettyBinding instanceof Response.ResponseListener}}) I've rammed in just to get it working. It would be nice if this type of streaming could be enabled without having to declare {{jettyBinding}}. A new option, for instance, would be a more practical solution. {code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java} @@ -191,7 +191,8 @@ } } else { // just grab the raw content body - return httpExchange.getBody(); + byte[] body = httpExchange.getBody(); + return body == null ? httpExchange.getResponseContentProvider() : body; } } {code} {code:title=org/apache/camel/component/jetty/JettyContentExchange.java} @@ -25,6 +25,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentProvider; public interface JettyContentExchange { @@ -44,6 +45,8 @@ void setRequestContent(String data, String charset) throws UnsupportedEncodingException; void setRequestContent(InputStream ins); + + void setRequestContent(ContentProvider contentProvider); void addRequestHeader(String key, String s); @@ -63,6 +66,8 @@ int getResponseStatus(); byte[] getResponseContentBytes(); + + ContentProvider getResponseContentProvider(); Map<String, Collection<String>> getResponseHeaders(); {code} {code:title=org/apache/camel/component/jetty/JettyHttpProducer.java} @@ -32,7 +32,6 @@ import org.apache.camel.Message; import org.apache.camel.http.common.HttpConstants; import org.apache.camel.http.common.HttpHelper; -import org.apache.camel.http.common.HttpMethods; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ExchangeHelper; @@ -40,6 +39,7 @@ import 
org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.util.component.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,6 +160,9 @@ // (for example application/x-www-form-urlencoded forms being sent) String charset = IOHelper.getCharsetName(exchange, false); httpExchange.setRequestContent(data, charset); + } else if (body instanceof ContentProvider) { + ContentProvider contentProvider = (ContentProvider) body; + httpExchange.setRequestContent(contentProvider); } else { // then fallback to input stream InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, exchange.getIn().getBody()); {code} {code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java} @@ -21,6 +21,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; import java.util.TreeMap; @@ -35,14 +36,18 @@ import org.apache.camel.component.jetty.JettyContentExchange; import org.apache.camel.component.jetty.JettyHttpBinding; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.Synchronizable; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.client.util.StringContentProvider; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -60,7 +65,7 @@ private final CountDownLatch done = new CountDownLatch(1); private Request request; private Response response; - private byte[] responseContent; + private Object responseContent; private String requestContentType; @@ -183,16 +188,20 @@ } public void setRequestContent(byte[] byteArray) { - this.request.content(new BytesContentProvider(byteArray), this.requestContentType); + this.setRequestContent(new BytesContentProvider(byteArray)); } public void setRequestContent(String data, String charset) throws UnsupportedEncodingException { StringContentProvider cp = charset != null ? new StringContentProvider(data, charset) : new StringContentProvider(data); - this.request.content(cp, this.requestContentType); + this.setRequestContent(cp); } public void setRequestContent(InputStream ins) { - this.request.content(new InputStreamContentProvider(ins), this.requestContentType); + this.setRequestContent(new InputStreamContentProvider(ins)); + } + + public void setRequestContent(ContentProvider contentProvider) { + this.request.content(contentProvider, this.requestContentType); } public void addRequestHeader(String key, String s) { @@ -213,7 +222,66 @@ } }; - BufferingResponseListener responseListener = new BufferingResponseListener() { + Response.CompleteListener responseListener = jettyBinding instanceof Response.ResponseListener ? 
new Response.Listener.Adapter() { + + @Override + public void onHeaders(Response response) { + LOG.trace("onResponseComplete"); + done.countDown(); + JettyContentExchange9.this.response = response; + JettyContentExchange9.this.responseContent = new DeferredContentProvider() { + + @Override + public long getLength() + { + return -1; + } + + }; + try { + jettyBinding.populateResponse(exchange, JettyContentExchange9.this); + } catch (Exception e) { + exchange.setException(e); + } finally { + JettyContentExchange9.this.callback.done(false); + } + } + + @Override + public void onContent(Response response, ByteBuffer content, Callback callback) { + DeferredContentProvider deferredContentProvider = (DeferredContentProvider) JettyContentExchange9.this.responseContent; + if (!deferredContentProvider.offer(content, callback)) { + Synchronizable synchronizable = (Synchronizable) JettyContentExchange9.this.responseContent; + Object lock = synchronizable.getLock(); + synchronized (lock) { + if (!deferredContentProvider.offer(content, callback)) { + try { + lock.wait(); + } catch (InterruptedException e) { + callback.failed(e); + } + } + } + } + } + + @Override + public void onFailure(Response response, Throwable failure) + { + doTaskCompleted(failure); + } + + @Override + public void onComplete(Result result) { + DeferredContentProvider contentProvider = (DeferredContentProvider) JettyContentExchange9.this.responseContent; + if (result.isSucceeded()) { + contentProvider.close(); + } else { + contentProvider.failed(result.getFailure()); + } + } + + } : new BufferingResponseListener() { @Override public void onComplete(Result result) { @@ -232,7 +300,11 @@ } public byte[] getResponseContentBytes() { - return responseContent; + return (byte[]) responseContent; + } + + public ContentProvider getResponseContentProvider() { + return (ContentProvider) responseContent; } private Map<String, Collection<String>> getFieldsAsMap(HttpFields fields) { {code} -- This message was sent by 
Atlassian JIRA (v6.3.4#6332)