Repository: cxf Updated Branches: refs/heads/master 21c788a31 -> 0f74af5ed
Updating Spark demo to support one way HTTP requests and process concurrent JAX-RS requests OOB when Socket Receiver is used Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0f74af5e Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0f74af5e Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0f74af5e Branch: refs/heads/master Commit: 0f74af5ed7561838e1950c758a7a05e588badd10 Parents: 21c788a Author: Sergey Beryozkin <[email protected]> Authored: Mon Sep 26 12:30:47 2016 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Mon Sep 26 12:30:47 2016 +0100 ---------------------------------------------------------------------- .../release/samples/jax_rs/spark/README.txt | 17 ++++-- .../main/java/demo/jaxrs/server/SparkUtils.java | 21 +++++++- .../jaxrs/server/simple/StreamingService.java | 54 +++++++++++++++++++- .../java/demo/jaxrs/server/socket/Server.java | 20 +++++--- .../java/demo/jaxrs/server/socket/SparkJob.java | 20 +++++--- .../jaxrs/server/socket/SparkResultJob.java | 53 +++++++++++++++++++ .../server/socket/SparkStreamingOutput.java | 35 +++++++++---- .../jaxrs/server/socket/StreamingService.java | 21 ++++++-- 8 files changed, 207 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/README.txt ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt index cf61100..c143a85 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/README.txt +++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt @@ -32,14 +32,23 @@ Next do: curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/stream -2. 
PDF/ODT/ODP processing: +2. Simple one-way text processing: + +curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/streamOneWay + +3. PDF/ODT/ODP processing: Open multipart.html located in src/main/resources, locate any PDF or OpenOffice text or presentation file available on the local disk and upload. -Note Spark restricts that only a single streaming context can be active in JVM at a given moment of time. -This is the error which will be logged if you try to access the demo server concurrently: -"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). +Note that Spark allows only a single streaming context to be active in a JVM at any given time. +demo.jaxrs.server.simple.Server creates a new context for every request, so the following error will be logged +if you try to access this demo server concurrently: +"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true". +However, demo.jaxrs.server.socket.Server creates only a single context, and its JAX-RS frontend can process multiple requests concurrently +without having to set "spark.driver.allowMultipleContexts = true".
+ + http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java index a01db0f..ff29627 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java @@ -45,9 +45,9 @@ public final class SparkUtils { } public static JavaPairDStream<String, Integer> createOutputDStream( - JavaDStream<String> receiverStream) { + JavaDStream<String> receiverStream, boolean withId) { final JavaDStream<String> words = - receiverStream.flatMap(x -> splitInputString(x)); + receiverStream.flatMap(x -> (withId ? 
splitInputStringWithId(x) : splitInputString(x))); final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> { return new Tuple2<String, Integer>(s, 1); @@ -69,6 +69,23 @@ public final class SparkUtils { } return list.iterator(); } + public static Iterator<String> splitInputStringWithId(String x) { + int index = x.indexOf(":"); + String jobId = x.substring(0, index); + x = x.substring(index + 1); + + List<String> list = new LinkedList<String>(); + for (String s : Arrays.asList(x.split(" "))) { + s = s.trim(); + if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) { + s = s.substring(0, s.length() - 1); + } + if (!s.isEmpty()) { + list.add(jobId + ":" + s); + } + } + return list.iterator(); + } public static String getRandomId() { byte[] bytes = new byte[10]; new Random().nextBytes(bytes); http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java index a561bc0..93dbfb8 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java @@ -38,6 +38,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import org.apache.cxf.jaxrs.ext.Oneway; import org.apache.cxf.jaxrs.ext.multipart.Attachment; import org.apache.cxf.jaxrs.ext.multipart.Multipart; import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor; @@ -103,6 +104,13 @@ public class StreamingService { public void 
processSimpleStream(@Suspended AsyncResponse async, InputStream is) { processStream(async, SparkUtils.getStringsFromInputStream(is)); } + @POST + @Path("/streamOneWay") + @Consumes("text/plain") + @Oneway + public void processSimpleStreamOneWay(InputStream is) { + processStreamOneWay(SparkUtils.getStringsFromInputStream(is)); + } private void processStream(AsyncResponse async, List<String> inputStrings) { try { @@ -125,7 +133,7 @@ public class StreamingService { receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings)); } - JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream); + JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false); wordCounts.foreachRDD(new OutputFunction(streamOut)); jssc.start(); @@ -140,6 +148,31 @@ public class StreamingService { } } + private void processStreamOneWay(List<String> inputStrings) { + try { + SparkConf sparkConf = new SparkConf().setMaster("local[*]") + .setAppName("JAX-RS Spark Connect OneWay " + SparkUtils.getRandomId()); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + + JavaDStream<String> receiverStream = null; + if ("queue".equals(receiverType)) { + Queue<JavaRDD<String>> rddQueue = new LinkedList<>(); + for (int i = 0; i < 30; i++) { + rddQueue.add(jssc.sparkContext().parallelize(inputStrings)); + } + receiverStream = jssc.queueStream(rddQueue); + } else { + receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings)); + } + + JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false); + wordCounts.foreachRDD(new PrintOutputFunction(jssc)); + jssc.start(); + } catch (Exception ex) { + // ignore + } + } + private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> { private static final long serialVersionUID = 1L; @@ -156,5 +189,24 @@ public class StreamingService { } } + private static class 
PrintOutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> { + private static final long serialVersionUID = 1L; + private JavaStreamingContext jssc; + PrintOutputFunction(JavaStreamingContext jssc) { + this.jssc = jssc; + } + @Override + public void call(JavaPairRDD<String, Integer> rdd) { + if (!rdd.collectAsMap().isEmpty()) { + for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) { + String value = entry.getKey() + " : " + entry.getValue(); + System.out.println(value); + } + jssc.stop(false); + jssc.close(); + } + } + + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java index dc65c9d..fc7968c 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java @@ -62,7 +62,7 @@ public class Server { JavaDStream<String> receiverStream = jssc.socketTextStream( "localhost", 9999, StorageLevels.MEMORY_ONLY); - JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream); + JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, true); PrintStream sparkResponseOutputStream = new PrintStream(jaxrsResponseClientSocket.getOutputStream(), true); wordCounts.foreachRDD(new SocketOutputFunction(sparkResponseOutputStream)); @@ -106,12 +106,20 @@ public class Server { } @Override public void call(JavaPairRDD<String, Integer> rdd) { - for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) { - String value = 
entry.getKey() + " : " + entry.getValue(); - streamOut.println(value); - } if (!rdd.collectAsMap().isEmpty()) { - streamOut.println("<batchEnd>"); + String jobId = null; + PrintStream printStream = null; + for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) { + String value = entry.getKey() + " : " + entry.getValue(); + if (jobId == null) { + int index = value.indexOf(":"); + jobId = value.substring(0, index); + printStream = "oneway".equals(jobId) ? System.out : streamOut; + + } + printStream.println(value); + } + printStream.println(jobId + ":" + "<batchEnd>"); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java index a24668a..9128a4e 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java @@ -18,30 +18,38 @@ */ package demo.jaxrs.server.socket; -import java.io.BufferedReader; import java.io.PrintStream; import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import javax.ws.rs.container.AsyncResponse; +import demo.jaxrs.server.SparkUtils; + public class SparkJob implements Runnable { private AsyncResponse ac; - private BufferedReader sparkInputStream; + private Map<String, BlockingQueue<String>> sparkResponses; private PrintStream sparkOutputStream; private List<String> inputStrings; - public SparkJob(AsyncResponse ac, BufferedReader sparkInputStream, + public SparkJob(AsyncResponse ac, 
Map<String, BlockingQueue<String>> sparkResponses, PrintStream sparkOutputStream, List<String> inputStrings) { this.ac = ac; this.inputStrings = inputStrings; - this.sparkInputStream = sparkInputStream; + this.sparkResponses = sparkResponses; this.sparkOutputStream = sparkOutputStream; } @Override public void run() { + String jobId = SparkUtils.getRandomId(); + BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); + sparkResponses.put(jobId, queue); + for (String s : inputStrings) { - sparkOutputStream.println(s); + sparkOutputStream.println(jobId + ":" + s); } - ac.resume(new SparkStreamingOutput(sparkInputStream)); + ac.resume(new SparkStreamingOutput(sparkResponses, jobId, queue)); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java new file mode 100644 index 0000000..eb0286b --- /dev/null +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package demo.jaxrs.server.socket; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +public class SparkResultJob implements Runnable { + + private Map<String, BlockingQueue<String>> sparkResponses; + private BufferedReader sparkInputStream; + public SparkResultJob(Map<String, BlockingQueue<String>> sparkResponses, + BufferedReader sparkInputStream) { + this.sparkResponses = sparkResponses; + this.sparkInputStream = sparkInputStream; + } + + + @Override + public void run() { + try { + String s = null; + while ((s = sparkInputStream.readLine()) != null) { + int index = s.indexOf(":"); + String jobId = s.substring(0, index); + String value = s.substring(index + 1); + sparkResponses.get(jobId).offer(value); + } + } catch (IOException ex) { + // ignore + } + + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java index cce1275..681395a 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java +++ 
b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java @@ -18,29 +18,44 @@ */ package demo.jaxrs.server.socket; -import java.io.BufferedReader; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.StreamingOutput; public class SparkStreamingOutput implements StreamingOutput { - private BufferedReader sparkInputStream; - public SparkStreamingOutput(BufferedReader sparkInputStream) { - this.sparkInputStream = sparkInputStream; + private Map<String, BlockingQueue<String>> sparkResponses; + private String jobId; + private BlockingQueue<String> queue; + public SparkStreamingOutput(Map<String, BlockingQueue<String>> sparkResponses, String jobId, + BlockingQueue<String> queue) { + this.sparkResponses = sparkResponses; + this.jobId = jobId; + this.queue = queue; } @Override public void write(final OutputStream output) throws IOException, WebApplicationException { - PrintStream printStream = new PrintStream(output, true); - String s = null; - while ((s = sparkInputStream.readLine()) != null) { - if ("<batchEnd>".equals(s)) { - break; + PrintStream out = new PrintStream(output, true); + try { + while (true) { + String responseEntry = queue.poll(1, TimeUnit.MILLISECONDS); + if (responseEntry != null) { + if ("<batchEnd>".equals(responseEntry)) { + sparkResponses.remove(jobId); + break; + } else { + out.println(responseEntry); + } + } } - printStream.println(s); + } catch (InterruptedException ex) { + // ignore } } http://git-wip-us.apache.org/repos/asf/cxf/blob/0f74af5e/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java ---------------------------------------------------------------------- diff --git 
a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java index 7fa69df..e0e185b 100644 --- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java +++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import org.apache.cxf.jaxrs.ext.Oneway; import org.apache.cxf.jaxrs.ext.multipart.Attachment; import org.apache.cxf.jaxrs.ext.multipart.Multipart; import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor; @@ -56,13 +59,13 @@ public class StreamingService { } private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); - + private Map<String, BlockingQueue<String>> sparkResponses = + new ConcurrentHashMap<String, BlockingQueue<String>>(); private PrintStream sparkOutputStream; - private BufferedReader sparkInputStream; public StreamingService(BufferedReader sparkInputStream, PrintStream sparkOutputStream) { - this.sparkInputStream = sparkInputStream; this.sparkOutputStream = sparkOutputStream; + executor.execute(new SparkResultJob(sparkResponses, sparkInputStream)); } @POST @@ -98,8 +101,16 @@ public class StreamingService { private void processStream(AsyncResponse async, List<String> inputStrings) { executor.execute( - new 
SparkJob(async, sparkInputStream, sparkOutputStream, inputStrings)); + new SparkJob(async, sparkResponses, sparkOutputStream, inputStrings)); } - + @POST + @Path("/streamOneWay") + @Consumes("text/plain") + @Oneway + public void processSimpleStreamOneWay(InputStream is) { + for (String s : SparkUtils.getStringsFromInputStream(is)) { + sparkOutputStream.println("oneway:" + s); + } + } }
