[12/37] cxf git commit: [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl

2016-09-10 Thread reta
[CXF-6618] Minimizing the dep  on the Spark api in StreamingOutputImpl


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48

Branch: refs/heads/master-jaxrs-2.1
Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8
Parents: e2c866a
Author: Sergey Beryozkin 
Authored: Tue Sep 6 15:49:17 2016 +0100
Committer: Sergey Beryozkin 
Committed: Tue Sep 6 15:49:17 2016 +0100

--
 .../jaxrs/server/SparkStreamingListener.java|  8 +--
 .../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++-
 .../demo/jaxrs/server/StreamingService.java | 27 ++--
 3 files changed, 49 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
--
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 1881857..3fd7284 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -11,14 +11,15 @@ import 
org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 
 public class SparkStreamingListener implements StreamingListener {
-private SparkStreamingOutput sparkStreamingOutput;
+private SparkStreamingOutput streamOutput;
 
-public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
-this.sparkStreamingOutput = sparkStreamingOutput;
+public SparkStreamingListener(SparkStreamingOutput streamOutput) {
+this.streamOutput = streamOutput;
 }
 
 @Override
 public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+streamOutput.setSparkBatchCompleted();
 }
 
 @Override
@@ -31,7 +32,6 @@ public class SparkStreamingListener implements 
StreamingListener {
 
 @Override
 public void 
onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
-sparkStreamingOutput.setOperationCompleted();
 }
 
 @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
--
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e28bb5a..43166fe 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -20,72 +20,49 @@ package demo.jaxrs.server;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.cxf.common.util.StringUtils;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 public class SparkStreamingOutput implements StreamingOutput {
-private JavaPairDStream wordCounts;
+private BlockingQueue responseQueue = new 
LinkedBlockingQueue();
+
 private JavaStreamingContext jssc;
-private boolean operationCompleted;
-public SparkStreamingOutput(JavaStreamingContext jssc, 
JavaPairDStream wordCounts) {
+private volatile boolean sparkBatchCompleted;
+public SparkStreamingOutput(JavaStreamingContext jssc) {
 this.jssc = jssc;
-this.wordCounts = wordCounts;
 }
 
 @Override
 public void write(final OutputStream output) throws IOException, 
WebApplicationException {
-wordCounts.foreachRDD(new OutputFunction(output));
-jssc.start();
-waitForOperationCompleted();
-jssc.stop(false);
-jssc.close();
-}
-
-private synchronized void waitF

cxf git commit: [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl

2016-09-06 Thread sergeyb
Repository: cxf
Updated Branches:
  refs/heads/master e2c866a5c -> 46438c487


[CXF-6618] Minimizing the dep  on the Spark api in StreamingOutputImpl


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48

Branch: refs/heads/master
Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8
Parents: e2c866a
Author: Sergey Beryozkin 
Authored: Tue Sep 6 15:49:17 2016 +0100
Committer: Sergey Beryozkin 
Committed: Tue Sep 6 15:49:17 2016 +0100

--
 .../jaxrs/server/SparkStreamingListener.java|  8 +--
 .../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++-
 .../demo/jaxrs/server/StreamingService.java | 27 ++--
 3 files changed, 49 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
--
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 1881857..3fd7284 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -11,14 +11,15 @@ import 
org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 
 public class SparkStreamingListener implements StreamingListener {
-private SparkStreamingOutput sparkStreamingOutput;
+private SparkStreamingOutput streamOutput;
 
-public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
-this.sparkStreamingOutput = sparkStreamingOutput;
+public SparkStreamingListener(SparkStreamingOutput streamOutput) {
+this.streamOutput = streamOutput;
 }
 
 @Override
 public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+streamOutput.setSparkBatchCompleted();
 }
 
 @Override
@@ -31,7 +32,6 @@ public class SparkStreamingListener implements 
StreamingListener {
 
 @Override
 public void 
onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
-sparkStreamingOutput.setOperationCompleted();
 }
 
 @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
--
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e28bb5a..43166fe 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -20,72 +20,49 @@ package demo.jaxrs.server;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.cxf.common.util.StringUtils;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 public class SparkStreamingOutput implements StreamingOutput {
-private JavaPairDStream wordCounts;
+private BlockingQueue responseQueue = new 
LinkedBlockingQueue();
+
 private JavaStreamingContext jssc;
-private boolean operationCompleted;
-public SparkStreamingOutput(JavaStreamingContext jssc, 
JavaPairDStream wordCounts) {
+private volatile boolean sparkBatchCompleted;
+public SparkStreamingOutput(JavaStreamingContext jssc) {
 this.jssc = jssc;
-this.wordCounts = wordCounts;
 }
 
 @Override
 public void write(final OutputStream output) throws IOException, 
WebApplicationException {
-wordCounts.foreachRDD(new OutputFunction(output));
-jssc.start();
-waitForOperationCompleted();
-jssc.stop(false);