[ https://issues.apache.org/jira/browse/CAMEL-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029589#comment-15029589 ]
Tuomas Kiviaho commented on CAMEL-9370: --------------------------------------- http://www.reactive-streams.org/ could perhaps be used to hide the content provider, as it's part of Jetty internals. > Support for async/deferred content > ---------------------------------- > > Key: CAMEL-9370 > URL: https://issues.apache.org/jira/browse/CAMEL-9370 > Project: Camel > Issue Type: Improvement > Components: camel-jetty > Affects Versions: 2.16.1 > Reporter: Tuomas Kiviaho > > I'm receiving {{text/event-stream}} and to my surprise there was no support > of any kind for {{AsyncContentProvider}}. > Hence here are my dealings with the issue in the form of a patch. Some of the > bits would work out of the box as-is, but others (such as {{jettyBinding > instanceof Response.ResponseListener}}) I've rammed in just to get it > working. It would be nice if this type of streaming could be enabled without > having to declare {{jettyBinding}}. A new option, for instance, would be a more > practical solution. > {code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java} > @@ -191,7 +191,8 @@ > } > } else { > // just grab the raw content body > - return httpExchange.getBody(); > + byte[] body = httpExchange.getBody(); > + return body == null ? 
httpExchange.getResponseContentProvider() > : body; > } > } > > {code} > {code:title=org/apache/camel/component/jetty/JettyContentExchange.java} > @@ -25,6 +25,7 @@ > import org.apache.camel.AsyncCallback; > import org.apache.camel.Exchange; > import org.eclipse.jetty.client.HttpClient; > +import org.eclipse.jetty.client.api.ContentProvider; > > public interface JettyContentExchange { > > @@ -44,6 +45,8 @@ > void setRequestContent(String data, String charset) throws > UnsupportedEncodingException; > > void setRequestContent(InputStream ins); > + > + void setRequestContent(ContentProvider contentProvider); > > void addRequestHeader(String key, String s); > > @@ -63,6 +66,8 @@ > int getResponseStatus(); > > byte[] getResponseContentBytes(); > + > + ContentProvider getResponseContentProvider(); > > Map<String, Collection<String>> getResponseHeaders(); > > {code} > {code:title=org/apache/camel/component/jetty/JettyHttpProducer.java} > @@ -32,7 +32,6 @@ > import org.apache.camel.Message; > import org.apache.camel.http.common.HttpConstants; > import org.apache.camel.http.common.HttpHelper; > -import org.apache.camel.http.common.HttpMethods; > import org.apache.camel.impl.DefaultAsyncProducer; > import org.apache.camel.spi.HeaderFilterStrategy; > import org.apache.camel.util.ExchangeHelper; > @@ -40,6 +39,7 @@ > import org.apache.camel.util.ObjectHelper; > import org.apache.camel.util.URISupport; > import org.eclipse.jetty.client.HttpClient; > +import org.eclipse.jetty.client.api.ContentProvider; > import org.eclipse.jetty.util.component.LifeCycle; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > @@ -160,6 +160,9 @@ > // (for example application/x-www-form-urlencoded forms > being sent) > String charset = IOHelper.getCharsetName(exchange, > false); > httpExchange.setRequestContent(data, charset); > + } else if (body instanceof ContentProvider) { > + ContentProvider contentProvider = (ContentProvider) body; > + httpExchange.setRequestContent(contentProvider); 
> } else { > // then fallback to input stream > InputStream is = > exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, > exchange, exchange.getIn().getBody()); > {code} > {code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java} > @@ -21,6 +21,7 @@ > import java.io.InputStream; > import java.io.UnsupportedEncodingException; > import java.net.MalformedURLException; > +import java.nio.ByteBuffer; > import java.util.Collection; > import java.util.Map; > import java.util.TreeMap; > @@ -35,14 +36,18 @@ > import org.apache.camel.component.jetty.JettyContentExchange; > import org.apache.camel.component.jetty.JettyHttpBinding; > import org.eclipse.jetty.client.HttpClient; > +import org.eclipse.jetty.client.Synchronizable; > +import org.eclipse.jetty.client.api.ContentProvider; > import org.eclipse.jetty.client.api.Request; > import org.eclipse.jetty.client.api.Response; > import org.eclipse.jetty.client.api.Result; > import org.eclipse.jetty.client.util.BufferingResponseListener; > import org.eclipse.jetty.client.util.BytesContentProvider; > +import org.eclipse.jetty.client.util.DeferredContentProvider; > import org.eclipse.jetty.client.util.InputStreamContentProvider; > import org.eclipse.jetty.client.util.StringContentProvider; > import org.eclipse.jetty.http.HttpFields; > +import org.eclipse.jetty.util.Callback; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > @@ -60,7 +65,7 @@ > private final CountDownLatch done = new CountDownLatch(1); > private Request request; > private Response response; > - private byte[] responseContent; > + private Object responseContent; > > private String requestContentType; > > @@ -183,16 +188,20 @@ > } > > public void setRequestContent(byte[] byteArray) { > - this.request.content(new BytesContentProvider(byteArray), > this.requestContentType); > + this.setRequestContent(new BytesContentProvider(byteArray)); > } > > public void setRequestContent(String data, String charset) throws > 
UnsupportedEncodingException { > StringContentProvider cp = charset != null ? new > StringContentProvider(data, charset) : new StringContentProvider(data); > - this.request.content(cp, this.requestContentType); > + this.setRequestContent(cp); > } > > public void setRequestContent(InputStream ins) { > - this.request.content(new InputStreamContentProvider(ins), > this.requestContentType); > + this.setRequestContent(new InputStreamContentProvider(ins)); > + } > + > + public void setRequestContent(ContentProvider contentProvider) { > + this.request.content(contentProvider, this.requestContentType); > } > > public void addRequestHeader(String key, String s) { > @@ -213,7 +222,66 @@ > } > > }; > - BufferingResponseListener responseListener = new > BufferingResponseListener() { > + Response.CompleteListener responseListener = jettyBinding instanceof > Response.ResponseListener ? new Response.Listener.Adapter() { > + > + @Override > + public void onHeaders(Response response) { > + LOG.trace("onResponseComplete"); > + done.countDown(); > + JettyContentExchange9.this.response = response; > + JettyContentExchange9.this.responseContent = new > DeferredContentProvider() { > + > + @Override > + public long getLength() > + { > + return -1; > + } > + > + }; > + try { > + jettyBinding.populateResponse(exchange, > JettyContentExchange9.this); > + } catch (Exception e) { > + exchange.setException(e); > + } finally { > + JettyContentExchange9.this.callback.done(false); > + } > + } > + > + @Override > + public void onContent(Response response, ByteBuffer content, > Callback callback) { > + DeferredContentProvider deferredContentProvider = > (DeferredContentProvider) JettyContentExchange9.this.responseContent; > + if (!deferredContentProvider.offer(content, callback)) { > + Synchronizable synchronizable = (Synchronizable) > JettyContentExchange9.this.responseContent; > + Object lock = synchronizable.getLock(); > + synchronized (lock) { > + if (!deferredContentProvider.offer(content, > 
callback)) { > + try { > + lock.wait(); > + } catch (InterruptedException e) { > + callback.failed(e); > + } > + } > + } > + } > + } > + > + @Override > + public void onFailure(Response response, Throwable failure) > + { > + doTaskCompleted(failure); > + } > + > + @Override > + public void onComplete(Result result) { > + DeferredContentProvider contentProvider = > (DeferredContentProvider) JettyContentExchange9.this.responseContent; > + if (result.isSucceeded()) { > + contentProvider.close(); > + } else { > + contentProvider.failed(result.getFailure()); > + } > + } > + > + } : new BufferingResponseListener() { > > @Override > public void onComplete(Result result) { > @@ -232,7 +300,11 @@ > } > > public byte[] getResponseContentBytes() { > - return responseContent; > + return (byte[]) responseContent; > + } > + > + public ContentProvider getResponseContentProvider() { > + return (ContentProvider) responseContent; > } > > private Map<String, Collection<String>> getFieldsAsMap(HttpFields > fields) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)