Repository: cxf
Updated Branches:
  refs/heads/master 76edf9350 -> c8e788eee


[CXF-6618] Creating a new Spark context for every new request


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c8e788ee
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c8e788ee
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c8e788ee

Branch: refs/heads/master
Commit: c8e788eee8f7b5ce3a2161dfe727f3153ded607b
Parents: 76edf93
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Sep 6 17:18:31 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Sep 6 17:18:31 2016 +0100

----------------------------------------------------------------------
 .../src/main/java/demo/jaxrs/server/InputStreamReceiver.java  | 4 ----
 .../jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java  | 5 +----
 .../src/main/java/demo/jaxrs/server/StreamingService.java     | 7 ++++---
 3 files changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
index 05658e1..790ee35 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
@@ -39,10 +39,6 @@ public class InputStreamReceiver extends Receiver<String> {
         super(StorageLevel.MEMORY_ONLY());
         BufferedReader reader = new BufferedReader(new InputStreamReader(is));
         String userInput = null;
-        // Receiver is meant to be serializable, but it would be
-        // great if if we could avoid copying InputStream
-        // TODO: submit Spark enhancement request so that it can keep streaming from 
-        // the incoming InputStream to its processing nodes ?
         while ((userInput = readLine(reader)) != null) {
             inputStrings.add(userInput);
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index c74b215..50f915a 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -21,18 +21,15 @@ package demo.jaxrs.server;
 
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.spark.SparkConf;
 
 
 public class Server {
 
     protected Server() throws Exception {
-        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
-        
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
         sf.setResourceClasses(StreamingService.class);
         sf.setResourceProvider(StreamingService.class, 
-            new SingletonResourceProvider(new StreamingService(sparkConf)));
+            new SingletonResourceProvider(new StreamingService()));
         sf.setAddress("http://localhost:9000/");
 
         sf.create();

http://git-wip-us.apache.org/repos/asf/cxf/blob/c8e788ee/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 0e94b3c..c9cc033 100644
--- 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -55,9 +55,7 @@ import scala.Tuple2;
 public class StreamingService {
     private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                                        new ArrayBlockingQueue<Runnable>(10));
-    private JavaStreamingContext jssc;
-    public StreamingService(SparkConf sparkConf) {
-        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+    public StreamingService() {
     }
     
     @POST
@@ -66,6 +64,9 @@ public class StreamingService {
     @Produces("text/plain")
     public void getStream(@Suspended AsyncResponse async, InputStream is) {
         try {
+            SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
+            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); 
+            
             SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
             SparkStreamingListener sparkListener =  new SparkStreamingListener(streamOut);
             jssc.addStreamingListener(sparkListener);

Reply via email to