[14/37] cxf git commit: [CXF-6618] Creating a new Spark context for every new request
[CXF-6618] Creating a new Sparc context per every new request Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c8e788ee Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c8e788ee Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c8e788ee Branch: refs/heads/master-jaxrs-2.1 Commit: c8e788eee8f7b5ce3a2161dfe727f3153ded607b Parents: 76edf93 Author: Sergey Beryozkin Authored: Tue Sep 6 17:18:31 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 17:18:31 2016 +0100 -- .../src/main/java/demo/jaxrs/server/InputStreamReceiver.java | 4 .../jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java | 5 + .../src/main/java/demo/jaxrs/server/StreamingService.java | 7 --- 3 files changed, 5 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java index 05658e1..790ee35 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java @@ -39,10 +39,6 @@ public class InputStreamReceiver extends Receiver { super(StorageLevel.MEMORY_ONLY()); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); String userInput = null; -// Receiver is meant to be serializable, but it would be -// great if if we could avoid copying InputStream -// TODO: submit Spark enhancement request so that it can keep streaming from -// the incoming InputStream to its processing nodes ? 
while ((userInput = readLine(reader)) != null) { inputStrings.add(userInput); } http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java index c74b215..50f915a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java @@ -21,18 +21,15 @@ package demo.jaxrs.server; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.spark.SparkConf; public class Server { protected Server() throws Exception { -SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect"); - JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setResourceClasses(StreamingService.class); sf.setResourceProvider(StreamingService.class, -new SingletonResourceProvider(new StreamingService(sparkConf))); +new SingletonResourceProvider(new StreamingService())); sf.setAddress("http://localhost:9000/";); sf.create(); http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 0e94b3c..c9cc033 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -55,9 +55,7 @@ import scala.Tuple2; public class StreamingService { private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(10)); -private JavaStreamingContext jssc; -public StreamingService(SparkConf sparkConf) { -jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); +public StreamingService() { } @POST @@ -66,6 +64,9 @@ public class StreamingSe
cxf git commit: [CXF-6618] Creating a new Spark context for every new request
Repository: cxf Updated Branches: refs/heads/master 76edf9350 -> c8e788eee [CXF-6618] Creating a new Sparc context per every new request Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c8e788ee Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c8e788ee Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c8e788ee Branch: refs/heads/master Commit: c8e788eee8f7b5ce3a2161dfe727f3153ded607b Parents: 76edf93 Author: Sergey Beryozkin Authored: Tue Sep 6 17:18:31 2016 +0100 Committer: Sergey Beryozkin Committed: Tue Sep 6 17:18:31 2016 +0100 -- .../src/main/java/demo/jaxrs/server/InputStreamReceiver.java | 4 .../jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java | 5 + .../src/main/java/demo/jaxrs/server/StreamingService.java | 7 --- 3 files changed, 5 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java index 05658e1..790ee35 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java @@ -39,10 +39,6 @@ public class InputStreamReceiver extends Receiver { super(StorageLevel.MEMORY_ONLY()); BufferedReader reader = new BufferedReader(new InputStreamReader(is)); String userInput = null; -// Receiver is meant to be serializable, but it would be -// great if if we could avoid copying InputStream -// TODO: submit Spark enhancement request so that it can keep streaming from -// the incoming InputStream to its processing nodes ? 
while ((userInput = readLine(reader)) != null) { inputStrings.add(userInput); } http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java index c74b215..50f915a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java @@ -21,18 +21,15 @@ package demo.jaxrs.server; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.spark.SparkConf; public class Server { protected Server() throws Exception { -SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect"); - JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setResourceClasses(StreamingService.class); sf.setResourceProvider(StreamingService.class, -new SingletonResourceProvider(new StreamingService(sparkConf))); +new SingletonResourceProvider(new StreamingService())); sf.setAddress("http://localhost:9000/";); sf.create(); http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java -- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java index 0e94b3c..c9cc033 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java +++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java @@ -55,9 +55,7 @@ import scala.Tuple2; public class StreamingService { private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(10)); -private JavaStreamingContext jssc; -public StreamingService(SparkConf sparkConf) { -jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); +public StreamingService()