Repository: cxf
Updated Branches:
  refs/heads/master df243e3d1 -> da321593b


Starting to introduce async response support to the Spark demo


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/da321593
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/da321593
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/da321593

Branch: refs/heads/master
Commit: da321593b597711af7307a0fc2c11de070653356
Parents: df243e3
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Sep 6 13:11:41 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Sep 6 13:11:41 2016 +0100

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     |  2 +-
 .../demo/jaxrs/server/SparkStreamingOutput.java | 14 +++++------
 .../demo/jaxrs/server/StreamingService.java     | 25 ++++++++++++++------
 3 files changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index 7f94387..e6a218d 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -9,7 +9,7 @@ mvn exec:java
 
 Next do: 
 
-curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" https://localhost:9000/stream
+curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream
 
 Limitations: 
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index b0d5a50..2220dd4 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -99,7 +99,7 @@ public class SparkStreamingOutput implements StreamingOutput {
                     throw new WebApplicationException(); 
                 }
             }
-            // Right now we assume by the time we call it the batch the whole InputStream has been
+            // Right now we assume by the time we call it the whole InputStream has been
             // processed
             releaseStreamingContext();
         }
@@ -116,27 +116,27 @@ public class SparkStreamingOutput implements StreamingOutput {
         }
 
         @Override
-        public void onBatchStarted(StreamingListenerBatchStarted arg0) {
+        public void onBatchStarted(StreamingListenerBatchStarted event) {
         }
 
         @Override
-        public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
+        public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
         }
 
         @Override
-        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted arg0) {
+        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
         }
 
         @Override
-        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0) {
+        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
         }
 
         @Override
-        public void onReceiverError(StreamingListenerReceiverError arg0) {
+        public void onReceiverError(StreamingListenerReceiverError event) {
         }
 
         @Override
-        public void onReceiverStarted(StreamingListenerReceiverStarted arg0) {
+        public void onReceiverStarted(StreamingListenerReceiverStarted event) {
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 801fa55..b65a68b 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -21,14 +21,18 @@ package demo.jaxrs.server;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
@@ -46,6 +50,8 @@ import scala.Tuple2;
 
 @Path("/")
 public class StreamingService {
+    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
+                                                       new ArrayBlockingQueue<Runnable>(10));
     private SparkConf sparkConf;
     public StreamingService(SparkConf sparkConf) {
         this.sparkConf = sparkConf;
@@ -55,19 +61,24 @@ public class StreamingService {
     @Path("/stream")
     @Consumes("text/plain")
     @Produces("text/plain")
-    public StreamingOutput getStream(InputStream is) {
+    public void getStream(@Suspended AsyncResponse async, InputStream is) {
         try {
             JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
             JavaReceiverInputDStream<String> receiverStream = 
                 jssc.receiverStream(new InputStreamReceiver(is));
-            return new SparkStreamingOutput(jssc, 
-                                            createOutputDStream(receiverStream));
+            
+            executor.execute(new Runnable() {
+                public void run() {
+                     async.resume(new SparkStreamingOutput(jssc, 
+                                      createOutputDStream(receiverStream)));
+                }
+            });
         } catch (Exception ex) {
             // the compiler does not allow to catch SparkException directly
             if (ex instanceof SparkException) {
-                throw new WebApplicationException(Response.status(503).header("Retry-After", "60").build());
+                async.cancel(60);
             } else {
-                throw new WebApplicationException(ex);
+                async.resume(new WebApplicationException(ex));
             }
         }
     }

Reply via email to