Re: Issue with zip and partitions
From the API docs: Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the *same number of partitions* and the *same number of elements in each partition* (e.g. one was made through a map on the other). Basically, one RDD should be a mapped RDD of the other, or both RDDs should be mapped RDDs of the same RDD. Btw, your message says Dell - Internal Use - Confidential... Best, Xiangrui On Tue, Apr 1, 2014 at 7:27 PM, patrick_nico...@dell.com wrote: Dell - Internal Use - Confidential I got a "Can't zip RDDs with unequal numbers of partitions" exception when I apply any action (reduce, collect) to a dataset created by zipping two datasets of 10 million entries each. The problem occurs regardless of the number of partitions I specify, and also when I let Spark create those partitions. Interestingly enough, I do not have this problem when zipping datasets of 1 and 2.5 million entries. A similar problem was reported on this list with 0.8, but I don't remember if it was fixed. Any idea? Any workaround? I'd appreciate it.
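As an illustration of the contract Xiangrui quotes above, here is a minimal Scala sketch (not from the thread; the RDD names are made up). It shows the safe pattern of zipping an RDD with a map of itself, and an index-based join as one possible workaround when the two sides were built independently (zipWithIndex assumes a newer Spark release than 0.9):

import org.apache.spark.SparkContext._

val base = sc.parallelize(1 to 10, 4)
// Safe: doubled is a map of base, so partition counts and per-partition
// element counts line up exactly.
val doubled = base.map(_ * 2)
val zipped = base.zip(doubled)            // RDD[(Int, Int)]

// Independently created RDDs may distribute elements differently and can
// fail with "Can't zip RDDs with unequal numbers of partitions":
val other = sc.parallelize(11 to 20, 3)
// base.zip(other)                        // may fail at runtime

// One workaround: pair each element with an explicit index and join on it.
val byIndexA = base.zipWithIndex.map(_.swap)
val byIndexB = other.zipWithIndex.map(_.swap)
val joined = byIndexA.join(byIndexB).values   // RDD[(Int, Int)]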
Re: Using ProtoBuf 2.5 for messages with Spark Streaming
It's this: mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean package On Tue, Apr 1, 2014 at 5:15 PM, Vipul Pandey vipan...@gmail.com wrote: how do you recommend building that - it says [ERROR] Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:assembly (default-cli) on project spark-0.9.0-incubating: Error reading assemblies: No assembly descriptors found. - [Help 1] upon running mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean assembly:assembly On Apr 1, 2014, at 4:13 PM, Patrick Wendell pwend...@gmail.com wrote: Do you get the same problem if you build with maven? On Tue, Apr 1, 2014 at 12:23 PM, Vipul Pandey vipan...@gmail.com wrote: SPARK_HADOOP_VERSION=2.0.0-cdh4.2.1 sbt/sbt assembly That's all I do. On Apr 1, 2014, at 11:41 AM, Patrick Wendell pwend...@gmail.com wrote: Vipul - could you show exactly what flags/commands you are using when you build Spark to produce this assembly? On Tue, Apr 1, 2014 at 12:53 AM, Vipul Pandey vipan...@gmail.com wrote: Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't be getting pulled in unless you are directly using akka yourself. Are you? No, I'm not. Although I see that protobuf libraries are directly pulled into the 0.9.0 assembly jar - I do see the shaded version as well. e.g. below for Message.class -bash-4.1$ jar -ftv ./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar | grep protobuf | grep /Message.class 478 Thu Jun 30 15:26:12 PDT 2011 com/google/protobuf/Message.class 508 Sat Dec 14 14:20:38 PST 2013 com/google/protobuf_spark/Message.class Does your project have other dependencies that might be indirectly pulling in protobuf 2.4.1? It would be helpful if you could list all of your dependencies including the exact Spark version and other libraries. I did have another one, which I moved to the end of the classpath - I even ran partial code without that dependency, but it still failed whenever I used the jar with the ScalaBuff dependency. Spark version is 0.9.0 ~Vipul On Mar 31, 2014, at 4:51 PM, Patrick Wendell pwend...@gmail.com wrote: Spark now shades its own protobuf dependency so protobuf 2.4.1 shouldn't be getting pulled in unless you are directly using akka yourself. Are you? Does your project have other dependencies that might be indirectly pulling in protobuf 2.4.1? It would be helpful if you could list all of your dependencies including the exact Spark version and other libraries. - Patrick On Sun, Mar 30, 2014 at 10:03 PM, Vipul Pandey vipan...@gmail.com wrote: I'm using ScalaBuff (which depends on protobuf 2.5) and facing the same issue. Any word on this one? On Mar 27, 2014, at 6:41 PM, Kanwaldeep kanwal...@gmail.com wrote: We are using Protocol Buffer 2.5 to send messages to Spark Streaming 0.9 with a Kafka stream setup. I have Protocol Buffer 2.5 as part of the uber jar deployed on each of the Spark worker nodes. The message is compiled using 2.5, but at runtime it is being de-serialized by 2.4.1, as I'm getting the following exception: java.lang.VerifyError (java.lang.VerifyError: class com.snc.sinet.messages.XServerMessage$XServer overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;) java.lang.ClassLoader.defineClass1(Native Method) java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) java.lang.ClassLoader.defineClass(ClassLoader.java:615) java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) Suggestions on how I could still use ProtoBuf 2.5?
Based on the issue https://spark-project.atlassian.net/browse/SPARK-995 we should be able to use a different version of protobuf in the application.
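For readers hitting the same conflict, a hedged sketch of how one might keep the older protobuf out of the application's own sbt build; this is an assumption based on standard sbt mechanics, not a fix confirmed in this thread, and it does not change what is baked into the Spark assembly itself:

// build.sbt sketch: exclude any transitive protobuf pulled in via spark-core
// and pin the version the application actually compiles against.
libraryDependencies += ("org.apache.spark" %% "spark-core" % "0.9.0-incubating")
  .excludeAll(ExclusionRule(organization = "com.google.protobuf"))
libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0"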
Re: How to index each map operation????
Hi Thierry, Your code does not work if @yh18190 wants a global counter. An RDD may have more than one partition. For each partition, cnt will be reset to -1. You can try the following code: scala> val rdd = sc.parallelize( (1, 'a') :: (2, 'b') :: (3, 'c') :: (4, 'd') :: Nil) rdd: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[3] at parallelize at <console>:12 scala> import org.apache.spark.HashPartitioner import org.apache.spark.HashPartitioner scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ShuffledRDD[4] at partitionBy at <console>:18 scala> var cnt = -1 cnt: Int = -1 scala> val rdd3 = rdd2.map(i => {cnt += 1; (cnt, i)}) rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MappedRDD[5] at map at <console>:22 scala> rdd3.collect res2: Array[(Int, (Int, Char))] = Array((0,(2,b)), (1,(4,d)), (0,(1,a)), (1,(3,c))) A proper solution is using rdd.partitionBy(new HashPartitioner(1)) to make sure there is only one partition. But that's not efficient for big input. Best Regards, Shixiong Zhu 2014-04-02 11:10 GMT+08:00 Thierry Herrmann thierry.herrm...@gmail.com: I'm new to Spark, but isn't this a pure Scala question? The following seems to work with the Spark shell: $ spark-shell scala> val rdd = sc.makeRDD(List(10,20,30)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:12 scala> var cnt = -1 cnt: Int = -1 scala> val rdd2 = rdd.map(i => {cnt += 1; (cnt, i)}) rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[9] at map at <console>:16 scala> rdd2.collect res8: Array[(Int, Int)] = Array((0,10), (1,20), (2,30)) Thierry
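Following up on the thread above, a hedged sketch (not from the thread) of how a global index can be assigned without shuffling everything into a single partition: count each partition once, turn the counts into starting offsets, then number elements in a second pass. zipWithIndex in newer Spark releases does essentially this; the helper below only assumes the standard RDD API:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def indexed[T: ClassTag](rdd: RDD[T]): RDD[(Long, T)] = {
  // One pass to count each partition, collected to the driver.
  val counts = rdd.mapPartitionsWithIndex { (pid, it) => Iterator((pid, it.size.toLong)) }
    .collect().sortBy(_._1).map(_._2)
  // Starting offset of each partition.
  val offsets = counts.scanLeft(0L)(_ + _)
  // Second pass: number elements within each partition from its offset.
  rdd.mapPartitionsWithIndex { (pid, it) =>
    it.zipWithIndex.map { case (x, i) => (offsets(pid) + i, x) }
  }
}

val withIndex = indexed(sc.parallelize(Seq("a", "b", "c", "d"), 2)).collect()
// Array((0,a), (1,b), (2,c), (3,d))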
Re: possible bug in Spark's ALS implementation...
I think multiplying by ratings is a heuristic that worked on rating-related problems like the Netflix dataset or other ratings datasets, but the scope of NMF is much broader than that. @Sean please correct me in case you don't agree... It's definitely good to add all the rating-dataset-related optimizations, but they should not be in the core algorithm; rather in, say, a RatingALS that extends the core ALS... On Wed, Apr 2, 2014 at 12:06 AM, Michael Allman m...@allman.ms wrote: Hi Nick, I don't have my spark clone in front of me, but OTOH the major differences are/were: 1. Oryx multiplies lambda by alpha. 2. Oryx uses a different matrix inverse algorithm. It maintains a certain symmetry which the Spark algo does not, however I don't think this difference has a real impact on the results. 3. Oryx supports the specification of a convergence threshold for termination of the algorithm, based on delta rmse on a subset of the training set if I recall correctly. I've been using that as the termination criterion instead of a fixed number of iterations. 4. Oryx uses the weighted regularization scheme you alluded to below, multiplying lambda by the number of ratings. I've patched the spark impl to support (4) but haven't pushed it to my clone on github. I think it would be a valuable feature to support officially. I'd also like to work on (3) but don't have time now. I've only been using Oryx the past couple of weeks. Cheers, Michael On Tue, 1 Apr 2014, Nick Pentreath [via Apache Spark User List] wrote: Hi Michael Would you mind setting out exactly what differences you did find between the Spark and Oryx implementations? Would be good to be clear on them, and also see if there are further tricks/enhancements from the Oryx one that can be ported (such as the lambda * numRatings adjustment). N On Sat, Mar 15, 2014 at 2:52 AM, Michael Allman [hidden email] wrote: I've been thoroughly investigating this issue over the past couple of days and have discovered quite a bit. For one thing, there is definitely (at least) one issue/bug in the Spark implementation that leads to incorrect results for models generated with rank 1 or a large number of iterations. I will post a bug report with a thorough explanation this weekend or on Monday. I believe I've been able to track down every difference between the Spark and Oryx implementations that leads to different results. I made some adjustments to the spark implementation so that, given the same initial product/item vectors, the resulting model is identical to the one produced by Oryx within a small numerical tolerance. I've verified this for small data sets and am working on verifying this with some large data sets. Aside from those already identified in this thread, another significant difference in the Spark implementation is that it begins the factorization process by computing the product matrix (Y) from the initial user matrix (X). Both of the papers on ALS referred to in this thread begin the process by computing the user matrix. I haven't done any testing comparing the models generated starting from Y or X, but they are very different. Is there a reason Spark begins the iteration by computing Y? Initializing both X and Y as is done in the Spark implementation seems unnecessary unless I'm overlooking some desired side-effect. Only the factor matrix which generates the other in the first iteration needs to be initialized. I also found that the product and user RDDs were being rebuilt many times over in my tests, even for tiny data sets.
By persisting the RDD returned from updateFeatures() I was able to avoid a raft of duplicate computations. Is there a reason not to do this? Thanks.
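A minimal sketch of the persist pattern Michael describes, assuming a generic iterative loop rather than the actual MLlib ALS code; updateFeatures and the factor RDDs below are hypothetical stand-ins, and only the placement of persist() is the point:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical stand-in for the real per-iteration update; the math is irrelevant.
def updateFeatures(other: RDD[(Int, Array[Double])]): RDD[(Int, Array[Double])] =
  other.mapValues(v => v.map(_ * 0.9))

var users    = sc.parallelize((0 until 4).map(i => (i, Array(1.0, 1.0)))).persist()
var products = sc.parallelize((0 until 4).map(i => (i, Array(1.0, 1.0)))).persist()
for (iter <- 1 to 5) {
  // Persist each newly computed factor RDD so later iterations (and the final
  // model) reuse it instead of recomputing the whole lineage from scratch.
  products = updateFeatures(users).persist(StorageLevel.MEMORY_AND_DISK)
  users    = updateFeatures(products).persist(StorageLevel.MEMORY_AND_DISK)
}
users.count()  // forces materialization of the persisted chain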
Re: Using ProtoBuf 2.5 for messages with Spark Streaming
I downloaded 0.9.0 fresh and ran the mvn command - the assembly jar thus generated also has both shaded and real version of protobuf classes Vipuls-MacBook-Pro-3:spark-0.9.0-incubating vipul$ jar -ftv ./assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar | grep proto | grep /Message 1190 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/MessageOrBuilder.class 2913 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/Message$Builder.class 704 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/MessageLite.class 1904 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/MessageLite$Builder.class 257 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/MessageLiteOrBuilder.class 508 Wed Apr 02 00:19:56 PDT 2014 com/google/protobuf_spark/Message.class 2661 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/Message$Builder.class 478 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/Message.class 1748 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageLite$Builder.class 668 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageLite.class 245 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageLiteOrBuilder.class 1112 Wed Apr 02 00:20:00 PDT 2014 com/google/protobuf/MessageOrBuilder.class On Apr 1, 2014, at 11:44 PM, Patrick Wendell pwend...@gmail.com wrote: It's this: mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean package On Tue, Apr 1, 2014 at 5:15 PM, Vipul Pandey vipan...@gmail.com wrote: how do you recommend building that - it says ERROR] Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:assembly (default-cli) on project spark-0.9.0-incubating: Error reading assemblies: No assembly descriptors found. - [Help 1] upon runnning mvn -Dhadoop.version=2.0.0-cdh4.2.1 -DskipTests clean assembly:assembly On Apr 1, 2014, at 4:13 PM, Patrick Wendell pwend...@gmail.com wrote: Do you get the same problem if you build with maven? On Tue, Apr 1, 2014 at 12:23 PM, Vipul Pandey vipan...@gmail.com wrote: SPARK_HADOOP_VERSION=2.0.0-cdh4.2.1 sbt/sbt assembly That's all I do. On Apr 1, 2014, at 11:41 AM, Patrick Wendell pwend...@gmail.com wrote: Vidal - could you show exactly what flags/commands you are using when you build spark to produce this assembly? On Tue, Apr 1, 2014 at 12:53 AM, Vipul Pandey vipan...@gmail.com wrote: Spark now shades its own protobuf dependency so protobuf 2.4.1 should't be getting pulled in unless you are directly using akka yourself. Are you? No i'm not. Although I see that protobuf libraries are directly pulled into the 0.9.0 assembly jar - I do see the shaded version as well. e.g. below for Message.class -bash-4.1$ jar -ftv ./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.0-cdh4.2.1.jar | grep protobuf | grep /Message.class 478 Thu Jun 30 15:26:12 PDT 2011 com/google/protobuf/Message.class 508 Sat Dec 14 14:20:38 PST 2013 com/google/protobuf_spark/Message.class Does your project have other dependencies that might be indirectly pulling in protobuf 2.4.1? It would be helpful if you could list all of your dependencies including the exact Spark version and other libraries. I did have another one which I moved to the end of classpath - even ran partial code without that dependency but it still failed whenever I use the jar with ScalaBuf dependency. 
Spark version is 0.9.0 ~Vipul On Mar 31, 2014, at 4:51 PM, Patrick Wendell pwend...@gmail.com wrote: Spark now shades its own protobuf dependency so protobuf 2.4.1 should't be getting pulled in unless you are directly using akka yourself. Are you? Does your project have other dependencies that might be indirectly pulling in protobuf 2.4.1? It would be helpful if you could list all of your dependencies including the exact Spark version and other libraries. - Patrick On Sun, Mar 30, 2014 at 10:03 PM, Vipul Pandey vipan...@gmail.com wrote: I'm using ScalaBuff (which depends on protobuf2.5) and facing the same issue. any word on this one? On Mar 27, 2014, at 6:41 PM, Kanwaldeep kanwal...@gmail.com wrote: We are using Protocol Buffer 2.5 to send messages to Spark Streaming 0.9 with Kafka stream setup. I have protocol Buffer 2.5 part of the uber jar deployed on each of the spark worker nodes. The message is compiled using 2.5 but then on runtime it is being de-serialized by 2.4.1 as I'm getting the following exception java.lang.VerifyError (java.lang.VerifyError: class com.snc.sinet.messages.XServerMessage$XServer overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;) java.lang.ClassLoader.defineClass1(Native Method) java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) java.lang.ClassLoader.defineClass(ClassLoader.java:615) java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) Suggestions on
Re: How to index each map operation????
Hi Thierry, Thanks for the above responses. I implemented this using a RangePartitioner; we need to use one of the custom partitioners in order to perform this task. Normally you can't maintain a counter, because count operations would have to be performed on each partitioned block of data...
CDH5 Spark on EC2
I’ve been able to get CDH5 up and running on EC2 and according to Cloudera Manager, Spark is running healthy. But when I try to run spark-shell, I eventually get the error: 14/04/02 07:18:18 INFO client.AppClient$ClientActor: Connecting to master spark://ip-172-xxx-xxx-xxx:7077... 14/04/02 07:18:38 ERROR client.AppClient$ClientActor: All masters are unresponsive! Giving up. 14/04/02 07:18:38 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster looks dead, giving up. 14/04/02 07:18:38 ERROR scheduler.TaskSchedulerImpl: Exiting due to error from cluster scheduler: Spark cluster looks down Wondering which configurations I would need to change to get this to work? Thanks! Denny
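A minimal sketch (not from the thread) of pointing a small test job at an explicit standalone master URL; the host below is a placeholder and must match exactly what the master's web UI reports it registered as, which on EC2 is typically the private hostname, and port 7077 must be open between the nodes:

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder master URL; replace with the value shown at the top of the
// master web UI.
val conf = new SparkConf()
  .setMaster("spark://ip-172-31-0-10.ec2.internal:7077")
  .setAppName("connectivity-check")
val sc = new SparkContext(conf)
println(sc.parallelize(1 to 100).count())  // trivial job to confirm executors attach
sc.stop()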
Re: Status of MLI?
Thanks for the update Evan! In terms of using MLI, I see that the Github code is linked to Spark 0.8; will it not work with 0.9 (which is what I have set up) or higher versions? On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User List] wrote: Hi there, MLlib is the first component of MLbase - MLI and the higher levels of the stack are still being developed. Look for updates in terms of our progress on the hyperparameter tuning/model selection problem in the next month or so! - Evan On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email] wrote: Hi Nan, I was actually referring to MLI/MLBase (http://www.mlbase.org); is this being actively developed? I'm familiar with mllib and have been looking at its documentation. Thanks! On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] wrote: mllib has been part of the Spark distribution (under the mllib directory), also check http://spark.apache.org/docs/latest/mllib-guide.html and for JIRA, because of the recent migration to the Apache JIRA, I think all mllib-related issues should be under the Spark umbrella, https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel -- Nan Zhu On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote: What is the current development status of MLI/MLBase? I see that the github repo is lying dormant (https://github.com/amplab/MLI) and JIRA has had no activity in the last 30 days (https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). Is the plan to add a lot of this into mllib itself without needing a separate API? Thanks!
Re: possible bug in Spark's ALS implementation...
It should be kept in mind that different implementations are rarely strictly better, and that what works well in one type of data might not in another. It also bears keeping in mind that several of these differences just amount to different amounts of regularization, which need not be a difference. #1 is just a question of 'semantics' really and not an algorithmic difference. #4 is more subtle and I think it is a small positive to use weighted regularization -- for all types of data. It is not specific to explicit data since it is based on counts. That said the effect of this is mostly more regularization across the boards, and can be simulated with a higher global lambda. There might be something to #2. I suspect the QR decomposition is more accurate and that can matter here. I have no evidence for that though. (The symmetry wasn't actually an issue in the end no?) #3 won't affect the result so much as when to stop. That is stopping in a principled way after X iterations is good, but it can be simulated with just specifying X iterations. If I were to experiment next I would focus on #2 and maybe #4. -- Sean Owen | Director, Data Science | London On Wed, Apr 2, 2014 at 9:06 AM, Michael Allman m...@allman.ms wrote: Hi Nick, I don't have my spark clone in front of me, but OTOH the major differences are/were: 1. Oryx multiplies lambda by alpha. 2. Oryx uses a different matrix inverse algorithm. It maintains a certain symmetry which the Spark algo does not, however I don't think this difference has a real impact on the results. 3. Oryx supports the specification of a convergence threshold for termination of the algorithm, based on delta rmse on a subset of the training set if I recall correctly. I've been using that as the termination criterion instead of a fixed number of iterations. 4. Oryx uses the weighted regularization scheme you alluded to below, multiplying lambda by the number of ratings. I've patched the spark impl to support (4) but haven't pushed it to my clone on github. I think it would be a valuable feature to support officially. I'd also like to work on (3) but don't have time now. I've only been using Oryx the past couple of weeks. Cheers, Michael On Tue, 1 Apr 2014, Nick Pentreath [via Apache Spark User List] wrote: Hi Michael Would you mind setting out exactly what differences you did find between the Spark and Oryx implementations? Would be good to be clear on them, and also see if there are further tricks/enhancements from the Oryx one that can be ported (such as the lambda * numRatings adjustment). N On Sat, Mar 15, 2014 at 2:52 AM, Michael Allman [hidden email] wrote: I've been thoroughly investigating this issue over the past couple of days and have discovered quite a bit. For one thing, there is definitely (at least) one issue/bug in the Spark implementation that leads to incorrect results for models generated with rank 1 or a large number of iterations. I will post a bug report with a thorough explanation this weekend or on Monday. I believe I've been able to track down every difference between the Spark and Oryx implementations that lead to difference results. I made some adjustments to the spark implementation so that, given the same initial product/item vectors, the resulting model is identical to the one produced by Oryx within a small numerical tolerance. I've verified this for small data sets and am working on verifying this with some large data sets. 
Aside from those already identified in this thread, another significant difference in the Spark implementation is that it begins the factorization process by computing the product matrix (Y) from the initial user matrix (X). Both of the papers on ALS referred to in this thread begin the process by computing the user matrix. I haven't done any testing comparing the models generated starting from Y or X, but they are very different. Is there a reason Spark begins the iteration by computing Y? Initializing both X and Y as is done in the Spark implementation seems unnecessary unless I'm overlooking some desired side-effect. Only the factor matrix which generates the other in the first iteration needs to be initialized. I also found that the product and user RDDs were being rebuilt many times over in my tests, even for tiny data sets. By persisting the RDD returned from updateFeatures() I was able to avoid a raft of duplicate computations. Is there a reason not to do this? Thanks.
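To make the regularization difference Sean and Michael discuss concrete, here is a small hedged sketch (illustrative only, not the MLlib or Oryx code): the per-user ridge term is either the global lambda or lambda scaled by that user's rating count, and it is what gets added to the diagonal of the per-user normal equations before solving:

// Per-user ridge term: a single global lambda, or (Oryx-style) lambda scaled
// by that user's rating count. This is the value added to the diagonal of
// the normal equations (YtY + ridge * I) x_u = Yt r_u.
def ridge(lambda: Double, numRatings: Int, weightedRegularization: Boolean): Double =
  if (weightedRegularization) lambda * numRatings else lambda

// Example: with lambda = 0.1, a user with 40 ratings gets 4.0 under the
// weighted scheme versus 0.1 under the plain scheme.
println(ridge(0.1, 40, weightedRegularization = true))   // 4.0
println(ridge(0.1, 40, weightedRegularization = false))  // 0.1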
ActorNotFound problem for mesos driver
Hi, Spark Devs: I encounter a problem which shows error message akka.actor.ActorNotFound on our mesos mini-cluster. mesos : 0.17.0 spark : spark-0.9.0-incubating spark-env.sh: #!/usr/bin/env bash export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs:// 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz export MASTER=zk://192.168.1.20:2181/mesos export SPARK_JAVA_OPTS=-Dspark.driver.port=17077 And the logs from each slave looks like: 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as executor ID 201403301937-335653056-5050-991-1 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started 14/04/02 15:14:38 INFO Remoting: Starting remoting 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:17077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost :17077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at 
akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread Thread-0 Any clue for this problem? Thanks in advance.
Re: ActorNotFound problem for mesos driver
Heya, Yep this is a problem in the Mesos scheduler implementation that has been fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052 = MesosSchedulerBackend) So several options, like applying the patch, upgrading to 0.9.1 :-/ Cheers, Andy On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote: Hi, Spark Devs: I encounter a problem which shows error message akka.actor.ActorNotFound on our mesos mini-cluster. mesos : 0.17.0 spark : spark-0.9.0-incubating spark-env.sh: #!/usr/bin/env bash export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs:// 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz export MASTER=zk://192.168.1.20:2181/mesos export SPARK_JAVA_OPTS=-Dspark.driver.port=17077 And the logs from each slave looks like: 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as executor ID 201403301937-335653056-5050-991-1 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started 14/04/02 15:14:38 INFO Remoting: Starting remoting 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:17077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost :17077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at 
akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread Thread-0 Any clue for this problem? Thanks in advance.
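For readers puzzled by the localhost in the log above, a hedged sketch of how the executor assembles the BlockManagerMaster address in 0.9-era Spark from spark.driver.host and spark.driver.port; the inference (drawn from the log itself, not confirmed in this thread) is that only the port was set via SPARK_JAVA_OPTS, so the host part fell back to localhost on the Mesos executors:

// How the driver actor URL seen in the log is put together (sketch).
def blockManagerMasterUrl(driverHost: String, driverPort: Int): String =
  s"akka.tcp://spark@$driverHost:$driverPort/user/BlockManagerMaster"

// With only -Dspark.driver.port=17077 propagated, the executor ends up trying
// akka.tcp://spark@localhost:17077/user/BlockManagerMaster, which fails with
// ActorNotFound.
println(blockManagerMasterUrl("localhost", 17077))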
Re: ActorNotFound problem for mesos driver
Aha, thank you for your kind reply. Upgrading to 0.9.1 is a good choice. :) On Wed, Apr 2, 2014 at 11:35 PM, andy petrella andy.petre...@gmail.comwrote: Heya, Yep this is a problem in the Mesos scheduler implementation that has been fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052= MesosSchedulerBackend) So several options, like applying the patch, upgrading to 0.9.1 :-/ Cheers, Andy On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote: Hi, Spark Devs: I encounter a problem which shows error message akka.actor.ActorNotFound on our mesos mini-cluster. mesos : 0.17.0 spark : spark-0.9.0-incubating spark-env.sh: #!/usr/bin/env bash export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs:// 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz export MASTER=zk://192.168.1.20:2181/mesos export SPARK_JAVA_OPTS=-Dspark.driver.port=17077 And the logs from each slave looks like: 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as executor ID 201403301937-335653056-5050-991-1 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started 14/04/02 15:14:38 INFO Remoting: Starting remoting 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:17077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost :17077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at 
akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread Thread-0 Any clue for this problem? Thanks in advance.
Re: ActorNotFound problem for mesos driver
np ;-) On Wed, Apr 2, 2014 at 5:50 PM, Leon Zhang leonca...@gmail.com wrote: Aha, thank you for your kind reply. Upgrading to 0.9.1 is a good choice. :) On Wed, Apr 2, 2014 at 11:35 PM, andy petrella andy.petre...@gmail.comwrote: Heya, Yep this is a problem in the Mesos scheduler implementation that has been fixed after 0.9.0 (https://spark-project.atlassian.net/browse/SPARK-1052= MesosSchedulerBackend) So several options, like applying the patch, upgrading to 0.9.1 :-/ Cheers, Andy On Wed, Apr 2, 2014 at 5:30 PM, Leon Zhang leonca...@gmail.com wrote: Hi, Spark Devs: I encounter a problem which shows error message akka.actor.ActorNotFound on our mesos mini-cluster. mesos : 0.17.0 spark : spark-0.9.0-incubating spark-env.sh: #!/usr/bin/env bash export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI=hdfs:// 192.168.1.20/tmp/spark-0.9.0-incubating-hadoop_2.0.0-cdh4.6.0-bin.tar.gz export MASTER=zk://192.168.1.20:2181/mesos export SPARK_JAVA_OPTS=-Dspark.driver.port=17077 And the logs from each slave looks like: 14/04/02 15:14:37 INFO MesosExecutorBackend: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/04/02 15:14:37 INFO MesosExecutorBackend: Registered with Mesos as executor ID 201403301937-335653056-5050-991-1 14/04/02 15:14:38 INFO Slf4jLogger: Slf4jLogger started 14/04/02 15:14:38 INFO Remoting: Starting remoting 14/04/02 15:14:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@zetyun-cloud3:42218] 14/04/02 15:14:38 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:17077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost :17077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$anon$1$anon$2.enqueue(Mailboxes.scala:44) at 
akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Exception in thread Thread-0 Any clue for this problem? Thanks in advance.
Resilient nature of RDD
Can someone explain how an RDD is resilient? If one of the partitions is lost, who is responsible for recreating that partition - is it the driver program?
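A short sketch of the lineage idea behind that resilience (general Spark behavior, not a reply from the thread): each RDD records how it was derived from its parents, and the scheduler running in the driver uses that recorded lineage to recompute just the lost partitions; toDebugString shows the chain that would be replayed. The input path below is a placeholder:

import org.apache.spark.SparkContext._

val raw = sc.textFile("hdfs:///some/input")          // placeholder path
val words = raw.flatMap(_.split(" ")).map((_, 1))
val counts = words.reduceByKey(_ + _)

// The dependency chain Spark would replay to rebuild a lost partition of `counts`.
println(counts.toDebugString)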
Print line in JavaNetworkWordCount
Hi Guys, I would like to print the contents of each line in: JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2])); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(x.split(" ")); } }); Is it possible? How could I do it? Thanks
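The question uses the Java streaming API, but as a hedged illustration in Scala (the equivalent calls also exist on JavaDStream), the usual ways to look at a DStream's contents are print(), which shows the first few elements of each batch, or foreachRDD for custom output; the host and port are placeholders:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder host/port
val words = lines.flatMap(_.split(" "))

words.print()                                           // prints a sample of each batch
words.foreachRDD(rdd => rdd.take(20).foreach(println))  // custom per-batch output

ssc.start()
ssc.awaitTermination()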
Re: Need suggestions
TL;DR Your classes are missing on the workers, pass the jar containing the class main.scala.Utils to the SparkContext Longer: I miss some information, like how the SparkContext is configured but my best guess is that you didn't provided the jars (addJars on SparkConf or use the SC's constructor param). Actually, the classes are not found on the slave which is another node, another machine (or env), and so on. So to run your class it must be able to load it -- which is handle by Spark but you simply have to pass it as an argument... This jar could be simply your current project packaged as a jar using maven/sbt/... HTH On Wed, Apr 2, 2014 at 10:01 PM, yh18190 yh18...@gmail.com wrote: Hi Guys, Currently I am facing this issue ..Not able to find erros.. here is sbt file. name := Simple Project version := 1.0 scalaVersion := 2.10.3 resolvers += bintray/meetup at http://dl.bintray.com/meetup/maven; resolvers += Akka Repository at http://repo.akka.io/releases/; resolvers += Cloudera Repository at https://repository.cloudera.com/artifactory/cloudera-repos/; libraryDependencies += org.apache.spark %% spark-core % 0.9.0-incubating libraryDependencies += com.cloudphysics % jerkson_2.10 % 0.6.3 libraryDependencies += org.apache.hadoop % hadoop-client % 2.0.0-mr1-cdh4.6.0 retrieveManaged := true output.. [error] (run-main) org.apache.spark.SparkException: Job aborted: Task 2.0:2 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: Could not initialize class main.scala.Utils$) org.apache.spark.SparkException: Job aborted: Task 2.0:2 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: Could not initialize class main.scala.Utils$) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Need-suggestions-tp3650.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: PySpark RDD.partitionBy() requires an RDD of tuples
Update: I'm now using this ghetto function to partition the RDD I get back when I call textFile() on a gzipped file:

# Python 2.6
def partitionRDD(rdd, numPartitions):
    counter = {'a': 0}
    def count_up(x):
        counter['a'] += 1
        return counter['a']
    return (rdd.keyBy(count_up)
               .partitionBy(numPartitions)
               .map(lambda (counter, data): data))

If there's supposed to be a built-in Spark method to do this, I'd love to learn more about it. Nick On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, doing help(rdd) in PySpark doesn't show a method called repartition(). Trying rdd.repartition() or rdd.repartition(10) also fails. I'm on 0.9.0. The approach I'm going with to partition my MappedRDD is to key it by a random int, and then partition it. So something like:

rdd = sc.textFile('s3n://gzipped_file_brah.gz')  # rdd has 1 partition; minSplits is not actionable due to gzip
keyed_rdd = rdd.keyBy(lambda x: randint(1,100))  # we key the RDD so we can partition it
partitioned_rdd = keyed_rdd.partitionBy(10)      # rdd has 10 partitions

Are you saying I don't have to do this? Nick On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.com wrote: Hm, yeah, the docs are not clear on this one. The function you're looking for to change the number of partitions on any ol' RDD is repartition(), which is available in master but for some reason doesn't seem to show up in the latest docs. Sorry about that, I also didn't realize partitionBy() had this behavior from reading the Python docs (though it is consistent with the Scala API, just more type-safe there). On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Just an FYI, it's not obvious from the docs (http://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBy) that the following code should fail:

a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
a._jrdd.splits().size()
a.count()
b = a.partitionBy(5)
b._jrdd.splits().size()
b.count()

I figured out from the example that if I generated a key by doing this b = a.map(lambda x: (x, x)).partitionBy(5) then all would be well. In other words, partitionBy() only works on RDDs of tuples. Is that correct? Nick
Re: Need suggestions
Sorry, perhaps I was not clear. Anyway, could you try making the path in the *List* absolute; e.g. List("/home/yh/src/pj/spark-stuffs/target/scala-2.10/simple-project_2.10-1.0.jar"). In order to provide a relative path, you first need to figure out your CWD, so to be really sure you can do: // before the new SparkContext println(new java.io.File(".").toUri.toString) // didn't try it, so adapt to make scalac happy ^^ On Wed, Apr 2, 2014 at 11:30 PM, yh18190 yh18...@gmail.com wrote: Hi, here is the SparkContext setup. Do I need to ship any more extra jars to the slaves separately, or is this enough? I am able to see the created jar in my target directory: val sc = new SparkContext("spark://spark-master-001:7077", "Simple App", utilclass.spark_home, List("target/scala-2.10/simple-project_2.10-1.0.jar"))
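A hedged sketch of the same advice (the paths, master URL, and app name here mirror the thread but are placeholders): resolve the packaged jar to an absolute path on the driver and hand it to the SparkContext so the workers can fetch the classes that were missing:

import java.io.File
import org.apache.spark.{SparkConf, SparkContext}

// Resolve the project jar relative to the current working directory so the
// resulting path is unambiguous on the driver.
val appJar = new File("target/scala-2.10/simple-project_2.10-1.0.jar").getAbsolutePath

val conf = new SparkConf()
  .setMaster("spark://spark-master-001:7077")  // placeholder master URL
  .setAppName("Simple App")
  .setJars(Seq(appJar))   // shipped to executors; avoids NoClassDefFoundError for project classes
val sc = new SparkContext(conf)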
Re: Spark output compression on HDFS
For textFile I believe we overload it and let you set a codec directly: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59 For saveAsSequenceFile yep, I think Mark is right, you need an option. On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra m...@clearstorydata.comwrote: http://www.scala-lang.org/api/2.10.3/index.html#scala.Option The signature is 'def saveAsSequenceFile(path: String, codec: Option[Class[_ : CompressionCodec]] = None)', but you are providing a Class, not an Option[Class]. Try counts.saveAsSequenceFile(output, Some(classOf[org.apache.hadoop.io.compress.SnappyCodec])) On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev kudryavtsev.konstan...@gmail.com wrote: Hi there, I've started using Spark recently and evaluating possible use cases in our company. I'm trying to save RDD as compressed Sequence file. I'm able to save non-compressed file be calling: counts.saveAsSequenceFile(output) where counts is my RDD (IntWritable, Text). However, I didn't manage to compress output. I tried several configurations and always got exception: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) console:21: error: type mismatch; found : Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) and it doesn't work even for Gzip: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) Could you please suggest solution? also, I didn't find how is it possible to specify compression parameters (i.e. compression type for Snappy). I wondered if you could share code snippets for writing/reading RDD with compression? Thank you in advance, Konstantin Kudryavtsev
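Putting the two suggestions above together, a hedged sketch of both write paths (the output paths and the counts RDD are placeholders, and the saveAsTextFile overload assumes a Spark build recent enough to include the test Patrick links to): saveAsTextFile takes the codec class directly, while saveAsSequenceFile takes an Option wrapping it. Gzip is used here because Snappy additionally needs the native library on the nodes:

import org.apache.spark.SparkContext._
import org.apache.hadoop.io.compress.GzipCodec

val counts = sc.parallelize(Seq((1, "a"), (2, "b")))

// Text output with a compression codec passed directly.
counts.saveAsTextFile("hdfs:///tmp/counts-text-gz", classOf[GzipCodec])

// Sequence-file output: the codec parameter is an Option[Class[...]].
counts.saveAsSequenceFile("hdfs:///tmp/counts-seq-gz", Some(classOf[GzipCodec]))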
Optimal Server Design for Spark
Hi Folks I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop Server design but there does not seem to be much Spark related collateral around infrastructure guidelines (or at least I haven't been able to find them). My current thinking for server design is something along these lines. - 2 x 10Gbe NICs - 128 GB RAM - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and Runtimes, 4 x 1TB for Data Drives) - 1 Disk Controller - 2 x 2.6 GHz 6 core processors If I stick with 1u servers then I lose disk capacity per rack but I get a lot more memory and CPU capacity per rack. This increases my total cluster memory footprint and it doesn't seem to make sense to have super dense storage servers because I can't fit all that data on disk in memory anyways. So at present, my thinking is to go with 1u servers instead of 2u Servers. Is 128GB RAM per server normal? Do you guys use more or less than that? Any feedback would be appreciated Regards Steve Watt
Re: Spark output compression on HDFS
Is this a Scala-onlyhttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#saveAsTextFilefeature? On Wed, Apr 2, 2014 at 5:55 PM, Patrick Wendell pwend...@gmail.com wrote: For textFile I believe we overload it and let you set a codec directly: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59 For saveAsSequenceFile yep, I think Mark is right, you need an option. On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra m...@clearstorydata.comwrote: http://www.scala-lang.org/api/2.10.3/index.html#scala.Option The signature is 'def saveAsSequenceFile(path: String, codec: Option[Class[_ : CompressionCodec]] = None)', but you are providing a Class, not an Option[Class]. Try counts.saveAsSequenceFile(output, Some(classOf[org.apache.hadoop.io.compress.SnappyCodec])) On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev kudryavtsev.konstan...@gmail.com wrote: Hi there, I've started using Spark recently and evaluating possible use cases in our company. I'm trying to save RDD as compressed Sequence file. I'm able to save non-compressed file be calling: counts.saveAsSequenceFile(output) where counts is my RDD (IntWritable, Text). However, I didn't manage to compress output. I tried several configurations and always got exception: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) console:21: error: type mismatch; found : Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) and it doesn't work even for Gzip: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) Could you please suggest solution? also, I didn't find how is it possible to specify compression parameters (i.e. compression type for Snappy). I wondered if you could share code snippets for writing/reading RDD with compression? Thank you in advance, Konstantin Kudryavtsev
Re: PySpark RDD.partitionBy() requires an RDD of tuples
There is a repartition method in pyspark master: https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128 On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Update: I'm now using this ghetto function to partition the RDD I get back when I call textFile() on a gzipped file: # Python 2.6 def partitionRDD(rdd, numPartitions): counter = {'a': 0} def count_up(x): counter['a'] += 1 return counter['a'] return (rdd.keyBy(count_up) .partitionBy(numPartitions) .map(lambda (counter, data): data)) If there's supposed to be a built-in Spark method to do this, I'd love to learn more about it. Nick On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, doing help(rdd) in PySpark doesn't show a method called repartition(). Trying rdd.repartition() or rdd.repartition(10) also fail. I'm on 0.9.0. The approach I'm going with to partition my MappedRDD is to key it by a random int, and then partition it. So something like: rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition; minSplits is not actionable due to gzip keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we can partition it partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions Are you saying I don't have to do this? Nick On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote: Hm, yeah, the docs are not clear on this one. The function you're looking for to change the number of partitions on any ol' RDD is repartition(), which is available in master but for some reason doesn't seem to show up in the latest docs. Sorry about that, I also didn't realize partitionBy() had this behavior from reading the Python docs (though it is consistent with the Scala API, just more type-safe there). On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Just an FYI, it's not obvious from the docshttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBythat the following code should fail: a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2) a._jrdd.splits().size() a.count() b = a.partitionBy(5) b._jrdd.splits().size() b.count() I figured out from the example that if I generated a key by doing this b = a.map(lambda x: (x, x)).partitionBy(5) then all would be well. In other words, partitionBy() only works on RDDs of tuples. Is that correct? Nick -- View this message in context: PySpark RDD.partitionBy() requires an RDD of tupleshttp://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
Re: Spark output compression on HDFS
Thanks for pointing that out. On Wed, Apr 2, 2014 at 6:11 PM, Mark Hamstra m...@clearstorydata.comwrote: First, you shouldn't be using spark.incubator.apache.org anymore, just spark.apache.org. Second, saveAsSequenceFile doesn't appear to exist in the Python API at this point. On Wed, Apr 2, 2014 at 3:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Is this a Scala-onlyhttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#saveAsTextFilefeature? On Wed, Apr 2, 2014 at 5:55 PM, Patrick Wendell pwend...@gmail.comwrote: For textFile I believe we overload it and let you set a codec directly: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/FileSuite.scala#L59 For saveAsSequenceFile yep, I think Mark is right, you need an option. On Wed, Apr 2, 2014 at 12:36 PM, Mark Hamstra m...@clearstorydata.comwrote: http://www.scala-lang.org/api/2.10.3/index.html#scala.Option The signature is 'def saveAsSequenceFile(path: String, codec: Option[Class[_ : CompressionCodec]] = None)', but you are providing a Class, not an Option[Class]. Try counts.saveAsSequenceFile(output, Some(classOf[org.apache.hadoop.io.compress.SnappyCodec])) On Wed, Apr 2, 2014 at 12:18 PM, Kostiantyn Kudriavtsev kudryavtsev.konstan...@gmail.com wrote: Hi there, I've started using Spark recently and evaluating possible use cases in our company. I'm trying to save RDD as compressed Sequence file. I'm able to save non-compressed file be calling: counts.saveAsSequenceFile(output) where counts is my RDD (IntWritable, Text). However, I didn't manage to compress output. I tried several configurations and always got exception: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.SnappyCodec](classOf[org.apache.hadoop.io.compress.SnappyCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.SnappyCodec]) counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) console:21: error: type mismatch; found : Class[org.apache.spark.io.SnappyCompressionCodec](classOf[org.apache.spark.io.SnappyCompressionCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.spark.io.SnappyCompressionCodec]) and it doesn't work even for Gzip: counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) console:21: error: type mismatch; found : Class[org.apache.hadoop.io.compress.GzipCodec](classOf[org.apache.hadoop.io.compress.GzipCodec]) required: Option[Class[_ : org.apache.hadoop.io.compress.CompressionCodec]] counts.saveAsSequenceFile(output, classOf[org.apache.hadoop.io.compress.GzipCodec]) Could you please suggest solution? also, I didn't find how is it possible to specify compression parameters (i.e. compression type for Snappy). I wondered if you could share code snippets for writing/reading RDD with compression? Thank you in advance, Konstantin Kudryavtsev
Re: PySpark RDD.partitionBy() requires an RDD of tuples
Ah, now I see what Aaron was referring to. So I'm guessing we will get this in the next release or two. Thank you. On Wed, Apr 2, 2014 at 6:09 PM, Mark Hamstra m...@clearstorydata.comwrote: There is a repartition method in pyspark master: https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128 On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Update: I'm now using this ghetto function to partition the RDD I get back when I call textFile() on a gzipped file: # Python 2.6 def partitionRDD(rdd, numPartitions): counter = {'a': 0} def count_up(x): counter['a'] += 1 return counter['a'] return (rdd.keyBy(count_up) .partitionBy(numPartitions) .map(lambda (counter, data): data)) If there's supposed to be a built-in Spark method to do this, I'd love to learn more about it. Nick On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, doing help(rdd) in PySpark doesn't show a method called repartition(). Trying rdd.repartition() or rdd.repartition(10) also fail. I'm on 0.9.0. The approach I'm going with to partition my MappedRDD is to key it by a random int, and then partition it. So something like: rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition; minSplits is not actionable due to gzip keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we can partition it partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions Are you saying I don't have to do this? Nick On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote: Hm, yeah, the docs are not clear on this one. The function you're looking for to change the number of partitions on any ol' RDD is repartition(), which is available in master but for some reason doesn't seem to show up in the latest docs. Sorry about that, I also didn't realize partitionBy() had this behavior from reading the Python docs (though it is consistent with the Scala API, just more type-safe there). On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Just an FYI, it's not obvious from the docshttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBythat the following code should fail: a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2) a._jrdd.splits().size() a.count() b = a.partitionBy(5) b._jrdd.splits().size() b.count() I figured out from the example that if I generated a key by doing this b = a.map(lambda x: (x, x)).partitionBy(5) then all would be well. In other words, partitionBy() only works on RDDs of tuples. Is that correct? Nick -- View this message in context: PySpark RDD.partitionBy() requires an RDD of tupleshttp://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
Re: PySpark RDD.partitionBy() requires an RDD of tuples
Will be in 1.0.0 On Wed, Apr 2, 2014 at 3:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Ah, now I see what Aaron was referring to. So I'm guessing we will get this in the next release or two. Thank you. On Wed, Apr 2, 2014 at 6:09 PM, Mark Hamstra m...@clearstorydata.comwrote: There is a repartition method in pyspark master: https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L1128 On Wed, Apr 2, 2014 at 2:44 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Update: I'm now using this ghetto function to partition the RDD I get back when I call textFile() on a gzipped file: # Python 2.6 def partitionRDD(rdd, numPartitions): counter = {'a': 0} def count_up(x): counter['a'] += 1 return counter['a'] return (rdd.keyBy(count_up) .partitionBy(numPartitions) .map(lambda (counter, data): data)) If there's supposed to be a built-in Spark method to do this, I'd love to learn more about it. Nick On Tue, Apr 1, 2014 at 7:59 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, doing help(rdd) in PySpark doesn't show a method called repartition(). Trying rdd.repartition() or rdd.repartition(10) also fail. I'm on 0.9.0. The approach I'm going with to partition my MappedRDD is to key it by a random int, and then partition it. So something like: rdd = sc.textFile('s3n://gzipped_file_brah.gz') # rdd has 1 partition; minSplits is not actionable due to gzip keyed_rdd = rdd.keyBy(lambda x: randint(1,100)) # we key the RDD so we can partition it partitioned_rdd = keyed_rdd.partitionBy(10) # rdd has 10 partitions Are you saying I don't have to do this? Nick On Tue, Apr 1, 2014 at 7:38 PM, Aaron Davidson ilike...@gmail.comwrote: Hm, yeah, the docs are not clear on this one. The function you're looking for to change the number of partitions on any ol' RDD is repartition(), which is available in master but for some reason doesn't seem to show up in the latest docs. Sorry about that, I also didn't realize partitionBy() had this behavior from reading the Python docs (though it is consistent with the Scala API, just more type-safe there). On Tue, Apr 1, 2014 at 3:01 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Just an FYI, it's not obvious from the docshttp://spark.incubator.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#partitionBythat the following code should fail: a = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2) a._jrdd.splits().size() a.count() b = a.partitionBy(5) b._jrdd.splits().size() b.count() I figured out from the example that if I generated a key by doing this b = a.map(lambda x: (x, x)).partitionBy(5) then all would be well. In other words, partitionBy() only works on RDDs of tuples. Is that correct? Nick -- View this message in context: PySpark RDD.partitionBy() requires an RDD of tupleshttp://apache-spark-user-list.1001560.n3.nabble.com/PySpark-RDD-partitionBy-requires-an-RDD-of-tuples-tp3598.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com.
Re: Is there a way to get the current progress of the job?
What I'd like is a way to capture the information provided on the stages page (i.e. cluster:4040/stages via IndexPage). Looking through the Spark code, it doesn't seem like it is possible to directly query for specific facts such as how many tasks have succeeded or how many total tasks there are for a given active stage. Instead, it looks like all the data for the page is generated at once using information from the JobProgressListener. It doesn't seem like I have any way to programmatically access this information myself. I can't even instantiate my own JobProgressListener because it is spark package private. I could implement my own SparkListener and gather up the information myself. It feels a bit awkward since classes like Task and TaskInfo are also spark package private. It does seem possible to gather up what I need, but it seems like this sort of information should just be available without implementing a custom SparkListener (or worse, screen scraping the HTML generated by StageTable!) I was hoping that I would find the answer in MetricsServlet, which is turned on by default. It seems that when I visit http://cluster:4040/metrics/json/ I should be able to get everything I want, but I don't see the basic stage/task progress information I would expect. Are there special metrics properties that I should set to get this info? I think this would be the best solution - just give it the right URL and parse the resulting JSON - but I can't seem to figure out how to do this or if it is possible. Any advice is appreciated. Thanks, Philip On 04/01/2014 09:43 AM, Philip Ogren wrote: Hi DB, Just wondering if you ever got an answer to your question about monitoring progress - either offline or through your own investigation. Any findings would be appreciated. Thanks, Philip On 01/30/2014 10:32 PM, DB Tsai wrote: Hi guys, When we're running a very long job, we would like to show users the current progress of map and reduce job. After looking at the api document, I don't find anything for this. However, in Spark UI, I could see the progress of the task. Is there anything I miss? Thanks. Sincerely, DB Tsai Machine Learning Engineer Alpine Data Labs -- Web: http://alpinenow.com/
Measure the Total Network I/O, Cpu and Memory Consumed by Spark Job
Hi All, I am interested in measuring the total network I/O, CPU and memory consumed by a Spark job. I tried to find the related information in the logs and Web UI, but there doesn't seem to be sufficient information. Could anyone give me any suggestions? Thanks very much in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Measure-the-Total-Network-I-O-Cpu-and-Memory-Consumed-by-Spark-Job-tp3668.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
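One partial option, not a complete answer to the question: Spark's metrics system can be pointed at a sink via conf/metrics.properties, which surfaces JVM and task metrics for the driver and executors (though it is not a full network I/O accounting). A sketch of such a configuration, where the output directory is a placeholder:

    # conf/metrics.properties - a sketch; the directory is a placeholder
    *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
    *.sink.csv.period=10
    *.sink.csv.unit=seconds
    *.sink.csv.directory=/tmp/spark-metrics
    # also expose JVM source metrics for the driver and executors
    driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
    executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource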
Efficient way to aggregate event data at daily/weekly/monthly level
Hi, I want to aggregate (time-stamped) event data at daily, weekly and monthly level stored in a directory in data//mm/dd/dat.gz format. For example: Each dat.gz file contains tuples in (datetime, id, value) format. I can perform aggregation as follows: but this code doesn't seem to be efficient because it doesn't exploit the data dependencies in reduce steps. For example, there is no dependency in between the data of 2010-01 and that of all other dates (2010-02, 2010-03, ...) so it would be ideal if we can load one month of data in each node once and perform all three (daily, weekly and monthly) aggregation. I think I could use mapPartitions and a big reducer that performs all three aggregations, but not sure it is a right way to go. Is there a more efficient way to perform these aggregations (by loading data once) yet keeping the code modular? Thanks, K -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-aggregate-event-data-at-daily-weekly-monthly-level-tp3673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Resilient nature of RDD
The driver stores the meta-data associated with the partition, but the re-computation will occur on an executor. So if several partitions are lost, e.g. due to a few machines failing, the re-computation can be striped across the cluster making it fast. On Wed, Apr 2, 2014 at 11:27 AM, David Thomas dt5434...@gmail.com wrote: Can someone explain how RDD is resilient? If one of the partition is lost, who is responsible to recreate that partition - is it the driver program?
Re: Is there a way to get the current progress of the job?
Hey Phillip, Right now there is no mechanism for this. You have to go in through the low level listener interface. We could consider exposing the JobProgressListener directly - I think it's been factored nicely so it's fairly decoupled from the UI. The concern is this is a semi-internal piece of functionality and something we might, e.g. want to change the API of over time. - Patrick On Wed, Apr 2, 2014 at 3:39 PM, Philip Ogren philip.og...@oracle.comwrote: What I'd like is a way to capture the information provided on the stages page (i.e. cluster:4040/stages via IndexPage). Looking through the Spark code, it doesn't seem like it is possible to directly query for specific facts such as how many tasks have succeeded or how many total tasks there are for a given active stage. Instead, it looks like all the data for the page is generated at once using information from the JobProgressListener. It doesn't seem like I have any way to programmatically access this information myself. I can't even instantiate my own JobProgressListener because it is spark package private. I could implement my SparkListener and gather up the information myself. It feels a bit awkward since classes like Task and TaskInfo are also spark package private. It does seem possible to gather up what I need but it seems like this sort of information should just be available without by implementing a custom SparkListener (or worse screen scraping the html generated by StageTable!) I was hoping that I would find the answer in MetricsServlet which is turned on by default. It seems that when I visit http://cluster:4040/metrics/json/ I should be able to get everything I want but I don't see the basic stage/task progress information I would expect. Are there special metrics properties that I should set to get this info? I think this would be the best solution - just give it the right URL and parse the resulting JSON - but I can't seem to figure out how to do this or if it is possible. Any advice is appreciated. Thanks, Philip On 04/01/2014 09:43 AM, Philip Ogren wrote: Hi DB, Just wondering if you ever got an answer to your question about monitoring progress - either offline or through your own investigation. Any findings would be appreciated. Thanks, Philip On 01/30/2014 10:32 PM, DB Tsai wrote: Hi guys, When we're running a very long job, we would like to show users the current progress of map and reduce job. After looking at the api document, I don't find anything for this. However, in Spark UI, I could see the progress of the task. Is there anything I miss? Thanks. Sincerely, DB Tsai Machine Learning Engineer Alpine Data Labs -- Web: http://alpinenow.com/
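For reference, a rough sketch of the listener approach Patrick mentions, written against the listener API roughly as it looks around Spark 1.0 (event class names differed slightly in 0.9, so treat this as illustrative rather than exact):

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerStageCompleted}

    // Counts finished tasks and prints a line when each stage completes.
    class ProgressListener extends SparkListener {
      val tasksEnded = new AtomicInteger(0)
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        tasksEnded.incrementAndGet()
      }
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        println("Stage " + stageCompleted.stageInfo.stageId + " finished; tasks ended so far: " + tasksEnded.get)
      }
    }

    sc.addSparkListener(new ProgressListener)  // sc is an existing SparkContext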
Re: Is there a way to get the current progress of the job?
Hi Philip, In the upcoming release of Spark 1.0 there will be a feature that provides for exactly what you describe: capturing the information displayed on the UI in JSON. More details will be provided in the documentation, but for now, anything before 0.9.1 can only go through JobLogger.scala, which outputs information in a somewhat arbitrary format and will be deprecated soon. If you find this feature useful, you can test it out by building the master branch of Spark yourself, following the instructions in https://github.com/apache/spark/pull/42. Andrew On Wed, Apr 2, 2014 at 3:39 PM, Philip Ogren philip.og...@oracle.comwrote: What I'd like is a way to capture the information provided on the stages page (i.e. cluster:4040/stages via IndexPage). Looking through the Spark code, it doesn't seem like it is possible to directly query for specific facts such as how many tasks have succeeded or how many total tasks there are for a given active stage. Instead, it looks like all the data for the page is generated at once using information from the JobProgressListener. It doesn't seem like I have any way to programmatically access this information myself. I can't even instantiate my own JobProgressListener because it is spark package private. I could implement my SparkListener and gather up the information myself. It feels a bit awkward since classes like Task and TaskInfo are also spark package private. It does seem possible to gather up what I need but it seems like this sort of information should just be available without by implementing a custom SparkListener (or worse screen scraping the html generated by StageTable!) I was hoping that I would find the answer in MetricsServlet which is turned on by default. It seems that when I visit http://cluster:4040/metrics/json/ I should be able to get everything I want but I don't see the basic stage/task progress information I would expect. Are there special metrics properties that I should set to get this info? I think this would be the best solution - just give it the right URL and parse the resulting JSON - but I can't seem to figure out how to do this or if it is possible. Any advice is appreciated. Thanks, Philip On 04/01/2014 09:43 AM, Philip Ogren wrote: Hi DB, Just wondering if you ever got an answer to your question about monitoring progress - either offline or through your own investigation. Any findings would be appreciated. Thanks, Philip On 01/30/2014 10:32 PM, DB Tsai wrote: Hi guys, When we're running a very long job, we would like to show users the current progress of map and reduce job. After looking at the api document, I don't find anything for this. However, in Spark UI, I could see the progress of the task. Is there anything I miss? Thanks. Sincerely, DB Tsai Machine Learning Engineer Alpine Data Labs -- Web: http://alpinenow.com/
Re: Efficient way to aggregate event data at daily/weekly/monthly level
Watch out with loading data from gzipped files. Spark cannot parallelize the load of gzipped files, and if you do not explicitly repartition your RDD created from such a file, everything you do on that RDD will run on a single core. On Wed, Apr 2, 2014 at 8:22 PM, K Koh den...@gmail.com wrote: Hi, I want to aggregate (time-stamped) event data at daily, weekly and monthly level stored in a directory in data//mm/dd/dat.gz format. For example: Each dat.gz file contains tuples in (datetime, id, value) format. I can perform aggregation as follows: but this code doesn't seem to be efficient because it doesn't exploit the data dependencies in reduce steps. For example, there is no dependency in between the data of 2010-01 and that of all other dates (2010-02, 2010-03, ...) so it would be ideal if we can load one month of data in each node once and perform all three (daily, weekly and monthly) aggregation. I think I could use mapPartitions and a big reducer that performs all three aggregations, but not sure it is a right way to go. Is there a more efficient way to perform these aggregations (by loading data once) yet keeping the code modular? Thanks, K -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-aggregate-event-data-at-daily-weekly-monthly-level-tp3673.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
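One way to compute all three aggregates in a single pass over the data, as asked above, is to parse each record once, emit one keyed value per granularity, and reduce once. The Scala sketch below assumes lines look like "2010-01-15 13:45:00,some-id,3.2" and that the aggregate is a sum; the input path, partition count, and week-key derivation are illustrative only.

    import org.apache.spark.SparkContext._
    import java.text.SimpleDateFormat
    import java.util.Calendar

    // gzip parts are not splittable, so repartition right after loading (see the warning above)
    val raw = sc.textFile("data/2010/*/*/dat.gz").repartition(48)

    val perGranularity = raw.flatMap { line =>
      val Array(dt, _, value) = line.split(",")
      val day = dt.substring(0, 10)      // "2010-01-15"
      val month = dt.substring(0, 7)     // "2010-01"
      val cal = Calendar.getInstance()
      cal.setTime(new SimpleDateFormat("yyyy-MM-dd").parse(day))
      val week = day.substring(0, 4) + "-W" + cal.get(Calendar.WEEK_OF_YEAR)
      Seq(("day:" + day, value.toDouble),
          ("week:" + week, value.toDouble),
          ("month:" + month, value.toDouble))
    }

    // one shuffle produces daily, weekly and monthly totals together
    val totals = perGranularity.reduceByKey(_ + _)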
Re: Status of MLI?
Targeting 0.9.0 should work out of the box (just a change to the build.sbt) - I'll push some changes I've been sitting on to the public repo in the next couple of days. On Wed, Apr 2, 2014 at 4:05 AM, Krakna H shankark+...@gmail.com wrote: Thanks for the update Evan! In terms of using MLI, I see that the Github code is linked to Spark 0.8; will it not work with 0.9 (which is what I have set up) or higher versions? On Wed, Apr 2, 2014 at 1:44 AM, Evan R. Sparks [via Apache Spark User List] [hidden email] http://user/SendEmail.jtp?type=nodenode=3632i=0wrote: Hi there, MLlib is the first component of MLbase - MLI and the higher levels of the stack are still being developed. Look for updates in terms of our progress on the hyperparameter tuning/model selection problem in the next month or so! - Evan On Tue, Apr 1, 2014 at 8:05 PM, Krakna H [hidden email]http://user/SendEmail.jtp?type=nodenode=3615i=0 wrote: Hi Nan, I was actually referring to MLI/MLBase (http://www.mlbase.org); is this being actively developed? I'm familiar with mllib and have been looking at its documentation. Thanks! On Tue, Apr 1, 2014 at 10:44 PM, Nan Zhu [via Apache Spark User List] [hidden email] http://user/SendEmail.jtp?type=nodenode=3612i=0 wrote: mllib has been part of Spark distribution (under mllib directory), also check http://spark.apache.org/docs/latest/mllib-guide.html and for JIRA, because of the recent migration to apache JIRA, I think all mllib-related issues should be under the Spark umbrella, https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel -- Nan Zhu On Tuesday, April 1, 2014 at 10:38 PM, Krakna H wrote: What is the current development status of MLI/MLBase? I see that the github repo is lying dormant (https://github.com/amplab/MLI) and JIRA has had no activity in the last 30 days ( https://spark-project.atlassian.net/browse/MLI/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). Is the plan to add a lot of this into mllib itself without needing a separate API? Thanks! -- View this message in context: Status of MLI?http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com. -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3611.html To start a new topic under Apache Spark User List, email [hidden email] http://user/SendEmail.jtp?type=nodenode=3612i=1 To unsubscribe from Apache Spark User List, click here. NAMLhttp://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml -- View this message in context: Re: Status of MLI?http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3612.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com. 
-- View this message in context: Re: Status of MLI? (http://apache-spark-user-list.1001560.n3.nabble.com/Status-of-MLI-tp3610p3632.html) Sent from the Apache Spark User List mailing list archive (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com.
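For what it's worth, "just a change to the build.sbt" presumably means bumping the Spark dependency version. The actual MLI build file isn't shown here, but the kind of sbt setting involved would look roughly like this (coordinates are those of the 0.9.0-incubating release):

    // hypothetical build.sbt fragment; MLI's actual build settings may differ
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"  % "0.9.0-incubating",
      "org.apache.spark" %% "spark-mllib" % "0.9.0-incubating"
    )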
Re: Optimal Server Design for Spark
I would suggest to start with cloud hosting if you can, depending on your usecase, memory requirement may vary a lot . Regards Mayur On Apr 2, 2014 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Steve, This configuration sounds pretty good. The one thing I would consider is having more disks, for two reasons — Spark uses the disks for large shuffles and out-of-core operations, and often it’s better to run HDFS or your storage system on the same nodes. But whether this is valuable will depend on whether you plan to do that in your deployment. You should determine that and go from there. The amount of cores and RAM are both good — actually with a lot more of these you would probably want to run multiple Spark workers per node, which is more work to configure. Your numbers are in line with other deployments. There’s a provisioning overview with more details at https://spark.apache.org/docs/latest/hardware-provisioning.html but what you have sounds fine. Matei On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote: Hi Folks I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop Server design but there does not seem to be much Spark related collateral around infrastructure guidelines (or at least I haven't been able to find them). My current thinking for server design is something along these lines. - 2 x 10Gbe NICs - 128 GB RAM - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and Runtimes, 4 x 1TB for Data Drives) - 1 Disk Controller - 2 x 2.6 GHz 6 core processors If I stick with 1u servers then I lose disk capacity per rack but I get a lot more memory and CPU capacity per rack. This increases my total cluster memory footprint and it doesn't seem to make sense to have super dense storage servers because I can't fit all that data on disk in memory anyways. So at present, my thinking is to go with 1u servers instead of 2u Servers. Is 128GB RAM per server normal? Do you guys use more or less than that? Any feedback would be appreciated Regards Steve Watt
Re: Optimal Server Design for Spark
Hi Matei, How can I run multiple Spark workers per node? I am running an 8-core, 10-node cluster but I do have 8 more cores on each node. So having 2 workers per node will definitely help my use case. Thanks. Deb On Wed, Apr 2, 2014 at 3:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Steve, This configuration sounds pretty good. The one thing I would consider is having more disks, for two reasons -- Spark uses the disks for large shuffles and out-of-core operations, and often it's better to run HDFS or your storage system on the same nodes. But whether this is valuable will depend on whether you plan to do that in your deployment. You should determine that and go from there. The amount of cores and RAM are both good -- actually with a lot more of these you would probably want to run multiple Spark workers per node, which is more work to configure. Your numbers are in line with other deployments. There's a provisioning overview with more details at https://spark.apache.org/docs/latest/hardware-provisioning.html but what you have sounds fine. Matei On Apr 2, 2014, at 2:58 PM, Stephen Watt sw...@redhat.com wrote: Hi Folks I'm looking to buy some gear to run Spark. I'm quite well versed in Hadoop Server design but there does not seem to be much Spark related collateral around infrastructure guidelines (or at least I haven't been able to find them). My current thinking for server design is something along these lines. - 2 x 10Gbe NICs - 128 GB RAM - 6 x 1 TB Small Form Factor Disks (2 x RAID 1 Mirror for O/S and Runtimes, 4 x 1TB for Data Drives) - 1 Disk Controller - 2 x 2.6 GHz 6 core processors If I stick with 1u servers then I lose disk capacity per rack but I get a lot more memory and CPU capacity per rack. This increases my total cluster memory footprint and it doesn't seem to make sense to have super dense storage servers because I can't fit all that data on disk in memory anyways. So at present, my thinking is to go with 1u servers instead of 2u Servers. Is 128GB RAM per server normal? Do you guys use more or less than that? Any feedback would be appreciated Regards Steve Watt
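For standalone deployments, the usual way to get more than one worker per node is SPARK_WORKER_INSTANCES in conf/spark-env.sh on each worker machine (restart the workers afterwards). The values below are only illustrative for a 16-core node; adjust cores and memory to your own hardware:

    # conf/spark-env.sh on each worker node (standalone mode); values are placeholders
    SPARK_WORKER_INSTANCES=2   # run two Worker processes per node
    SPARK_WORKER_CORES=8       # cores offered by each Worker
    SPARK_WORKER_MEMORY=32g    # memory offered by each Worker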
How to ask questions on Spark usage?
Hi, Shall I send my questions to this email address? Sorry for bothering you, and thanks a lot!
Re: How to ask questions on Spark usage?
Yes, please do. :) On Wed, Apr 2, 2014 at 7:36 PM, weida xu xwd0...@gmail.com wrote: Hi, Shall I send my questions to this Email address? Sorry for bothering, and thanks a lot!
Spark RDD to Shark table IN MEMORY conversion
Hi, We are placing business logic in incoming data stream using Spark streaming. Here I want to point Shark table to use data coming from Spark Streaming. Instead of storing Spark streaming to HDFS or other area, is there a way I can directly point Shark in-memory table to take data from Spark Streaming ? Spark version: 0.81 Shark version: 0.81 Thanks, Abhishek -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-to-Shark-table-IN-MEMORY-conversion-tp3682.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Shark Direct insert into table value (?)
Hi, I'm trying to run the following in Shark (0.81): insert into emp (id,name) values (212,Abhi), but it doesn't work. I urgently need direct insert as it is a show stopper. I know that we can do insert into emp select * from xyz; the requirement here is direct insert. Has anyone tried it? Or is there a Shark API that allows us to do it? Please assist. Thanks, Abhishek -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shark-Direct-insert-into-table-value-tp3684.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Submitting to yarn cluster
Hi, I have a small program but I cannot seem to make it connect to the right properties of the cluster. I have SPARK_YARN_APP_JAR, SPARK_JAR and SPARK_HOME set properly. If I run this Scala file, I am seeing that it never uses the yarn.resourcemanager.address property that I set on the SparkConf instance. Any advice? Thanks, Ron

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.deploy.yarn.Client
    import java.lang.System
    import org.apache.spark.SparkConf

    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "/home/rgonzalez/app/spark-0.9.0-incubating-bin-hadoop2/README.md"
        val conf = new SparkConf()
        conf.set("yarn.resourcemanager.address", "localhost:8050")
        val sc = new SparkContext("yarn-client", "Simple App", conf)
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }
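A hedged note on the likely cause: in yarn-client mode, Spark's YARN client generally reads the ResourceManager address from the Hadoop/YARN configuration files on the classpath rather than from SparkConf, so a conf.set("yarn.resourcemanager.address", ...) call typically has no effect. A sketch of the environment setup, with placeholder paths:

    # point Spark at the directory that contains yarn-site.xml (path is a placeholder)
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export YARN_CONF_DIR=/etc/hadoop/conf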
Error when run Spark on mesos
I deployed mesos and tested it using the example/test-framework script; mesos seems OK. But when running Spark on the mesos cluster, the mesos slave nodes report the following exception. Can anyone help me to fix this? Thanks in advance: 14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started 14/04/03 11:24:39 INFO Remoting: Starting remoting 14/04/03 11:24:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:7077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-run-Spark-on-mesos-tp3687.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error when run Spark on mesos
any advice ? 2014-04-03 11:35 GMT+08:00 felix cnwe...@gmail.com: I deployed mesos and test it using the exmaple/test-framework script, mesos seems OK. but when runing spark on the mesos cluster, the mesos slave nodes report the following exception, any one can help me to fix this ? thanks in advance: 14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started 14/04/03 11:24:39 INFO Remoting: Starting remoting 14/04/03 11:24:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:7077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: Error when run Spark on mesoshttp://apache-spark-user-list.1001560.n3.nabble.com/Error-when-run-Spark-on-mesos-tp3687.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at Nabble.com. -- 不学习,不知道
Re: Error when run Spark on mesos
I think this is related to a known issue (regression) in 0.9.0. Try using explicit IP other than loop back. Sent from a mobile device On Apr 2, 2014, at 8:53 PM, panfei cnwe...@gmail.com wrote: any advice ? 2014-04-03 11:35 GMT+08:00 felix cnwe...@gmail.com: I deployed mesos and test it using the exmaple/test-framework script, mesos seems OK. but when runing spark on the mesos cluster, the mesos slave nodes report the following exception, any one can help me to fix this ? thanks in advance: 14/04/03 11:24:39 INFO Slf4jLogger: Slf4jLogger started 14/04/03 11:24:39 INFO Remoting: Starting remoting 14/04/03 11:24:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@pdn00:40265] 14/04/03 11:24:39 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@localhost:7077/user/BlockManagerMaster akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://spark@localhost:7077/]/user/BlockManagerMaster] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91) at akka.actor.ActorRef.tell(ActorRef.scala:125) at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44) at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438) at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650) at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309) at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204) at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.run(Mailbox.scala:218) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) View this message in context: Error when run Spark on mesos Sent from the Apache Spark User List mailing list archive at Nabble.com. -- 不学习,不知道
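To make the suggestion concrete: the executors here are trying to reach the driver at akka.tcp://spark@localhost:7077, so the usual workaround is to have the driver advertise a routable address, either via the SPARK_LOCAL_IP environment variable on the driver machine or via spark.driver.host. A rough Scala sketch, where the Mesos master URL and IP address are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("mesos://mesos-master:5050")   // placeholder Mesos master URL
      .setAppName("my-app")
      .set("spark.driver.host", "10.0.0.12")    // the driver machine's routable IP, not localhost
    val sc = new SparkContext(conf)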
Example of creating expressions for SchemaRDD methods
For various SchemaRDD functions like select, where, orderBy, groupBy, etc., I would like to create expression objects and pass these to the methods for execution. Can someone show some examples of how to create expressions for a case class and execute them? E.g., how to create expressions for select, order by, group by, etc. and execute the methods using those expressions? Regards,
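Not a definitive answer, but a rough sketch of the symbol-based expression DSL as it looks around Spark 1.0, where Scala symbols like 'name become expressions after importing the SQLContext implicits; exact method names and availability may differ in the version you are running:

    import org.apache.spark.sql.SQLContext

    case class Person(name: String, age: Int)

    val sqlContext = new SQLContext(sc)
    import sqlContext._   // implicits: RDD of case classes -> SchemaRDD, and Scala symbols -> expressions

    val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 17)))

    // where and select take expressions built from symbols
    val adults = people.where('age >= 18).select('name)
    adults.collect().foreach(println)

    // orderBy and groupBy accept expressions built the same way (e.g. 'age, 'name)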