Tuomas Kiviaho created CAMEL-9370:
-------------------------------------

             Summary: Support for async/deferred content
                 Key: CAMEL-9370
                 URL: https://issues.apache.org/jira/browse/CAMEL-9370
             Project: Camel
          Issue Type: Improvement
          Components: camel-jetty
    Affects Versions: 2.16.1
            Reporter: Tuomas Kiviaho


I'm receiving {{text/event-stream}} and to my surprise there was no support of 
anykind for {{AsyncContentProvider}}. 

Hence here are my dealings with the issue in a form of a patch. Some of the 
bits would work out of the box as-is, but others (such as {{jettyBinding 
instanceof Response.ResponseListener}}). I've rammed in just to get it working. 
It would be nice if this type of streaming could be enabled without having to 
declare {{jettyBinding}}. A new option for instance would be more practical 
solution.

{code:title=org/apache/camel/component/jetty/DefaultJettyHttpBinding.java}
@@ -191,7 +191,8 @@
             }
         } else {
             // just grab the raw content body
-            return httpExchange.getBody();
+            byte[] body = httpExchange.getBody();
+            return body == null ? httpExchange.getResponseContentProvider() : 
body;
         }
     }
 
{code}
{code:title=org/apache/camel/component/jetty/JettyContentExchange.java}
@@ -25,6 +25,7 @@
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
 
 public interface JettyContentExchange {
 
@@ -44,6 +45,8 @@
     void setRequestContent(String data, String charset) throws 
UnsupportedEncodingException;
 
     void setRequestContent(InputStream ins);
+    
+    void setRequestContent(ContentProvider contentProvider);
 
     void addRequestHeader(String key, String s);
 
@@ -63,6 +66,8 @@
     int getResponseStatus();
 
     byte[] getResponseContentBytes();
+    
+    ContentProvider getResponseContentProvider();
 
     Map<String, Collection<String>> getResponseHeaders();
 
{code}
{code:title=org/apache/camel/component/jetty/JettyHttpProducer.java}
@@ -32,7 +32,6 @@
 import org.apache.camel.Message;
 import org.apache.camel.http.common.HttpConstants;
 import org.apache.camel.http.common.HttpHelper;
-import org.apache.camel.http.common.HttpMethods;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
@@ -40,6 +39,7 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentProvider;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +160,9 @@
                     // (for example application/x-www-form-urlencoded forms 
being sent)
                     String charset = IOHelper.getCharsetName(exchange, false);
                     httpExchange.setRequestContent(data, charset);
+                } else if (body instanceof ContentProvider) {
+                    ContentProvider contentProvider = (ContentProvider) body;
+                    httpExchange.setRequestContent(contentProvider);
                 } else {
                     // then fallback to input stream
                     InputStream is = 
exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, 
exchange, exchange.getIn().getBody());
{code}
{code:title=org/apache/camel/component/jetty9/JettyContentExchange9.java}
@@ -21,6 +21,7 @@
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.MalformedURLException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
@@ -35,14 +36,18 @@
 import org.apache.camel.component.jetty.JettyContentExchange;
 import org.apache.camel.component.jetty.JettyHttpBinding;
 import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.Synchronizable;
+import org.eclipse.jetty.client.api.ContentProvider;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.client.util.BufferingResponseListener;
 import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.client.util.DeferredContentProvider;
 import org.eclipse.jetty.client.util.InputStreamContentProvider;
 import org.eclipse.jetty.client.util.StringContentProvider;
 import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.util.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +65,7 @@
     private final CountDownLatch done = new CountDownLatch(1);
     private Request request;
     private Response response;
-    private byte[] responseContent;
+    private Object responseContent;
 
     private String requestContentType;
 
@@ -183,16 +188,20 @@
     }
 
     public void setRequestContent(byte[] byteArray) {
-        this.request.content(new BytesContentProvider(byteArray), 
this.requestContentType);
+        this.setRequestContent(new BytesContentProvider(byteArray));
     }
 
     public void setRequestContent(String data, String charset) throws 
UnsupportedEncodingException {
         StringContentProvider cp = charset != null ? new 
StringContentProvider(data, charset) : new StringContentProvider(data);
-        this.request.content(cp, this.requestContentType);
+        this.setRequestContent(cp);
     }
 
     public void setRequestContent(InputStream ins) {
-        this.request.content(new InputStreamContentProvider(ins), 
this.requestContentType);
+        this.setRequestContent(new InputStreamContentProvider(ins));
+    }
+    
+    public void setRequestContent(ContentProvider contentProvider) {
+        this.request.content(contentProvider, this.requestContentType);
     }
 
     public void addRequestHeader(String key, String s) {
@@ -213,7 +222,66 @@
             }
 
         };
-        BufferingResponseListener responseListener = new 
BufferingResponseListener() {
+        Response.CompleteListener responseListener = jettyBinding instanceof 
Response.ResponseListener ? new Response.Listener.Adapter() {
+            
+            @Override
+            public void onHeaders(Response response) {
+                LOG.trace("onResponseComplete");
+                done.countDown();
+                JettyContentExchange9.this.response = response;
+                JettyContentExchange9.this.responseContent = new 
DeferredContentProvider() {
+
+                    @Override
+                    public long getLength()
+                    {
+                        return -1;
+                    }
+                    
+                };
+                try {
+                    jettyBinding.populateResponse(exchange, 
JettyContentExchange9.this);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                } finally {
+                    JettyContentExchange9.this.callback.done(false);
+                }
+            }
+
+            @Override
+            public void onContent(Response response, ByteBuffer content, 
Callback callback) {
+                DeferredContentProvider deferredContentProvider = 
(DeferredContentProvider) JettyContentExchange9.this.responseContent;
+                if (!deferredContentProvider.offer(content, callback)) {
+                    Synchronizable synchronizable = (Synchronizable) 
JettyContentExchange9.this.responseContent;
+                    Object lock = synchronizable.getLock();
+                    synchronized (lock) {
+                        if (!deferredContentProvider.offer(content, callback)) 
{
+                            try {
+                                lock.wait();
+                            } catch (InterruptedException e) {
+                                callback.failed(e);
+                            }
+                        }
+                    }
+                }
+            }
+            
+            @Override
+            public void onFailure(Response response, Throwable failure)
+            {
+                doTaskCompleted(failure);
+            }
+
+            @Override
+            public void onComplete(Result result) {
+                DeferredContentProvider contentProvider = 
(DeferredContentProvider) JettyContentExchange9.this.responseContent;
+                if (result.isSucceeded()) {
+                    contentProvider.close();
+                } else {
+                    contentProvider.failed(result.getFailure());
+                }
+            }
+            
+        } : new BufferingResponseListener() {
 
             @Override
             public void onComplete(Result result) {
@@ -232,7 +300,11 @@
     }
 
     public byte[] getResponseContentBytes() {
-        return responseContent;
+        return (byte[]) responseContent;
+    }
+    
+    public ContentProvider getResponseContentProvider() {
+        return (ContentProvider) responseContent;
     }
 
     private Map<String, Collection<String>> getFieldsAsMap(HttpFields fields) {
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to