Repository: cxf Updated Branches: refs/heads/master e6d84cf9f -> 203b5433b
[CXF-6618] Simplifying the sync code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/203b5433 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/203b5433 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/203b5433 Branch: refs/heads/master Commit: 203b5433bc44aa831bdd9b1dd4ab474d192dea6c Parents: e6d84cf Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Tue Sep 6 13:55:53 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Tue Sep 6 13:55:53 2016 +0100 ---------------------------------------------------------------------- .../jaxrs/server/SparkStreamingListener.java | 5 +--- .../demo/jaxrs/server/SparkStreamingOutput.java | 28 +++++++------------- .../demo/jaxrs/server/StreamingService.java | 5 ++-- 3 files changed, 13 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java index 3ee5558..1881857 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java @@ -19,10 +19,6 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onBatchCompleted(StreamingListenerBatchCompleted event) { - // as soon as the batch is finished we let the streaming context go - // but this may need to be revisited if a given InputStream happens to be processed in - // multiple batches ? - sparkStreamingOutput.setBatchCompleted(); } @Override @@ -35,6 +31,7 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { + sparkStreamingOutput.setOperationCompleted(); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index e3ac218..e28bb5a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -33,8 +33,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class SparkStreamingOutput implements StreamingOutput { private JavaPairDStream<String, Integer> wordCounts; private JavaStreamingContext jssc; - private boolean sparkDone; - private boolean batchCompleted; + private boolean operationCompleted; public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) { this.jssc = jssc; this.wordCounts = wordCounts; @@ -44,13 +43,13 @@ public class SparkStreamingOutput implements StreamingOutput { public void write(final OutputStream output) throws IOException, WebApplicationException { wordCounts.foreachRDD(new OutputFunction(output)); jssc.start(); - awaitTermination(); + waitForOperationCompleted(); jssc.stop(false); jssc.close(); } - private synchronized void awaitTermination() { - while (!sparkDone) { + private synchronized void waitForOperationCompleted() { + while (!operationCompleted) { try { wait(); } catch (InterruptedException e) { @@ -58,18 +57,14 @@ public class SparkStreamingOutput implements StreamingOutput { } } } - private synchronized void releaseStreamingContext() { - if (batchCompleted) { - sparkDone = true; - notify(); - } - } - - public synchronized void setBatchCompleted() { - batchCompleted = true; - } + public synchronized void setOperationCompleted() { + this.operationCompleted = true; + notify(); + } + + // This dedicated class was introduced to validate that when Spark is running it does not // fail the processing due to OutputStream being one of the fields in the serializable class, private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> { @@ -89,9 +84,6 @@ public class SparkStreamingOutput implements StreamingOutput { throw new WebApplicationException(); } } - // Right now we assume by the time we call it the whole InputStream has been - // processed - releaseStreamingContext(); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 19d1dac..7d14e9e 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -52,9 +52,9 @@ import scala.Tuple2; public class StreamingService { private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); - private SparkConf sparkConf; + private JavaStreamingContext jssc; public StreamingService(SparkConf sparkConf) { - this.sparkConf = sparkConf; + jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); } @POST @@ -63,7 +63,6 @@ public class StreamingService { @Produces("text/plain") public void getStream(@Suspended AsyncResponse async, InputStream is) { try { - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(new InputStreamReceiver(is)); SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,