Repository: cxf
Updated Branches:
  refs/heads/master 46438c487 -> 76edf9350


[CXF-6618] Resuming the async responce only when the batch is started


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/76edf935
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/76edf935
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/76edf935

Branch: refs/heads/master
Commit: 76edf9350e7640860b66196ad40396d54bbcb2b2
Parents: 46438c4
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Sep 6 16:37:07 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Sep 6 16:37:07 2016 +0100

----------------------------------------------------------------------
 .../main/java/demo/jaxrs/server/SparkJob.java   | 36 ++++++++++++++++++
 .../jaxrs/server/SparkStreamingListener.java    | 40 +++++++++++++++++++-
 .../demo/jaxrs/server/StreamingService.java     | 12 +++---
 3 files changed, 79 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
new file mode 100644
index 0000000..010ddf3
--- /dev/null
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import javax.ws.rs.container.AsyncResponse;
+
+public class SparkJob implements Runnable {
+    private AsyncResponse ac;
+    private SparkStreamingListener sparkListener;
+    public SparkJob(AsyncResponse ac, SparkStreamingListener sparkListener) {
+        this.ac = ac;
+        this.sparkListener = sparkListener;
+    }
+    @Override
+    public void run() {
+        sparkListener.waitForBatchStarted();
+        ac.resume(sparkListener.getStreamOut());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 3fd7284..8a891c2 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package demo.jaxrs.server;
 
 import org.apache.spark.streaming.scheduler.StreamingListener;
@@ -12,7 +30,8 @@ import 
org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 
 public class SparkStreamingListener implements StreamingListener {
     private SparkStreamingOutput streamOutput;
-
+    private boolean batchStarted;
+    
     public SparkStreamingListener(SparkStreamingOutput streamOutput) {
         this.streamOutput = streamOutput;
     }
@@ -23,7 +42,9 @@ public class SparkStreamingListener implements 
StreamingListener {
     }
 
     @Override
-    public void onBatchStarted(StreamingListenerBatchStarted event) {
+    public synchronized void onBatchStarted(StreamingListenerBatchStarted 
event) {
+        batchStarted = true;
+        notify();
     }
 
     @Override
@@ -49,5 +70,20 @@ public class SparkStreamingListener implements 
StreamingListener {
     @Override
     public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
     }
+
+    public SparkStreamingOutput getStreamOut() {
+        return streamOutput;
+    }
+
+    public synchronized void waitForBatchStarted() {
+        while (!batchStarted) {
+            try {
+                this.wait();
+            } catch (InterruptedException ex) {
+                // continue
+            }
+        }
+        
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index f986225..0e94b3c 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -66,19 +66,17 @@ public class StreamingService {
     @Produces("text/plain")
     public void getStream(@Suspended AsyncResponse async, InputStream is) {
         try {
+            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
+            SparkStreamingListener sparkListener =  new 
SparkStreamingListener(streamOut);
+            jssc.addStreamingListener(sparkListener);
+            
             JavaReceiverInputDStream<String> receiverStream = 
                 jssc.receiverStream(new InputStreamReceiver(is));
-            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
-            jssc.addStreamingListener(new SparkStreamingListener(streamOut));
             JavaPairDStream<String, Integer> wordCounts = 
createOutputDStream(receiverStream);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
                                                     
-            executor.execute(new Runnable() {
-                public void run() {
-                     async.resume(streamOut);
-                }
-            });
+            executor.execute(new SparkJob(async, sparkListener));
         } catch (Exception ex) {
             // the compiler does not allow to catch SparkException directly
             if (ex instanceof SparkException) {

Reply via email to