Repository: cxf Updated Branches: refs/heads/master df243e3d1 -> da321593b
Start introducing async response support to the Spark demo Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/da321593 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/da321593 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/da321593 Branch: refs/heads/master Commit: da321593b597711af7307a0fc2c11de070653356 Parents: df243e3 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Tue Sep 6 13:11:41 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Tue Sep 6 13:11:41 2016 +0100 ---------------------------------------------------------------------- .../release/samples/jax_rs/spark/README.txt | 2 +- .../demo/jaxrs/server/SparkStreamingOutput.java | 14 +++++------ .../demo/jaxrs/server/StreamingService.java | 25 ++++++++++++++------ 3 files changed, 26 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/README.txt ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt index 7f94387..e6a218d 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/README.txt +++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt @@ -9,7 +9,7 @@ mvn exec:java Next do: -curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" https://localhost:9000/stream +curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream Limitations: http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java ---------------------------------------------------------------------- diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index b0d5a50..2220dd4 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -99,7 +99,7 @@ public class SparkStreamingOutput implements StreamingOutput { throw new WebApplicationException(); } } - // Right now we assume by the time we call it the batch the whole InputStream has been + // Right now we assume by the time we call it the whole InputStream has been // processed releaseStreamingContext(); } @@ -116,27 +116,27 @@ public class SparkStreamingOutput implements StreamingOutput { } @Override - public void onBatchStarted(StreamingListenerBatchStarted arg0) { + public void onBatchStarted(StreamingListenerBatchStarted event) { } @Override - public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) { + public void onBatchSubmitted(StreamingListenerBatchSubmitted event) { } @Override - public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted arg0) { + public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { } @Override - public void onOutputOperationStarted(StreamingListenerOutputOperationStarted arg0) { + public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) { } @Override - public void onReceiverError(StreamingListenerReceiverError arg0) { + public void onReceiverError(StreamingListenerReceiverError event) { } @Override - public void onReceiverStarted(StreamingListenerReceiverStarted arg0) { + public void onReceiverStarted(StreamingListenerReceiverStarted event) { } @Override 
http://git-wip-us.apache.org/repos/asf/cxf/blob/da321593/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 801fa55..b65a68b 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -21,14 +21,18 @@ package demo.jaxrs.server; import java.io.InputStream; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import org.apache.spark.SparkConf; import org.apache.spark.SparkException; @@ -46,6 +50,8 @@ import scala.Tuple2; @Path("/") public class StreamingService { + private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(10)); private SparkConf sparkConf; public StreamingService(SparkConf sparkConf) { this.sparkConf = sparkConf; @@ -55,19 +61,24 @@ public class StreamingService { @Path("/stream") @Consumes("text/plain") @Produces("text/plain") - public StreamingOutput getStream(InputStream is) { + public void getStream(@Suspended AsyncResponse async, InputStream is) { try { JavaStreamingContext jssc = new 
JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(new InputStreamReceiver(is)); - return new SparkStreamingOutput(jssc, - createOutputDStream(receiverStream)); + + executor.execute(new Runnable() { + public void run() { + async.resume(new SparkStreamingOutput(jssc, + createOutputDStream(receiverStream))); + } + }); } catch (Exception ex) { // the compiler does not allow to catch SparkException directly if (ex instanceof SparkException) { - throw new WebApplicationException(Response.status(503).header("Retry-After", "60").build()); + async.cancel(60); } else { - throw new WebApplicationException(ex); + async.resume(new WebApplicationException(ex)); } } }