[ 
https://issues.apache.org/jira/browse/CAMEL-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029589#comment-15029589
 ] 

Tuomas Kiviaho commented on CAMEL-9370:
---------------------------------------

http://www.reactive-streams.org/ could perhaps be used to hide the content 
provider, as it's part of Jetty internals.

> Support for async/deferred content
> ----------------------------------
>
>                 Key: CAMEL-9370
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9370
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-jetty
>    Affects Versions: 2.16.1
>            Reporter: Tuomas Kiviaho
>
> I'm receiving {{text/event-stream}} and, to my surprise, there was no support 
> of any kind for {{AsyncContentProvider}}. 
> Hence here are my dealings with the issue in the form of a patch. Some of the 
> bits would work out of the box as-is, but others (such as {{jettyBinding 
> instanceof Response.ResponseListener}}) I've rammed in just to get it 
> working. It would be nice if this type of streaming could be enabled without 
> having to declare {{jettyBinding}}. A new option, for instance, would be a 
> more practical solution.
> {code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java}
> @@ -191,7 +191,8 @@
>              }
>          } else {
>              // just grab the raw content body
> -            return httpExchange.getBody();
> +            byte[] body = httpExchange.getBody();
> +            return body == null ? httpExchange.getResponseContentProvider() 
> : body;
>          }
>      }
>  
> {code}
> {code:title=org/apache/camel/component/jetty/JettyContentExchange.java}
> @@ -25,6 +25,7 @@
>  import org.apache.camel.AsyncCallback;
>  import org.apache.camel.Exchange;
>  import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.api.ContentProvider;
>  
>  public interface JettyContentExchange {
>  
> @@ -44,6 +45,8 @@
>      void setRequestContent(String data, String charset) throws 
> UnsupportedEncodingException;
>  
>      void setRequestContent(InputStream ins);
> +    
> +    void setRequestContent(ContentProvider contentProvider);
>  
>      void addRequestHeader(String key, String s);
>  
> @@ -63,6 +66,8 @@
>      int getResponseStatus();
>  
>      byte[] getResponseContentBytes();
> +    
> +    ContentProvider getResponseContentProvider();
>  
>      Map<String, Collection<String>> getResponseHeaders();
>  
> {code}
> {code:title=org/apache/camel/component/jetty/JettyHttpProducer.java}
> @@ -32,7 +32,6 @@
>  import org.apache.camel.Message;
>  import org.apache.camel.http.common.HttpConstants;
>  import org.apache.camel.http.common.HttpHelper;
> -import org.apache.camel.http.common.HttpMethods;
>  import org.apache.camel.impl.DefaultAsyncProducer;
>  import org.apache.camel.spi.HeaderFilterStrategy;
>  import org.apache.camel.util.ExchangeHelper;
> @@ -40,6 +39,7 @@
>  import org.apache.camel.util.ObjectHelper;
>  import org.apache.camel.util.URISupport;
>  import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.api.ContentProvider;
>  import org.eclipse.jetty.util.component.LifeCycle;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> @@ -160,6 +160,9 @@
>                      // (for example application/x-www-form-urlencoded forms 
> being sent)
>                      String charset = IOHelper.getCharsetName(exchange, 
> false);
>                      httpExchange.setRequestContent(data, charset);
> +                } else if (body instanceof ContentProvider) {
> +                    ContentProvider contentProvider = (ContentProvider) body;
> +                    httpExchange.setRequestContent(contentProvider);
>                  } else {
>                      // then fallback to input stream
>                      InputStream is = 
> exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class,
>  exchange, exchange.getIn().getBody());
> {code}
> {code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java}
> @@ -21,6 +21,7 @@
>  import java.io.InputStream;
>  import java.io.UnsupportedEncodingException;
>  import java.net.MalformedURLException;
> +import java.nio.ByteBuffer;
>  import java.util.Collection;
>  import java.util.Map;
>  import java.util.TreeMap;
> @@ -35,14 +36,18 @@
>  import org.apache.camel.component.jetty.JettyContentExchange;
>  import org.apache.camel.component.jetty.JettyHttpBinding;
>  import org.eclipse.jetty.client.HttpClient;
> +import org.eclipse.jetty.client.Synchronizable;
> +import org.eclipse.jetty.client.api.ContentProvider;
>  import org.eclipse.jetty.client.api.Request;
>  import org.eclipse.jetty.client.api.Response;
>  import org.eclipse.jetty.client.api.Result;
>  import org.eclipse.jetty.client.util.BufferingResponseListener;
>  import org.eclipse.jetty.client.util.BytesContentProvider;
> +import org.eclipse.jetty.client.util.DeferredContentProvider;
>  import org.eclipse.jetty.client.util.InputStreamContentProvider;
>  import org.eclipse.jetty.client.util.StringContentProvider;
>  import org.eclipse.jetty.http.HttpFields;
> +import org.eclipse.jetty.util.Callback;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>  
> @@ -60,7 +65,7 @@
>      private final CountDownLatch done = new CountDownLatch(1);
>      private Request request;
>      private Response response;
> -    private byte[] responseContent;
> +    private Object responseContent;
>  
>      private String requestContentType;
>  
> @@ -183,16 +188,20 @@
>      }
>  
>      public void setRequestContent(byte[] byteArray) {
> -        this.request.content(new BytesContentProvider(byteArray), 
> this.requestContentType);
> +        this.setRequestContent(new BytesContentProvider(byteArray));
>      }
>  
>      public void setRequestContent(String data, String charset) throws 
> UnsupportedEncodingException {
>          StringContentProvider cp = charset != null ? new 
> StringContentProvider(data, charset) : new StringContentProvider(data);
> -        this.request.content(cp, this.requestContentType);
> +        this.setRequestContent(cp);
>      }
>  
>      public void setRequestContent(InputStream ins) {
> -        this.request.content(new InputStreamContentProvider(ins), 
> this.requestContentType);
> +        this.setRequestContent(new InputStreamContentProvider(ins));
> +    }
> +    
> +    public void setRequestContent(ContentProvider contentProvider) {
> +        this.request.content(contentProvider, this.requestContentType);
>      }
>  
>      public void addRequestHeader(String key, String s) {
> @@ -213,7 +222,66 @@
>              }
>  
>          };
> -        BufferingResponseListener responseListener = new 
> BufferingResponseListener() {
> +        Response.CompleteListener responseListener = jettyBinding instanceof 
> Response.ResponseListener ? new Response.Listener.Adapter() {
> +            
> +            @Override
> +            public void onHeaders(Response response) {
> +                LOG.trace("onResponseComplete");
> +                done.countDown();
> +                JettyContentExchange9.this.response = response;
> +                JettyContentExchange9.this.responseContent = new 
> DeferredContentProvider() {
> +
> +                    @Override
> +                    public long getLength()
> +                    {
> +                        return -1;
> +                    }
> +                    
> +                };
> +                try {
> +                    jettyBinding.populateResponse(exchange, 
> JettyContentExchange9.this);
> +                } catch (Exception e) {
> +                    exchange.setException(e);
> +                } finally {
> +                    JettyContentExchange9.this.callback.done(false);
> +                }
> +            }
> +
> +            @Override
> +            public void onContent(Response response, ByteBuffer content, 
> Callback callback) {
> +                DeferredContentProvider deferredContentProvider = 
> (DeferredContentProvider) JettyContentExchange9.this.responseContent;
> +                if (!deferredContentProvider.offer(content, callback)) {
> +                    Synchronizable synchronizable = (Synchronizable) 
> JettyContentExchange9.this.responseContent;
> +                    Object lock = synchronizable.getLock();
> +                    synchronized (lock) {
> +                        if (!deferredContentProvider.offer(content, 
> callback)) {
> +                            try {
> +                                lock.wait();
> +                            } catch (InterruptedException e) {
> +                                callback.failed(e);
> +                            }
> +                        }
> +                    }
> +                }
> +            }
> +            
> +            @Override
> +            public void onFailure(Response response, Throwable failure)
> +            {
> +                doTaskCompleted(failure);
> +            }
> +
> +            @Override
> +            public void onComplete(Result result) {
> +                DeferredContentProvider contentProvider = 
> (DeferredContentProvider) JettyContentExchange9.this.responseContent;
> +                if (result.isSucceeded()) {
> +                    contentProvider.close();
> +                } else {
> +                    contentProvider.failed(result.getFailure());
> +                }
> +            }
> +            
> +        } : new BufferingResponseListener() {
>  
>              @Override
>              public void onComplete(Result result) {
> @@ -232,7 +300,11 @@
>      }
>  
>      public byte[] getResponseContentBytes() {
> -        return responseContent;
> +        return (byte[]) responseContent;
> +    }
> +    
> +    public ContentProvider getResponseContentProvider() {
> +        return (ContentProvider) responseContent;
>      }
>  
>      private Map<String, Collection<String>> getFieldsAsMap(HttpFields 
> fields) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to