Repository: cxf Updated Branches: refs/heads/master 46438c487 -> 76edf9350
[CXF-6618] Resuming the async response only when the batch is started Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/76edf935 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/76edf935 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/76edf935 Branch: refs/heads/master Commit: 76edf9350e7640860b66196ad40396d54bbcb2b2 Parents: 46438c4 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Tue Sep 6 16:37:07 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Tue Sep 6 16:37:07 2016 +0100 ---------------------------------------------------------------------- .../main/java/demo/jaxrs/server/SparkJob.java | 36 ++++++++++++++++++ .../jaxrs/server/SparkStreamingListener.java | 40 +++++++++++++++++++- .../demo/jaxrs/server/StreamingService.java | 12 +++--- 3 files changed, 79 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java new file mode 100644 index 0000000..010ddf3 --- /dev/null +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkJob.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package demo.jaxrs.server; + +import javax.ws.rs.container.AsyncResponse; + +public class SparkJob implements Runnable { + private AsyncResponse ac; + private SparkStreamingListener sparkListener; + public SparkJob(AsyncResponse ac, SparkStreamingListener sparkListener) { + this.ac = ac; + this.sparkListener = sparkListener; + } + @Override + public void run() { + sparkListener.waitForBatchStarted(); + ac.resume(sparkListener.getStreamOut()); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java index 3fd7284..8a891c2 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package demo.jaxrs.server; import org.apache.spark.streaming.scheduler.StreamingListener; @@ -12,7 +30,8 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped; public class SparkStreamingListener implements StreamingListener { private SparkStreamingOutput streamOutput; - + private boolean batchStarted; + public SparkStreamingListener(SparkStreamingOutput streamOutput) { this.streamOutput = streamOutput; } @@ -23,7 +42,9 @@ public class SparkStreamingListener implements StreamingListener { } @Override - public void onBatchStarted(StreamingListenerBatchStarted event) { + public synchronized void onBatchStarted(StreamingListenerBatchStarted event) { + batchStarted = true; + notify(); } @Override @@ -49,5 +70,20 @@ public class SparkStreamingListener implements StreamingListener { @Override public void onReceiverStopped(StreamingListenerReceiverStopped arg0) { } + + public SparkStreamingOutput getStreamOut() { + return streamOutput; + } + + public synchronized void waitForBatchStarted() { + while (!batchStarted) { + try { + this.wait(); + } catch (InterruptedException ex) { + // continue + } + } + + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/76edf935/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index f986225..0e94b3c 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -66,19 +66,17 @@ public class StreamingService { @Produces("text/plain") public void getStream(@Suspended AsyncResponse async, InputStream is) { try { + SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc); + SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut); + jssc.addStreamingListener(sparkListener); + JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(new InputStreamReceiver(is)); - SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc); - jssc.addStreamingListener(new SparkStreamingListener(streamOut)); JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream); wordCounts.foreachRDD(new OutputFunction(streamOut)); jssc.start(); - executor.execute(new Runnable() { - public void run() { - async.resume(streamOut); - } - }); + executor.execute(new SparkJob(async, sparkListener)); } catch (Exception ex) { // the compiler does not allow to catch SparkException directly if (ex instanceof SparkException) {