[12/37] cxf git commit: [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl
[CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48 Branch: refs/heads/master-jaxrs-2.1 Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8 Parents: e2c866a Author: Sergey Beryozkin Authored: Tue Sep 6 15:49:17 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 15:49:17 2016 +0100 -- .../jaxrs/server/SparkStreamingListener.java| 8 +-- .../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++- .../demo/jaxrs/server/StreamingService.java | 27 ++-- 3 files changed, 49 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java index 1881857..3fd7284 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java @@ -11,14 +11,15 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted; import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped; public class SparkStreamingListener implements StreamingListener { -private SparkStreamingOutput sparkStreamingOutput; +private SparkStreamingOutput streamOutput; -public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) { -this.sparkStreamingOutput = sparkStreamingOutput; +public SparkStreamingListener(SparkStreamingOutput streamOutput) { +this.streamOutput = streamOutput; } @Override public void onBatchCompleted(StreamingListenerBatchCompleted event) { +streamOutput.setSparkBatchCompleted(); } @Override @@ -31,7 +32,6 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { -sparkStreamingOutput.setOperationCompleted(); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index e28bb5a..43166fe 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -20,72 +20,49 @@ package demo.jaxrs.server; import java.io.IOException; import java.io.OutputStream; -import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.StreamingOutput; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.cxf.common.util.StringUtils; import org.apache.spark.streaming.api.java.JavaStreamingContext; public class SparkStreamingOutput implements StreamingOutput { -private JavaPairDStream wordCounts; +private BlockingQueue responseQueue = new LinkedBlockingQueue(); + private JavaStreamingContext jssc; -private boolean operationCompleted; -public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream wordCounts) { +private volatile boolean sparkBatchCompleted; +public SparkStreamingOutput(JavaStreamingContext jssc) { this.jssc = jssc; -this.wordCounts = wordCounts; } @Override public void write(final OutputStream output) throws IOException, WebApplicationException { -wordCounts.foreachRDD(new OutputFunction(output)); -jssc.start(); -waitForOperationCompleted(); -jssc.stop(false); -jssc.close(); -} - -private synchronized void waitF
cxf git commit: [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl
Repository: cxf Updated Branches: refs/heads/master e2c866a5c -> 46438c487 [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48 Branch: refs/heads/master Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8 Parents: e2c866a Author: Sergey Beryozkin Authored: Tue Sep 6 15:49:17 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 15:49:17 2016 +0100 -- .../jaxrs/server/SparkStreamingListener.java| 8 +-- .../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++- .../demo/jaxrs/server/StreamingService.java | 27 ++-- 3 files changed, 49 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java index 1881857..3fd7284 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java @@ -11,14 +11,15 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted; import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped; public class SparkStreamingListener implements StreamingListener { -private SparkStreamingOutput sparkStreamingOutput; +private SparkStreamingOutput streamOutput; -public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) { -this.sparkStreamingOutput = sparkStreamingOutput; +public SparkStreamingListener(SparkStreamingOutput streamOutput) { +this.streamOutput = streamOutput; } @Override public void onBatchCompleted(StreamingListenerBatchCompleted event) { +streamOutput.setSparkBatchCompleted(); } @Override @@ -31,7 +32,6 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) { -sparkStreamingOutput.setOperationCompleted(); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java index e28bb5a..43166fe 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java @@ -20,72 +20,49 @@ package demo.jaxrs.server; import java.io.IOException; import java.io.OutputStream; -import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.StreamingOutput; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.cxf.common.util.StringUtils; import org.apache.spark.streaming.api.java.JavaStreamingContext; public class SparkStreamingOutput implements StreamingOutput { -private JavaPairDStream wordCounts; +private BlockingQueue responseQueue = new LinkedBlockingQueue(); + private JavaStreamingContext jssc; -private boolean operationCompleted; -public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream wordCounts) { +private volatile boolean sparkBatchCompleted; +public SparkStreamingOutput(JavaStreamingContext jssc) { this.jssc = jssc; -this.wordCounts = wordCounts; } @Override public void write(final OutputStream output) throws IOException, WebApplicationException { -wordCounts.foreachRDD(new OutputFunction(output)); -jssc.start(); -waitForOperationCompleted(); -jssc.stop(false);