Repository: cxf Updated Branches: refs/heads/master 76edf9350 -> c8e788eee
[CXF-6618] Creating a new Sparc context per every new request Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c8e788ee Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c8e788ee Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c8e788ee Branch: refs/heads/master Commit: c8e788eee8f7b5ce3a2161dfe727f3153ded607b Parents: 76edf93 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Tue Sep 6 17:18:31 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Tue Sep 6 17:18:31 2016 +0100 ---------------------------------------------------------------------- .../src/main/java/demo/jaxrs/server/InputStreamReceiver.java | 4 ---- .../jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java | 5 +---- .../src/main/java/demo/jaxrs/server/StreamingService.java | 7 ++++--- 3 files changed, 5 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java index 05658e1..790ee35 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java @@ -39,10 +39,6 @@ public class InputStreamReceiver extends Receiver<String> { super(StorageLevel.MEMORY_ONLY()); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); String userInput = null; - // Receiver is meant to be serializable, but it would be - // great if if we could avoid copying InputStream - // TODO: submit Spark enhancement request so that it can keep streaming from - // the incoming InputStream to its processing nodes ? while ((userInput = readLine(reader)) != null) { inputStrings.add(userInput); } http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java index c74b215..50f915a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java @@ -21,18 +21,15 @@ package demo.jaxrs.server; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.spark.SparkConf; public class Server { protected Server() throws Exception { - SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect"); - JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setResourceClasses(StreamingService.class); sf.setResourceProvider(StreamingService.class, - new SingletonResourceProvider(new StreamingService(sparkConf))); + new SingletonResourceProvider(new StreamingService())); sf.setAddress("http://localhost:9000/"); sf.create(); http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 0e94b3c..c9cc033 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -55,9 +55,7 @@ import scala.Tuple2; public class StreamingService { private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); - private JavaStreamingContext jssc; - public StreamingService(SparkConf sparkConf) { - jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + public StreamingService() { } @POST @@ -66,6 +64,9 @@ public class StreamingService { @Produces("text/plain") public void getStream(@Suspended AsyncResponse async, InputStream is) { try { + SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc); SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut); jssc.addStreamingListener(sparkListener);