Re: Spark streaming for synchronous API
Tobias,

Let me explain a little more. I want to create a synchronous REST API that will process some data that is passed in with each request. I would imagine that the Spark Streaming job on YARN is a long-running job that waits on requests from something. What that something is, is still not clear to me, but I would imagine that it's some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming. The goal is for the REST API to be able to respond to lots of calls with low latency. Hope that clarifies things...

Thanks, Ron

On Sep 8, 2014, at 7:41 PM, Tobias Pfeiffer t...@preferred.jp wrote:

Ron,

On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote: I'm trying to figure out how I can run Spark Streaming like an API. The goal is to have a synchronous REST API that runs the Spark data flow on YARN.

I guess I *may* develop something similar in the future. By a synchronous REST API, do you mean that submitting the job is synchronous and you would fetch the processing results via a different call? Or do you want to submit a job and get the processed data back as an HTTP stream?

To begin with, is it even possible to have Spark Streaming run as a YARN job?

I think it is very much possible to run Spark Streaming as a YARN job; at least it worked well with Mesos.

Tobias
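As a rough sketch of the pattern Ron describes (a long-running Streaming job that consumes (id, payload) requests from a queue and writes (id, result) replies back), the following is illustrative only: the topic names, ZooKeeper/broker addresses, and the old Kafka 0.8 receiver/producer APIs are assumptions, not anything confirmed in this thread.

    import java.util.Properties

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object RequestResponseStream {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("request-stream"), Seconds(1))

        // Each message is (requestId, payload); the REST layer would push these onto "requests".
        val requests = KafkaUtils.createStream(ssc, "zk-host:2181", "request-consumers", Map("requests" -> 1))

        // Apply the shared transformation, keeping the request id so replies can be correlated.
        val results = requests.map { case (requestId, payload) => (requestId, process(payload)) }

        // Write each result back to a "responses" topic keyed by the original request id,
        // which the REST endpoint would be blocking on.
        results.foreachRDD { rdd =>
          rdd.foreachPartition { part =>
            val props = new Properties()
            props.put("metadata.broker.list", "kafka-host:9092")
            props.put("serializer.class", "kafka.serializer.StringEncoder")
            val producer = new Producer[String, String](new ProducerConfig(props))
            part.foreach { case (requestId, result) =>
              producer.send(new KeyedMessage[String, String]("responses", requestId, result))
            }
            producer.close()
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }

      // Placeholder for whatever transformation is being reused between batch and streaming.
      def process(payload: String): String = payload.toUpperCase
    }

Note that the end-to-end latency of this pattern is bounded below by the batch interval, which is part of what Tobias points out in the next message.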
Re: Spark streaming for synchronous API
Hi Tobias,

So I guess where I was coming from was the assumption that starting up a new job to listen on a particular queue topic could be done asynchronously. For example, let's say there's a particular topic T1 in a Kafka queue. If I have a new set of requests coming from a particular client A, I was wondering if I could create a partition A. The streaming job is submitted to listen to T1.A and will write to a topic T2.A, which the REST endpoint would be listening on. It does seem a little contrived, but the ultimate goal here is to get a bunch of messages from a queue, distribute them to a bunch of Spark jobs that process and write back to another queue, which the REST endpoint synchronously waits on. Storm might be a better fit, but the background behind this question is that I want to reuse the same set of transformations for both batch and streaming, with the streaming use case represented by a REST call. In other words, the job submission would not be part of the equation, so I would imagine the latency is limited to the processing, the write-back, and the consumption of the processed message by the original REST request. Let me know what you think…

Thanks, Ron

On Sep 8, 2014, at 9:28 PM, Tobias Pfeiffer t...@preferred.jp wrote:

Hi,

On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote: I want to create a synchronous REST API that will process some data that is passed in as some request. I would imagine that the Spark Streaming Job on YARN is a long running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it's some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming.

That is not exactly a Spark Streaming use case, I think. Spark Streaming pulls data from some source (like a queue), then processes all data collected in a certain interval in a mini-batch, and stores that data somewhere. It is not well suited for handling request-response cycles in a synchronous way; you might consider using plain Spark (without Streaming) for that. For example, you could use the Unfiltered library (http://unfiltered.databinder.net/Unfiltered.html) and, within request handling, do some RDD operation, returning the output as the HTTP response. This works fine, as multiple threads can submit Spark jobs concurrently (https://spark.apache.org/docs/latest/job-scheduling.html). You could also check https://github.com/adobe-research/spindle -- that seems to be similar to what you are doing.

The goal is for the REST API to be able to respond to lots of calls with low latency. Hope that clarifies things...

Note that low latency for lots of calls is maybe not something that Spark was built for. Even if you do close to no data processing, you may not get below 200 ms or so due to the overhead of submitting jobs etc., from my experience.

Tobias
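To make the "plain Spark inside a request handler" idea concrete without committing to Unfiltered's API, here is a minimal sketch of the underlying mechanism Tobias refers to: several threads submitting actions against one shared SparkContext, each blocking for its own result. The dataset path, app name, and filter logic are placeholder assumptions; an HTTP layer would simply call handle() from its request threads.

    import org.apache.spark.{SparkConf, SparkContext}

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    object SyncQueryService {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sync-query"))

        // Load and cache the reference data once, so each request only scans memory.
        val data = sc.textFile("hdfs:///data/events").cache()
        data.count() // materialize the cache before serving any requests

        // Spark accepts jobs from multiple threads on the same SparkContext, so each
        // request handler can run an action and block for its own result.
        def handle(clientId: String): Future[Long] = Future {
          data.filter(_.contains(clientId)).count()
        }

        val result = Await.result(handle("client-A"), 30.seconds)
        println(s"matches for client-A: $result")

        sc.stop()
      }
    }

This keeps the request path free of job submission overhead other than scheduling the action itself, which is the latency floor Tobias mentions.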
Re: Compile Spark code with IDEA successful but run SparkPi error with java.lang.SecurityException
Not sure what your environment is, but this happened to me before because I had a couple of servlet-api jars on the path which did not match. I was building a system that programmatically submitted jobs, so I had my own jars that conflicted with those of Spark. The solution is to run mvn dependency:tree from your app, see what jars you have, and make sure to exclude the conflicting ones.

Thanks, Ron

On Aug 11, 2014, at 6:36 AM, Zhanfeng Huo huozhanf...@gmail.com wrote:

Hi,

I have compiled the spark-1.0.1 code with IntelliJ IDEA 13.1.4 on Ubuntu 14.04 successfully, but when I run the SparkPi example in local mode it fails. I have set env export SPARK_HADOOP_VERSION=2.3.0 and export SPARK_YARN=true before I start IDEA. I have attempted to use the patch at https://github.com/apache/spark/pull/1271/files, but it has no effect. How can I solve this problem?

The full message:

14/08/11 22:15:56 INFO Utils: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/08/11 22:15:56 WARN Utils: Your hostname, syn resolves to a loopback address: 127.0.1.1; using 192.168.159.132 instead (on interface eth0)
14/08/11 22:15:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/08/11 22:15:56 INFO SecurityManager: Changing view acls to: syn
14/08/11 22:15:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(syn)
14/08/11 22:15:57 INFO Slf4jLogger: Slf4jLogger started
14/08/11 22:15:57 INFO Remoting: Starting remoting
14/08/11 22:15:57 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.159.132:50914]
14/08/11 22:15:57 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.159.132:50914]
14/08/11 22:15:57 INFO SparkEnv: Registering MapOutputTracker
14/08/11 22:15:57 INFO SparkEnv: Registering BlockManagerMaster
14/08/11 22:15:57 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140811221557-dd19
14/08/11 22:15:57 INFO MemoryStore: MemoryStore started with capacity 804.3 MB.
14/08/11 22:15:57 INFO ConnectionManager: Bound socket to port 56061 with id = ConnectionManagerId(192.168.159.132,56061)
14/08/11 22:15:57 INFO BlockManagerMaster: Trying to register BlockManager
14/08/11 22:15:57 INFO BlockManagerInfo: Registering block manager 192.168.159.132:56061 with 804.3 MB RAM
14/08/11 22:15:57 INFO BlockManagerMaster: Registered BlockManager
14/08/11 22:15:57 INFO HttpServer: Starting HTTP Server
14/08/11 22:15:57 INFO HttpBroadcast: Broadcast server started at http://192.168.159.132:39676
14/08/11 22:15:57 INFO HttpFileServer: HTTP File server directory is /tmp/spark-f8474345-0dcd-41c4-9247-3e916d409b27
14/08/11 22:15:57 INFO HttpServer: Starting HTTP Server
Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
    at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
    at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.eclipse.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
    at org.eclipse.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
    at org.eclipse.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:89)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:64)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:57)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:57)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:57)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:66)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:60)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:42)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:221)
    at
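Regarding the dependency:tree suggestion above, a minimal, hypothetical sketch of what the check and the fix can look like in the application's pom.xml. The hadoop-client coordinates below are only an assumption about where a second, older servlet-api often arrives from; exclude whichever artifact your own tree actually reports.

    mvn dependency:tree | grep -i servlet

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.3.0</version>
      <exclusions>
        <!-- Drop the transitive servlet-api so only one copy of the
             javax.servlet classes ends up on the runtime classpath. -->
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>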
Re: Issues with HDP 2.4.0.2.1.3.0-563
That failed since it defaulted the versions for yarn and hadoop. I'll give it a try with just 2.4.0 for both yarn and hadoop…

Thanks, Ron

On Aug 4, 2014, at 9:44 AM, Patrick Wendell pwend...@gmail.com wrote:

Can you try building without any of the special `hadoop.version` flags and just building only with -Phadoop-2.4? In the past users have reported issues trying to build random spot versions... I think HW is supposed to be compatible with the normal 2.4.0 build.

On Mon, Aug 4, 2014 at 8:35 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote:

Thanks, I ensured that $SPARK_HOME/pom.xml had the HDP repository under the repositories element. I also confirmed that if the build couldn't find the version, it would fail fast, so it seems as if it's able to get the versions it needs to build the distribution. I ran the following (generated from make-distribution.sh), but it did not address the problem, while building with an older version (2.4.0.2.1.2.0-402) worked. Any other thing I can try?

mvn clean package -Phadoop-2.4 -Phive -Pyarn -Dyarn.version=2.4.0.2.1.2.0-563 -Dhadoop.version=2.4.0.2.1.3.0-563 -DskipTests

Thanks, Ron

On Aug 4, 2014, at 7:13 AM, Steve Nunez snu...@hortonworks.com wrote:

Provided you've got the HWX repo in your pom.xml, you can build with this line:

mvn -Pyarn -Phive -Phadoop-2.4 -Dhadoop.version=2.4.0.2.1.1.0-385 -DskipTests clean package

I haven't tried building a distro, but it should be similar.

- SteveN

On 8/4/14, 1:25, Sean Owen so...@cloudera.com wrote:

For any Hadoop 2.4 distro, yes, set hadoop.version but also set -Phadoop-2.4. http://spark.apache.org/docs/latest/building-with-maven.html

On Mon, Aug 4, 2014 at 9:15 AM, Patrick Wendell pwend...@gmail.com wrote:

For Hortonworks, I believe it should work to just link against the corresponding upstream version, i.e. just set the Hadoop version to 2.4.0. Does that work?

- Patrick

On Mon, Aug 4, 2014 at 12:13 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid wrote:

Hi,

Not sure whose issue this is, but if I run make-distribution using HDP 2.4.0.2.1.3.0-563 as the hadoop version (replacing it in make-distribution.sh), I get a strange error with the exception below. If I use a slightly older version of HDP (2.4.0.2.1.2.0-402) with make-distribution, using the generated assembly all works fine for me. Either 1.0.0 or 1.0.1 will work fine. Should I file a JIRA or is this a known issue?
Thanks, Ron

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
    org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:111)
    org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:99)
    org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:61)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    org.apache.spark.scheduler.Task.run(Task.scala:51)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)
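For reference, the "HDP repository under the repositories element" that this thread relies on is just a repository entry in the Spark pom.xml. A minimal sketch follows; the id and URL are assumptions, so point it at whichever Hortonworks repository actually hosts the HDP artifacts you are building against.

    <repository>
      <id>hortonworks-releases</id>
      <url>http://repo.hortonworks.com/content/repositories/releases/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>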
Re: Issues with HDP 2.4.0.2.1.3.0-563
I meant that yarn.version and hadoop.version defaulted to 1.0.4, so the YARN build fails since 1.0.4 doesn't exist for YARN...

Thanks, Ron

On Aug 4, 2014, at 10:01 AM, Ron's Yahoo! zlgonza...@yahoo.com wrote:

That failed since it defaulted the versions for yarn and hadoop. I'll give it a try with just 2.4.0 for both yarn and hadoop…

Thanks, Ron
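Since the profile alone leaves hadoop.version (and with it yarn.version) at the 1.0.4 default, the build presumably needs both properties pinned explicitly. A sketch of such an invocation for the HDP build in question; whether this exact HDP version resolves cleanly is precisely what the thread is still working out.

    mvn -Phadoop-2.4 -Pyarn -Phive \
        -Dhadoop.version=2.4.0.2.1.3.0-563 \
        -Dyarn.version=2.4.0.2.1.3.0-563 \
        -DskipTests clean package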
Re: How to share a NonSerializable variable among tasks in the same worker node?
I think you're going to have to make it serializable by registering it with the Kryo registrator. I think multiple workers are running as separate JVMs, so it might need to be able to serialize and deserialize broadcast variables to the different executors.

Thanks, Ron

On Aug 3, 2014, at 6:38 PM, Fengyun RAO raofeng...@gmail.com wrote:

Could anybody help? I wonder if I asked a stupid question or I didn't make the question clear?

2014-07-31 21:47 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

As shown here: Why Is My Spark Job so Slow and Only Using a Single Thread?

    object JSONParser {
      def parse(raw: String): String = ...
    }

    object MyFirstSparkJob {
      def main(args: Array[String]) {
        val sc = new SparkContext()
        val ssc = new StreamingContext(sc, Seconds(1))
        val lines = ssc.textFileStream("beacons.txt")
        lines.map(line => JSONParser.parse(line))
        lines.foreach(line => println(line))
        ssc.start()
      }
    }

It says the parser instance "is now a singleton created in the scope of our driver program", which I thought was in the scope of the executor. Am I wrong, or why? What if the parser is not serializable, and I want to share it among tasks in the same worker node?
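One common pattern that addresses the last question, complementary to the Kryo suggestion above rather than taken from this thread: keep the non-serializable helper inside a Scala object behind a lazy val. The object is initialized independently in every JVM that loads it (the driver and each executor), so all tasks in a worker JVM share one instance without it ever being serialized. A minimal sketch; HeavyParser, the file path, and the app name are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Stand-in for a real, non-serializable parser.
    class HeavyParser {
      def parse(raw: String): String = raw.trim // placeholder for real parsing logic
    }

    object SharedParser {
      // Built lazily, once per JVM, the first time a task in that JVM calls parse().
      lazy val instance = new HeavyParser
      def parse(raw: String): String = instance.parse(raw)
    }

    object ParserJob {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parser-job"))
        val lines = sc.textFile("beacons.txt")
        // The closure only calls SharedParser.parse; the object is resolved on the
        // executor, so the HeavyParser instance never crosses the wire.
        val parsed = lines.map(SharedParser.parse)
        parsed.take(5).foreach(println)
        sc.stop()
      }
    }

If the real parser is not thread-safe, the usual variations are a ThreadLocal inside the object, or creating one instance per partition inside mapPartitions.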