Re: Building Spark 0.9.x for CDH5 with mrv1 installation (Protobuf 2.5 upgrade)
I'm not sure exactly how your cluster is configured, but as far as I can tell Cloudera's MR1 CDH5 dependencies are built against Hadoop 2.3. I'd just find the exact CDH version you have and link against the `mr1` version of their published dependencies for that version. So I think you want 2.3.0-mr1-cdh5.0.0: https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-client/2.3.0-mr1-cdh5.0.0/ The full list of Cloudera versions is here: https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/hadoop/hadoop-client/

On Tue, Mar 25, 2014 at 6:42 PM, Gary Malouf malouf.g...@gmail.com wrote: Today, our cluster setup is as follows: Mesos 0.15, CDH 4.2.1-MRV1, Spark 0.9-pre-scala-2.10 off a master build targeted at the appropriate CDH4 version. We are looking to upgrade all of these in order to get protobuf 2.5 working properly. The question is: which 'Hadoop version build' of Spark 0.9 is compatible with the HDFS from Hadoop 2.2 and Cloudera's CDH5 MRV1 installation? Is there one?
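For reference, a minimal build.sbt fragment for pulling that artifact might look like the following sketch; the resolver URL and version string come from the links above and should be checked against the CDH release actually installed:

    // build.sbt (sketch): add Cloudera's repository and depend on the mr1 hadoop-client
    resolvers += "Cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0"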
ALS memory limits
Hi, For our use cases we are looking at 20 x 1M matrices, which is in a similar range to what is outlined in the paper here: http://sandeeptata.blogspot.com/2012/12/sparkler-large-scale-matrix.html Does the exponential runtime growth in Spark ALS outlined in that blog post still exist in recommendation.ALS? I am running a Spark cluster of 10 nodes with total memory of around 1 TB and 80 cores. With rank = 50, the memory requirement for ALS should be 20M x 50 doubles on every worker, which is around 8 GB. Even if both factor matrices are cached in memory I should be bounded by ~9 GB, but even with 32 GB per worker I see GC errors... I am debugging the scalability and memory requirements of the algorithm further, but any insights will be very helpful... Also there are two other issues: 1. If GC errors are hit, that worker JVM goes down and I have to restart it manually. Is this expected? 2. When I try to make use of all 80 cores on the cluster I get issues related to a java.io.File not found exception on /tmp/. Is there some OS limit on how many cores can simultaneously access /tmp from a process? Thanks. Deb

On Sun, Mar 16, 2014 at 2:20 PM, Sean Owen so...@cloudera.com wrote: Good point -- there's been another optimization for ALS in HEAD ( https://github.com/apache/spark/pull/131), but yes the better place to pick up just essential changes since 0.9.0, including the previous one, is the 0.9 branch. -- Sean Owen | Director, Data Science | London

On Sun, Mar 16, 2014 at 2:18 PM, Patrick Wendell pwend...@gmail.com wrote: Sean - was this merged into the 0.9 branch as well (it seems so based on the message from rxin)? If so it might make sense to try out the head of branch-0.9 as well, unless there are *also* other changes relevant to this in master. - Patrick

On Sun, Mar 16, 2014 at 12:24 PM, Sean Owen so...@cloudera.com wrote: You should simply use a snapshot built from HEAD of github.com/apache/spark if you can. The key change is in MLlib and with any luck you can just replace that bit. See the PR I referenced. Sure, with enough memory you can get it to run even with the memory issue, but it could be hundreds of GB at your scale. Not sure I take the point about the JVM; you can give it 64GB of heap and executors can use that much, sure. You could reduce the number of features a lot to work around it too, or reduce the input size. (If anyone saw my blog post about StackOverflow and ALS -- that's why I snuck in a relatively paltry 40 features and pruned questions with 4 tags :) ) I don't think jblas has anything to do with it per se, and the allocation fails in Java code, not native code. This should be exactly what that PR I mentioned fixes. -- Sean Owen | Director, Data Science | London

On Sun, Mar 16, 2014 at 11:48 AM, Debasish Das debasish.da...@gmail.com wrote: Thanks Sean... let me get the latest code... do you know which PR it was? But will the executors run fine with, say, 32 GB or 64 GB of memory? Doesn't the JVM show issues when the max memory goes beyond a certain limit... Also the failure is due to GC limits from jblas... and I was thinking that jblas is going to call native malloc, right? Maybe 64 GB is not a big deal then... I will try increasing to 32 and then 64... java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
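For concreteness, a rough sketch of the MLlib call and the back-of-the-envelope memory estimate under discussion; the input path, iteration count and the existing SparkContext sc are placeholders:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // 20M rows x rank 50 x 8 bytes per double ~= 8 GB per factor matrix,
    // matching the ~8-9 GB estimate above (before JVM object overhead).
    val ratings = sc.textFile("hdfs:///ratings").map { line =>
      val Array(user, product, rating) = line.split(',')
      Rating(user.toInt, product.toInt, rating.toDouble)
    }
    val model = ALS.train(ratings, 50, 10, 0.01)  // rank = 50, 10 iterations, lambda = 0.01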
RDD Collect returns empty arrays
I am getting strange behavior with RDDs. All I want is to persist the RDD contents in a single file. saveAsTextFile() saves them as one text file per partition, so I tried rdd.coalesce(1, true).saveAsTextFile(). This fails with the exception: org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data)

Then I tried collecting the RDD contents into an array and writing the array to the file manually. Again, that fails: it gives me empty arrays, even when data is there.

/** The below saves the data in multiple text files, so the data is there for sure **/
rdd.saveAsTextFile(resultDirectory)

/** The below simply prints size 0 for all the RDDs in a stream. Why?! **/
val arr = rdd.collect
println("SIZE of RDD " + rdd.id + " " + arr.size)

Kindly help! I am clueless on how to proceed. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Collect-returns-empty-arrays-tp3242.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
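As an illustration of the "write the array manually" approach mentioned above, a minimal sketch (only sensible when the collected data fits in driver memory; the output path is a placeholder):

    import java.io.PrintWriter

    val arr = rdd.collect()                          // pulls every element to the driver
    val out = new PrintWriter("/tmp/single-output.txt")
    try arr.foreach(out.println) finally out.close()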
Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1
Egor, I am hitting the same problem you asked about in this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCAMrx5DwJVJS0g_FE7_2qwMu4Xf0y5VfV=tlyauv2kh5v4k6...@mail.gmail.com%3E Have you fixed it? I am using Shark to read a table which I have created on HDFS. I found that there are two protobuf*.jar files in Shark's lib_managed directory: [root@bigdata001 shark-0.9.0]# find . -name proto*.jar ./lib_managed/jars/org.spark-project.protobuf/protobuf-java/protobuf-java-2.4.1-shaded.jar ./lib_managed/bundles/com.google.protobuf/protobuf-java/protobuf-java-2.5.0.jar My Hadoop is using protobuf-java-2.5.0.jar.
Re: Shark does not give any results with SELECT count(*) command
hi, Praveen, thanks for replying. I am using hive-0.11, which comes from amplab. At the beginning, the hive-site.xml from amplab was empty, so I copied one hive-site.xml from my cluster and then removed some attributes and also added some attributes. I don't think that is the reason for my problem; I think the reason is that shark is running in local mode, not cluster mode. When I run bin/shark on bigdata001, it certainly cannot get the result, which exists on bigdata003, while when I run bin/shark on bigdata003, I can get the result. Even if that is the reason, I still cannot understand why the result is on bigdata003 (the master is bigdata001)?

2014-03-25 18:41 GMT+08:00 Praveen R prav...@mobipulse.in: Hi Qingyang Li, Shark-0.9.0 uses a patched version of hive-0.11 and using the configuration/metastore of hive-0.12 could be incompatible. May I know the reason you are using hive-site.xml from the previous hive version (to use an existing metastore?). You might just leave hive-site.xml blank otherwise, something like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
</configuration>

In any case you could run ./bin/shark-withdebug to see any errors. Regards, Praveen

On 25-Mar-2014, at 1:49 pm, qingyang li liqingyang1...@gmail.com wrote: reopen this thread because i encounter this problem again. Here is my env: scala 2.10.3, spark 0.9.0 standalone mode, shark 0.9.0 (downloaded the source and built it myself), hive hive-shark-0.11. I have copied hive-site.xml from my hadoop cluster , it's hive version is 0.12, after copied , i deleted some attributes from hive-site.xml When run select count(*) from xxx, no result and no errors output. Can someone give me some suggestions to debug ? 2014-03-20 11:27 GMT+08:00 qingyang li liqingyang1...@gmail.com: have found the cause , my problem is : the style of file slaves is not correct, so the task only be run on master. explain here to help other guy who also encounter similiar problem. 2014-03-20 9:57 GMT+08:00 qingyang li liqingyang1...@gmail.com: Hi, i install spark0.9.0 and shark0.9 on 3 nodes , when i run select * from src , i can get result, but when i run select count(*) from src or select * from src limit 1, there is no result output. i have found similiar problem on google groups: https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ but , there is no solution on it. Does anyone encounter such problem?
Re: Shark does not give any results with SELECT count(*) command
Oh k. You must be running shark server on bigdata001 to use it from other machines. ./bin/shark --service sharkserver # runs shark server on port 1 You could connect to shark server as ./bin/shark -h bigdata001, this should work unless there is a firewall blocking it. You might use telnet bigdata001 1 from bigdata003 to check if port is accessible. Hope that helps. On Wed, Mar 26, 2014 at 12:57 PM, qingyang li liqingyang1...@gmail.comwrote: hi, Praveen, thanks for replying. I am using hive-0.11 which comes from amplab, at the begining , the hive-site.xml of amplab is empty, so , i copy one hive-site.xml from my cluster and then remove some attributes and aslo add some atrributs. i think it is not the reason for my problem, i think the reason is shark is runing on local mode , not cluster mode, when i run bin/shark on bigdata001, it certainly can not get the result which exist on bigdata003. while i run bin/shark on bigdata003, i can get result. though it is the reason, i still can not understand why the result is on bigdata003(master is bigdata001)? 2014-03-25 18:41 GMT+08:00 Praveen R prav...@mobipulse.in: Hi Qingyang Li, Shark-0.9.0 uses a patched version of hive-0.11 and using configuration/metastore of hive-0.12 could be incompatible. May I know the reason you are using hive-site.xml from previous hive version(to use existing metastore?). You might just leave hive-site.xml blank, otherwise. Something like this: ?xml version=1.0? ?xml-stylesheet type=text/xsl href=configuration.xsl? configuration /configuration In any case you could run ./bin/shark-withdebug for any errors. Regards, Praveen On 25-Mar-2014, at 1:49 pm, qingyang li liqingyang1...@gmail.com wrote: reopen this thread because i encounter this problem again. Here is my env: scala 2.10.3 s spark 0.9.0tandalone mode shark 0.9.0downlaod the source code and build by myself hive hive-shark-0.11 I have copied hive-site.xml from my hadoop cluster , it's hive version is 0.12, after copied , i deleted some attributes from hive-site.xml When run select count(*) from xxx, no resut and no errors output. Can someone give me some suggestions to debug ? 2014-03-20 11:27 GMT+08:00 qingyang li liqingyang1...@gmail.com: have found the cause , my problem is : the style of file salves is not correct, so the task only be run on master. explain here to help other guy who also encounter similiar problem. 2014-03-20 9:57 GMT+08:00 qingyang li liqingyang1...@gmail.com: Hi, i install spark0.9.0 and shark0.9 on 3 nodes , when i run select * from src , i can get result, but when i run select count(*) from src or select * from src limit 1, there is no result output. i have found similiar problem on google groups: https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ but , there is no solution on it. Does anyone encounter such problem?
Re: Shark does not give any results with SELECT count(*) command
hi, Praveen, I can start server on bigdata001 using /bin/shark --service sharkserver, i can also connect this server using ./bin/shark -h bigdata001 . but, the problem still there: run select count(*) from b on bigdata001, no result , no error. run select count(*) from b on bigdata002, no result , no error. run select count(*) from b on bigdata004, no result , no error. run select count(*) from b on bigdata003, have result. 2014-03-26 15:49 GMT+08:00 Praveen R prav...@sigmoidanalytics.com: Oh k. You must be running shark server on bigdata001 to use it from other machines. ./bin/shark --service sharkserver # runs shark server on port 1 You could connect to shark server as ./bin/shark -h bigdata001, this should work unless there is a firewall blocking it. You might use telnet bigdata001 1 from bigdata003 to check if port is accessible. Hope that helps. On Wed, Mar 26, 2014 at 12:57 PM, qingyang li liqingyang1...@gmail.comwrote: hi, Praveen, thanks for replying. I am using hive-0.11 which comes from amplab, at the begining , the hive-site.xml of amplab is empty, so , i copy one hive-site.xml from my cluster and then remove some attributes and aslo add some atrributs. i think it is not the reason for my problem, i think the reason is shark is runing on local mode , not cluster mode, when i run bin/shark on bigdata001, it certainly can not get the result which exist on bigdata003. while i run bin/shark on bigdata003, i can get result. though it is the reason, i still can not understand why the result is on bigdata003(master is bigdata001)? 2014-03-25 18:41 GMT+08:00 Praveen R prav...@mobipulse.in: Hi Qingyang Li, Shark-0.9.0 uses a patched version of hive-0.11 and using configuration/metastore of hive-0.12 could be incompatible. May I know the reason you are using hive-site.xml from previous hive version(to use existing metastore?). You might just leave hive-site.xml blank, otherwise. Something like this: ?xml version=1.0? ?xml-stylesheet type=text/xsl href=configuration.xsl? configuration /configuration In any case you could run ./bin/shark-withdebug for any errors. Regards, Praveen On 25-Mar-2014, at 1:49 pm, qingyang li liqingyang1...@gmail.com wrote: reopen this thread because i encounter this problem again. Here is my env: scala 2.10.3 s spark 0.9.0tandalone mode shark 0.9.0downlaod the source code and build by myself hive hive-shark-0.11 I have copied hive-site.xml from my hadoop cluster , it's hive version is 0.12, after copied , i deleted some attributes from hive-site.xml When run select count(*) from xxx, no resut and no errors output. Can someone give me some suggestions to debug ? 2014-03-20 11:27 GMT+08:00 qingyang li liqingyang1...@gmail.com: have found the cause , my problem is : the style of file salves is not correct, so the task only be run on master. explain here to help other guy who also encounter similiar problem. 2014-03-20 9:57 GMT+08:00 qingyang li liqingyang1...@gmail.com: Hi, i install spark0.9.0 and shark0.9 on 3 nodes , when i run select * from src , i can get result, but when i run select count(*) from src or select * from src limit 1, there is no result output. i have found similiar problem on google groups: https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ but , there is no solution on it. Does anyone encounter such problem?
Re: How to set environment variable for a spark job
OK, it was working. I printed System.getenv(..) for both env variables and they gave the correct values. However, it did not give me the intended result. My intention was to load a native library from LD_LIBRARY_PATH, but it looks like the library is loaded from the value of -Djava.library.path. The value of this property comes out as -Djava.library.path=/opt/cloudera/parcels/CDH-5.0.0-0.cdh5b2.p0.27/lib/spark/lib:/opt/cloudera/parcels/CDH-5.0.0-0.cdh5b2.p0.27/lib/hadoop/lib/native Any idea how to append my custom path to it programmatically? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180p3249.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
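One version-independent workaround to sketch (not something from the thread, and untested against this CDH parcel layout): System.load takes an absolute path and ignores java.library.path, so the library can be loaded explicitly inside the tasks. The path below is a placeholder.

    object NativeLib {
      // runs at most once per executor JVM, the first time it is referenced
      lazy val loaded: Unit = System.load("/opt/native/libmylib.so")
    }

    rdd.mapPartitions { iter =>
      NativeLib.loaded   // force the native library to load on this executor
      iter
    }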
Re: Shark does not give any results with SELECT count(*) command
i have found such log on bigdata003: 14/03/25 17:08:49 INFO network.ConnectionManager: Accepted connection from [bigdata001/192.168.1.101] 14/03/25 17:08:49 INFO network.ConnectionManager: Accepted connection from [bigdata002/192.168.1.102] 14/03/25 17:08:49 INFO network.ConnectionManager: Accepted connection from [bigdata004/192.168.1.104] and also found such log on bigdata004 002 001: 09/01/13 09:32:29 INFO network.ConnectionManager: Accepted connection from [bigdata003/192.168.1.103] 09/01/13 09:32:29 INFO network.SendingConnection: Initiating connection to [bigdata003/192.168.1.103:39848] 09/01/13 09:32:29 INFO network.SendingConnection: Connected to [bigdata003/192.168.1.103:39848], 1 messages pending from the log, it seems bigdata003 becomes master, but i config bigdata001 as master. Another clue : sometimes, after i re-start spark cluster, it becomes ok again, i can get result on bigdata001, but fail on bigdata003, so, if spark choose one node randomly to store the result? if i did not say the problem clearly, please let me know. thanks. 2014-03-26 16:55 GMT+08:00 qingyang li liqingyang1...@gmail.com: hi, Praveen, I can start server on bigdata001 using /bin/shark --service sharkserver, i can also connect this server using ./bin/shark -h bigdata001 . but, the problem still there: run select count(*) from b on bigdata001, no result , no error. run select count(*) from b on bigdata002, no result , no error. run select count(*) from b on bigdata004, no result , no error. run select count(*) from b on bigdata003, have result. 2014-03-26 15:49 GMT+08:00 Praveen R prav...@sigmoidanalytics.com: Oh k. You must be running shark server on bigdata001 to use it from other machines. ./bin/shark --service sharkserver # runs shark server on port 1 You could connect to shark server as ./bin/shark -h bigdata001, this should work unless there is a firewall blocking it. You might use telnet bigdata001 1 from bigdata003 to check if port is accessible. Hope that helps. On Wed, Mar 26, 2014 at 12:57 PM, qingyang li liqingyang1...@gmail.comwrote: hi, Praveen, thanks for replying. I am using hive-0.11 which comes from amplab, at the begining , the hive-site.xml of amplab is empty, so , i copy one hive-site.xml from my cluster and then remove some attributes and aslo add some atrributs. i think it is not the reason for my problem, i think the reason is shark is runing on local mode , not cluster mode, when i run bin/shark on bigdata001, it certainly can not get the result which exist on bigdata003. while i run bin/shark on bigdata003, i can get result. though it is the reason, i still can not understand why the result is on bigdata003(master is bigdata001)? 2014-03-25 18:41 GMT+08:00 Praveen R prav...@mobipulse.in: Hi Qingyang Li, Shark-0.9.0 uses a patched version of hive-0.11 and using configuration/metastore of hive-0.12 could be incompatible. May I know the reason you are using hive-site.xml from previous hive version(to use existing metastore?). You might just leave hive-site.xml blank, otherwise. Something like this: ?xml version=1.0? ?xml-stylesheet type=text/xsl href=configuration.xsl? configuration /configuration In any case you could run ./bin/shark-withdebug for any errors. Regards, Praveen On 25-Mar-2014, at 1:49 pm, qingyang li liqingyang1...@gmail.com wrote: reopen this thread because i encounter this problem again. 
Here is my env: scala 2.10.3, spark 0.9.0 standalone mode, shark 0.9.0 (downloaded the source and built it myself), hive hive-shark-0.11. I have copied hive-site.xml from my hadoop cluster (its hive version is 0.12); after copying, i deleted some attributes from hive-site.xml. When I run select count(*) from xxx, there is no result and no errors output. Can someone give me some suggestions to debug? 2014-03-20 11:27 GMT+08:00 qingyang li liqingyang1...@gmail.com: have found the cause , my problem is : the format of the slaves file is not correct, so the task only ran on the master. explaining here to help other guys who also encounter a similar problem. 2014-03-20 9:57 GMT+08:00 qingyang li liqingyang1...@gmail.com: Hi, i installed spark0.9.0 and shark0.9 on 3 nodes. when i run select * from src , i can get a result, but when i run select count(*) from src or select * from src limit 1, there is no result output. i have found a similar problem on google groups: https://groups.google.com/forum/#!searchin/spark-users/Shark$20does$20not$20give$20any$20results$20with$20SELECT$20command/spark-users/oKMBPBWim0U/_hbDCi4m-xUJ but there is no solution on it. Does anyone encounter such a problem?
Re: tracking resource usage for spark-shell commands
Thanks for the response Mayur. I was seeing the webui of 0.9.0 spark. I see lots of detailed statistics in the newer 1.0.0-snapshot version. The only thing I found missing was the actual code that I had typed in at the spark-shell prompt, but I can always get it from the shell history. On 26-Mar-2014, at 7:53 am, Mayur Rustagi mayur.rust...@gmail.com wrote: Time taken is shown in Shark shell web ui (hosted on 4040 port). Also memory used is shown in terms of Storage of RDD, how much shuffle data was written read during the process is also highlighted thr. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi On Tue, Mar 25, 2014 at 6:04 AM, Bharath Bhushan manku.ti...@outlook.com wrote: Is there a way to see the resource usage of each spark-shell command — say time taken and memory used? I checked the WebUI of spark-shell and of the master and I don’t see any such breakdown. I see the time taken in the INFO logs but nothing about memory usage. It would also be nice to track the time taken in the spark-shell web UI. — Thanks
Re: Spark Streaming - Shared hashmaps
When you say launch long-running tasks does it mean long running Spark jobs/tasks, or long-running tasks in another system? If the rate of requests from Kafka is not low (in terms of records per second), you could collect the records in the driver, and maintain the shared bag in the driver. A separate thread in the driver could pick stuff from the bag and launch tasks. This is a slightly unorthodox use of Spark Streaming, but should work. If the rate of request from Kafka is high, then I am not sure how you can sustain that many long running tasks (assuming 1 task corresponding to each request from Kafka). TD On Wed, Mar 26, 2014 at 1:19 AM, Bryan Bryan bryanbryan...@gmail.comwrote: Hi there, I have read about the two fundamental shared features in spark (broadcasting variables and accumulators), but this is what i need. I'm using spark streaming in order to get requests from Kafka, these requests may launch long-running tasks, and i need to control them: 1) Keep them in a shared bag, like a Hashmap, to retrieve them by ID, for example. 2) Retrieve an instance of this object/task whatever on-demand (on-request, in fact) Any idea about that? How can i share objects between slaves? May i use something out of spark (maybe hazelcast') Regards.
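A rough illustration of the driver-side pattern described above: collect each batch to the driver, keep the shared bag there, and let a separate thread pool launch the long-running work. The stream name, key/payload types and pool size are placeholders.

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    // shared bag living in the driver JVM, keyed by request id
    val bag = new ConcurrentHashMap[String, String]()
    val pool = Executors.newFixedThreadPool(4)

    kafkaStream.foreachRDD { rdd =>
      rdd.collect().foreach { case (id, payload) =>
        bag.put(id, payload)                        // retrievable later by ID
        pool.submit(new Runnable {
          def run(): Unit = { /* start the long-running task for this payload */ }
        })
      }
    }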
Distributed running in Spark Interactive shell
Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside.All the pressures of life can never hurt you, Unless you let them in.*
Re: Distributed running in Spark Interactive shell
What do you mean by run across the cluster? Do you want to start the spark-shell across the cluster, or do you want to distribute tasks to multiple machines? If the former, yes, as long as you indicate the right master URL. If the latter, also yes; you can observe the distributed tasks in the Spark UI. -- Nan Zhu

On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.
Re: Distributed running in Spark Interactive shell
Nan Zhu, its the later, I want to distribute the tasks to the cluster [machines available.] If i set the SPARK_MASTER_IP at the other machines and set the slaves-IP in the /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote: what do you mean by run across the cluster? you want to start the spark-shell across the cluster or you want to distribute tasks to multiple machines? if the former case, yes, as long as you indicate the right master URL if the later case, also yes, you can observe the distributed task in the Spark UI -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.* -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside.All the pressures of life can never hurt you, Unless you let them in.*
Re: Distributed running in Spark Interactive shell
what you only need to do is ensure your spark cluster is running well, (you can check by access the Spark UI to see if all workers are displayed) then, you have to set correct SPARK_MASTER_IP in the machine where you run spark-shell The more details are : when you run bin/spark-shell, it will start the driver program in that machine, interacting with the Master to start the application (in this case, it is spark-shell) the Master tells Workers to start executors for your application, and the executors will try to register with your driver, then your driver can distribute tasks to the executors, i.e. run in a distributed fashion Best, -- Nan Zhu On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote: Nan Zhu, its the later, I want to distribute the tasks to the cluster [machines available.] If i set the SPARK_MASTER_IP at the other machines and set the slaves-IP in the /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: what do you mean by run across the cluster? you want to start the spark-shell across the cluster or you want to distribute tasks to multiple machines? if the former case, yes, as long as you indicate the right master URL if the later case, also yes, you can observe the distributed task in the Spark UI -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in. -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.
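To make the registration flow above concrete, a minimal sketch; the master URL is a placeholder, and in spark-shell itself you would simply set MASTER / SPARK_MASTER_IP rather than build the context by hand:

    import org.apache.spark.SparkContext

    // the driver runs here; the standalone Master at spark://bigmaster:7077 (placeholder)
    // asks Workers to start executors, which then register back with this driver
    val sc = new SparkContext("spark://bigmaster:7077", "interactive-shell-style app")
    val n = sc.parallelize(1 to 1000000, 50).map(_ * 2).count()   // these tasks run on the workers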
Re: Distributed running in Spark Interactive shell
Nan (or anyone who feels they understand the cluster architecture well), can you clarify something for me. From reading this user group and your explanation above it appears that the cluster master is only involved in this during application startup -- to allocate executors(from what you wrote sounds like the driver itself passes the job/tasks to the executors). From there onwards all computation is done on the executors, who communicate results directly to the driver if certain actions (say collect) are performed. Is that right? The only description of the cluster I've seen came from here: https://spark.apache.org/docs/0.9.0/cluster-overview.html but that picture suggests there is no direct communication between driver and executors, which I believe is wrong (unless I am misreading the picture -- I believe Master and Cluster Manager refer to the same thing?). The very short form of my question is, does the master do anything other than executor allocation? On Wed, Mar 26, 2014 at 9:23 AM, Nan Zhu zhunanmcg...@gmail.com wrote: what you only need to do is ensure your spark cluster is running well, (you can check by access the Spark UI to see if all workers are displayed) then, you have to set correct SPARK_MASTER_IP in the machine where you run spark-shell The more details are : when you run bin/spark-shell, it will start the driver program in that machine, interacting with the Master to start the application (in this case, it is spark-shell) the Master tells Workers to start executors for your application, and the executors will try to register with your driver, then your driver can distribute tasks to the executors, i.e. run in a distributed fashion Best, -- Nan Zhu On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote: Nan Zhu, its the later, I want to distribute the tasks to the cluster [machines available.] If i set the SPARK_MASTER_IP at the other machines and set the slaves-IP in the /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote: what do you mean by run across the cluster? you want to start the spark-shell across the cluster or you want to distribute tasks to multiple machines? if the former case, yes, as long as you indicate the right master URL if the later case, also yes, you can observe the distributed task in the Spark UI -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.* -- *Sai Prasanna. AN* *II M.Tech (CS), SSSIHL* *Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.*
Re: Distributed running in Spark Interactive shell
master does more work than that actually, I just explained why he should set MASTER_IP correctly a simplified list: 1. maintain the worker status 2. maintain in-cluster driver status 3. maintain executor status (the worker tells master what happened on the executor, -- Nan Zhu On Wednesday, March 26, 2014 at 9:46 AM, Yana Kadiyska wrote: Nan (or anyone who feels they understand the cluster architecture well), can you clarify something for me. From reading this user group and your explanation above it appears that the cluster master is only involved in this during application startup -- to allocate executors(from what you wrote sounds like the driver itself passes the job/tasks to the executors). From there onwards all computation is done on the executors, who communicate results directly to the driver if certain actions (say collect) are performed. Is that right? The only description of the cluster I've seen came from here: https://spark.apache.org/docs/0.9.0/cluster-overview.html but that picture suggests there is no direct communication between driver and executors, which I believe is wrong (unless I am misreading the picture -- I believe Master and Cluster Manager refer to the same thing?). The very short form of my question is, does the master do anything other than executor allocation? On Wed, Mar 26, 2014 at 9:23 AM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: what you only need to do is ensure your spark cluster is running well, (you can check by access the Spark UI to see if all workers are displayed) then, you have to set correct SPARK_MASTER_IP in the machine where you run spark-shell The more details are : when you run bin/spark-shell, it will start the driver program in that machine, interacting with the Master to start the application (in this case, it is spark-shell) the Master tells Workers to start executors for your application, and the executors will try to register with your driver, then your driver can distribute tasks to the executors, i.e. run in a distributed fashion Best, -- Nan Zhu On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote: Nan Zhu, its the later, I want to distribute the tasks to the cluster [machines available.] If i set the SPARK_MASTER_IP at the other machines and set the slaves-IP in the /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: what do you mean by run across the cluster? you want to start the spark-shell across the cluster or you want to distribute tasks to multiple machines? if the former case, yes, as long as you indicate the right master URL if the later case, also yes, you can observe the distributed task in the Spark UI -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in. -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. 
All the pressures of life can never hurt you, Unless you let them in.
Re: Distributed running in Spark Interactive shell
and, yes, I think that picture is a bit misleading, though in the following paragraph it has mentioned that “ Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes. -- Nan Zhu On Wednesday, March 26, 2014 at 9:59 AM, Nan Zhu wrote: master does more work than that actually, I just explained why he should set MASTER_IP correctly a simplified list: 1. maintain the worker status 2. maintain in-cluster driver status 3. maintain executor status (the worker tells master what happened on the executor, -- Nan Zhu On Wednesday, March 26, 2014 at 9:46 AM, Yana Kadiyska wrote: Nan (or anyone who feels they understand the cluster architecture well), can you clarify something for me. From reading this user group and your explanation above it appears that the cluster master is only involved in this during application startup -- to allocate executors(from what you wrote sounds like the driver itself passes the job/tasks to the executors). From there onwards all computation is done on the executors, who communicate results directly to the driver if certain actions (say collect) are performed. Is that right? The only description of the cluster I've seen came from here: https://spark.apache.org/docs/0.9.0/cluster-overview.html but that picture suggests there is no direct communication between driver and executors, which I believe is wrong (unless I am misreading the picture -- I believe Master and Cluster Manager refer to the same thing?). The very short form of my question is, does the master do anything other than executor allocation? On Wed, Mar 26, 2014 at 9:23 AM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: what you only need to do is ensure your spark cluster is running well, (you can check by access the Spark UI to see if all workers are displayed) then, you have to set correct SPARK_MASTER_IP in the machine where you run spark-shell The more details are : when you run bin/spark-shell, it will start the driver program in that machine, interacting with the Master to start the application (in this case, it is spark-shell) the Master tells Workers to start executors for your application, and the executors will try to register with your driver, then your driver can distribute tasks to the executors, i.e. run in a distributed fashion Best, -- Nan Zhu On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote: Nan Zhu, its the later, I want to distribute the tasks to the cluster [machines available.] If i set the SPARK_MASTER_IP at the other machines and set the slaves-IP in the /conf/slaves at the master node, will the interactive shell code run at the master get distributed across multiple machines ??? On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: what do you mean by run across the cluster? you want to start the spark-shell across the cluster or you want to distribute tasks to multiple machines? if the former case, yes, as long as you indicate the right master URL if the later case, also yes, you can observe the distributed task in the Spark UI -- Nan Zhu On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote: Is it possible to run across cluster using Spark Interactive Shell ? 
To be more explicit, is the procedure similar to running standalone master-slave spark. I want to execute my code in the interactive shell in the master-node, and it should run across the cluster [say 5 node]. Is the procedure similar ??? -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in. -- Sai Prasanna. AN II M.Tech (CS), SSSIHL Entire water in the ocean can never sink a ship, Unless it gets inside. All the pressures of life can never hurt you, Unless you let them in.
Re: java.lang.ClassNotFoundException
Have you looked at the individual nodes logs? Can you post a bit more of the exception's output? On 3/26/14, 8:42 AM, Jaonary Rabarisoa wrote: Hi all, I got java.lang.ClassNotFoundException even with addJar called. The jar file is present in each node. I use the version of spark from github master. Any ideas ? Jaonary
Re: spark-shell on standalone cluster gives error no mesos in java.library.path
Hi, I have a similar issue like the user below: I’m running Spark 0.8.1 (standalone). When I test the streaming NetworkWordCount example as in the docs with local[2] it works fine. As soon as I want to connect to my cluster using [NetworkWordCount master …] it says: --- Failed to load native Mesos library from /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib Exception in thread main java.lang.UnsatisfiedLinkError: no mesos in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886) at java.lang.Runtime.loadLibrary0(Runtime.java:849) at java.lang.System.loadLibrary(System.java:1088) at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52) at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64) at org.apache.spark.SparkContext.init(SparkContext.scala:260) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:559) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:84) at org.apache.spark.streaming.api.java.JavaStreamingContext.init(JavaStreamingContext.scala:76) at org.apache.spark.streaming.examples.JavaNetworkWordCount.main(JavaNetworkWordCount.java:50) --- I built mesos 0.13 and added the MESOS_NATIVE_LIBRARY entry in spark-env.sh. But then I get: --- A fatal error has been detected by the Java Runtime Environment: # # SIGSEGV (0xb) at pc=0x7fed89801ce9, pid=13580, tid=140657358776064 # # JRE version: Java(TM) SE Runtime Environment (7.0_51-b13) (build 1.7.0_51-b13) # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.51-b03 mixed mode linux-amd64 compressed oops) # Problematic frame: # V [libjvm.so+0x632ce9] jni_GetByteArrayElements+0x89 # # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try ulimit -c unlimited before starting Java again # # An error report file with more information is saved as: # /home/vagrant/hs_err_pid13580.log # # If you would like to submit a bug report, please visit: # http://bugreport.sun.com/bugreport/crash.jsp --- The error lag says: --- Current thread (0x7fed8473d000): JavaThread MesosSchedulerBackend driver daemon [_thread_in_vm, id=13638, stack(0x7fed57d7a000,0x7fed57e7b000)] … --- Working on Ubuntu 12.04 in Virtual Box. Tried it with OpenJDK 6 and Oracle Java 7. Any ideas?? Many thanks. Christoph Please ignore this error - I found the issue. Thanks ! On Mon, Jan 20, 2014 at 3:14 PM, Manoj Samel manojsamelt...@gmail.comwrote: Hi I deployed spark 0.8.1 on standalone cluster per https://spark.incubator.apache.org/docs/0.8.1/spark-standalone.html When i start a spark-shell , I get following error I thought mesos should not be required for standalone cluster. Do I have to change any parameters in make-distribution.sh that I used to build the spark distribution for this cluster ? I left all to default (and noticed that the default HADOOP version is 1.0.4 which is not my hadoop version - but I am not using Hadoop here). Creating SparkContext... Failed to load native Mesos library from java.lang.UnsatisfiedLinkError: no mesos in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738) at java.lang.Runtime.loadLibrary0(Runtime.java:823) at java.lang.System.loadLibrary(System.java:1028) at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52) at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64) at org.apache.spark.SparkContext.init(SparkContext.scala:260) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:862)
closures moving averages (state)
I'm passing a moving average function during the map phase like this:

val average = new Sma(window = 3)
stream.map(x => average.addNumber(x))

where class Sma extends Serializable { .. }

I also tried putting the creation of the average object inside an object, like I saw in another post:

object Average { val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3) }

Every time average.addNumber is called it is called on a new instance. How can I preserve the state of the average object? Thanks -Adrian
Re: java.lang.ClassNotFoundException
Have you looked through the logs fully? I have seen this (in my limited experience) pop up as a result of previous exceptions/errors, also as a result of being unable to serialize objects etc. Ognen On 3/26/14, 10:39 AM, Jaonary Rabarisoa wrote: I notice that I get this error when I'm trying to load an objectFile with val viperReloaded = context.objectFile[ReIdDataSetEntry](data) On Wed, Mar 26, 2014 at 3:58 PM, Jaonary Rabarisoa jaon...@gmail.com mailto:jaon...@gmail.com wrote: Here the output that I get : [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36 http://172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36 http://172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1011) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1009) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1009) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:146) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Spark says that the jar is added : 14/03/26 15:49:18 INFO SparkContext: Added JAR target/scala-2.10/value-spark_2.10-1.0.jar On Wed, Mar 26, 2014 at 3:34 PM, Ognen Duzlevski og...@plainvanillagames.com mailto:og...@plainvanillagames.com wrote: Have you looked at the individual nodes logs? Can you post a bit more of the exception's output? On 3/26/14, 8:42 AM, Jaonary Rabarisoa wrote: Hi all, I got java.lang.ClassNotFoundException even with addJar called. The jar file is present in each node. I use the version of spark from github master. Any ideas ? Jaonary
RE: closures moving averages (state)
Tried with reduce and it's giving me pretty weird results that make no sense, i.e. 1 number for an entire RDD:

val smaStream = inputStream.reduce { case (t1, t2) => { val sma = average.addDataPoint(t1); sma } }

Tried with transform and that worked correctly, but unfortunately it works 1 RDD at a time, so the moving average is reset when the next consecutive RDD is read .. as if a new instance of the Average class is created for each RDD. Is there a way to have 1 global variable of a custom type (in my case the Average type) .. somewhat like accumulators, but not incrementable in parallel - that wouldn't make sense for a moving average. The reason I want to apply a moving average function to a stream is so that the tuples remain in Spark and benefit from its fault tolerance mechanisms. My guess is that currently this is not possible, but I'll wait for one of the Spark creators to comment on this. -A

From: Benjamin Black [mailto:b...@b3k.us] Sent: March-26-14 11:50 AM To: user@spark.apache.org Subject: Re: closures moving averages (state) Perhaps you want reduce rather than map? On Wednesday, March 26, 2014, Adrian Mocanu amoc...@verticalscope.com wrote: I'm passing a moving average function during the map phase like this: val average = new Sma(window = 3) stream.map(x => average.addNumber(x)) where class Sma extends Serializable { .. } I also tried to put creation of object average in an object like I saw in another post: object Average { val smaFn = new VSRTXYSimpleMovingAverage[(String, Long)](3) } Every time average.addNumber is called it is a new instance. How can I preserve state of average object? Thanks -Adrian
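One way to keep this state inside Spark Streaming rather than in a driver-side object is updateStateByKey, which carries per-key state from batch to batch and is covered by checkpointing. A minimal sketch, assuming a keyed DStream[(String, Double)] called inputStream, a StreamingContext called ssc, and a window of 3 values (all placeholders):

    import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

    val windowSize = 3
    ssc.checkpoint("checkpoint")   // stateful operations need a checkpoint directory

    val sma = inputStream.updateStateByKey[Seq[Double]] { (newValues: Seq[Double], state: Option[Seq[Double]]) =>
      // keep only the last `windowSize` values seen for this key
      Some((state.getOrElse(Seq.empty[Double]) ++ newValues).takeRight(windowSize))
    }.mapValues(window => window.sum / window.size)         // current moving average per key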
interleave partitions?
Hi, I want to do something like this: rdd3 = rdd1.coalesce(N).partitions.zip(rdd2.coalesce(N).partitions) I realize the above will get me something like Array[(partition,partition)]. I hope you see what I'm going for here -- any tips on how to accomplish this? Thanks
streaming questions
I'm trying to understand Spark streaming, hoping someone can help. I've kinda-sorta got a version of Word Count running, and it looks like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount master hostname port")
      System.exit(1)
    }
    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.)

*Question 1:* When I run this, I don't get any output from wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time?

*Question 2:* While I run this (and the stream is still sending) I get continuous warning messages like this: 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it What does that mean?

*Question 3:* I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles("file:/my/path/outdir"). This results in a new outdir-timestamp file being created every two seconds... even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
Re: streaming questions
*Answer 1:*Make sure you are using master as local[n] with n 1 (assuming you are running it in local mode). The way Spark Streaming works is that it assigns a code to the data receiver, and so if you run the program with only one core (i.e., with local or local[1]), then it wont have resources to process data along with receiving it. *Answer 2:*Spark Streaming is designed to replicate the received data within the machines in a Spark cluster for fault-tolerance. However, when you are running in the local mode, since there is only one machine, the blocks of data arent able to replicate. This is expected and safe to ignore in local mode. *Answer 3:*You can do something like wordCounts.foreachRDD((rdd: RDD[...], time: Time) = { if (rdd.take(1).size == 1) { // There exists at least one element in RDD, so save it to file rdd.saveAsTextFile(generate file name based on time) } } TD On Wed, Mar 26, 2014 at 11:08 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand Spark streaming, hoping someone can help. I've kinda-sorta got a version of Word Count running, and it looks like this: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object StreamingWordCount { def main(args: Array[String]) { if (args.length 3) { System.err.println(Usage: StreamingWordCount master hostname port) System.exit(1) } val master = args(0) val hostname = args(1) val port = args(2).toInt val ssc = new StreamingContext(master, Streaming Word Count,Seconds(2)) val lines = ssc.socketTextStream(hostname, port) val words = lines.flatMap(line = line.split( )) val wordCounts = words.map(x = (x, 1)).reduceByKey((x,y) = x+y) wordCounts.print() ssc.start() ssc.awaitTermination() } } (I also have a small script that sends text to that port.) Question 1: When I run this, I don't get any output from the wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time? Question 2: While I run this (and the stream is still sending) I get continuous warning messages like this: 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it What does that mean? Question 3: I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles(file:/my/path/outdir). This results in the creation of a new outdir-timestamp file being created every two seconds...even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
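A runnable form of the Answer 3 snippet (the archive dropped the => arrows; the file-name construction below is only a placeholder):

    wordCounts.foreachRDD((rdd, time) => {
      if (rdd.take(1).nonEmpty) {   // at least one element exists, so this batch is worth writing
        rdd.saveAsTextFile("file:/my/path/outdir-" + time.milliseconds)
      }
    })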
RE: streaming questions
Hi Diana, I'll answer Q3: You can check if an RDD is empty in several ways. Someone here mentioned that using an iterator was safer: val isEmpty = rdd.mapPartitions(iter = Iterator(! iter.hasNext)).reduce(__) You can also check with a fold or rdd.count rdd.reduce(_ + _) // can't handle empty RDD rdd.fold(0)(_ + _) // no problem with empty RDD A From: Diana Carroll [mailto:dcarr...@cloudera.com] Sent: March-26-14 2:09 PM To: user Subject: streaming questions I'm trying to understand Spark streaming, hoping someone can help. I've kinda-sorta got a version of Word Count running, and it looks like this: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object StreamingWordCount { def main(args: Array[String]) { if (args.length 3) { System.err.println(Usage: StreamingWordCount master hostname port) System.exit(1) } val master = args(0) val hostname = args(1) val port = args(2).toInt val ssc = new StreamingContext(master, Streaming Word Count,Seconds(2)) val lines = ssc.socketTextStream(hostname, port) val words = lines.flatMap(line = line.split( )) val wordCounts = words.map(x = (x, 1)).reduceByKey((x,y) = x+y) wordCounts.print() ssc.start() ssc.awaitTermination() } } (I also have a small script that sends text to that port.) Question 1: When I run this, I don't get any output from the wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time? Question 2: While I run this (and the stream is still sending) I get continuous warning messages like this: 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it What does that mean? Question 3: I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles(file:/my/path/outdir). This results in the creation of a new outdir-timestamp file being created every two seconds...even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
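The operators in the snippet above were also mangled by the archive; reconstructed (assuming the dropped operators were => and &&), the checks would read:

    val isEmpty = rdd.mapPartitions(iter => Iterator(!iter.hasNext)).reduce(_ && _)

    rdd.reduce(_ + _)   // throws an exception on an empty RDD
    rdd.fold(0)(_ + _)  // returns the zero element on an empty RDD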
Re: interleave partitions?
Answering my own question here. This may not be efficient, but this is what I came up with: rdd1.coalesce(N).glom.zip(rdd2.coalesce(N).glom).map { case (x, y) => x ++ y } On Wed, Mar 26, 2014 at 11:11 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I want to do something like this: rdd3 = rdd1.coalesce(N).partitions.zip(rdd2.coalesce(N).partitions) I realize the above will get me something like Array[(partition,partition)]. I hope you see what I'm going for here -- any tips on how to accomplish this? Thanks
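A possible follow-up sketch: glom turns each partition into a single array, so flattening the zipped pairs gives back an ordinary interleaved RDD (N and the element types are whatever was used above). Newer Spark versions also offer rdd1.zipPartitions(rdd2)(...), which avoids materialising the intermediate arrays.

    val interleaved = rdd1.coalesce(N).glom().zip(rdd2.coalesce(N).glom())
      .flatMap { case (xs, ys) => xs ++ ys }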
Re: streaming questions
Thanks, Tathagata, very helpful. A follow-up question below...

On Wed, Mar 26, 2014 at 2:27 PM, Tathagata Das tathagata.das1...@gmail.com wrote: *Answer 3:* You can do something like wordCounts.foreachRDD((rdd: RDD[...], time: Time) => { if (rdd.take(1).size == 1) { // There exists at least one element in RDD, so save it to file rdd.saveAsTextFile(generate file name based on time) } }

Is calling foreachRDD and performing an operation on each RDD individually as efficient as performing the operation on the dstream? Is this foreach pretty much what dstream.saveAsTextFiles is doing anyway? This also brings up a question I have about caching in the context of streaming. In this example, would I want to call rdd.cache()? I'm calling two successive operations on the same rdd (take(1) and then saveAsTextFile)... if I were doing this in regular Spark I'd want to cache so I wouldn't need to re-calculate the rdd for both calls. Does the same apply here? Thanks, Diana
YARN problem using an external jar in worker nodes
Hello, (this is Yarn related) I'm able to load an external jar and use its classes within ApplicationMaster. I wish to use this jar within worker nodes, so I added sc.addJar(pathToJar) and ran. I get the following exception: org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor) Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor) org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) scala.Option.foreach(Option.scala:236) org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) akka.actor.ActorCell.invoke(ActorCell.scala:456) akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) akka.dispatch.Mailbox.run(Mailbox.scala:219) akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) And in worker node containers' stderr log (nothing in stdout log), I don't see any reference to loading jars: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/03/26 13:12:18 INFO Remoting: Starting remoting 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] - [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! Shutting down. Any idea what's going on?
Re: rdd.saveAsTextFile problem
Can you give us a more detailed exception + stack trace from the log? It should be in the driver log. If not, please take a look at the executor logs, through the web UI, to find the stack trace. TD On Tue, Mar 25, 2014 at 10:43 PM, gaganbm gagan.mis...@gmail.com wrote: Hi Folks, Is this issue resolved? If yes, could you please throw some light on how to fix this? I am facing the same problem when writing to text files. When I do stream.foreachRDD(rdd => { rdd.saveAsTextFile("some path") }) this works fine for me, but it creates multiple text files, one per partition within each RDD. So I tried the coalesce option to merge my results into a single file for each RDD: stream.foreachRDD(rdd => { rdd.coalesce(1, true).saveAsTextFile("some path") }) This fails with: org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data) I am using Spark Streaming 0.9.0. Any clue what's going wrong when using coalesce? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p3238.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
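For reference, a sketch of the single-file-per-batch pattern being attempted above, assuming a DStream[String] named stream; the path is a placeholder, and this only illustrates the API, it does not claim to avoid the "unread block data" exception, whose cause may well be environmental:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.Time

  stream.foreachRDD((rdd: RDD[String], time: Time) => {
    // coalesce(1) narrows the RDD to a single partition, so saveAsTextFile writes
    // exactly one part-00000 file; coalesce(1, true), as tried above, does the
    // same through an extra shuffle stage. The time-based suffix keeps batches
    // apart, since saveAsTextFile will not overwrite an existing directory.
    rdd.coalesce(1).saveAsTextFile("hdfs:///some/path/out-" + time.milliseconds)
  })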
Re: java.lang.ClassNotFoundException
It seems to be an old problem: http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I Has anyone found a solution? On Wed, Mar 26, 2014 at 5:50 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I might be way off here but are you looking at the logs on the worker machines? I am running an older version (0.8) and when I look at the error log for the executor process I see the exact location where the executor process tries to load the jar from...with a line like this: 14/03/26 13:57:11 INFO executor.Executor: Adding file:/dirs/dirs/spark/work/app-20140326135710-0029/0/./spark-test.jar to class loader You said "The jar file is present in each node" -- do you see any information on the executor indicating that it's trying to load the jar or where it's loading it from? I can't tell for sure by looking at your logs but they seem to be logs from the master and driver, not from the executor itself? On Wed, Mar 26, 2014 at 11:46 AM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked through the logs fully? I have seen this (in my limited experience) pop up as a result of previous exceptions/errors, also as a result of being unable to serialize objects etc. Ognen On 3/26/14, 10:39 AM, Jaonary Rabarisoa wrote: I notice that I get this error when I'm trying to load an objectFile with val viperReloaded = context.objectFile[ReIdDataSetEntry]("data") On Wed, Mar 26, 2014 at 3:58 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is the output that I get: [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1011) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1009) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1009) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:146) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Spark says that the jar is added : 14/03/26 15:49:18 INFO SparkContext: Added JAR target/scala-2.10/value-spark_2.10-1.0.jar On Wed, Mar 26, 2014 at 3:34 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked at the individual nodes logs? Can you post a bit more of the exception's output? On 3/26/14, 8:42 AM, Jaonary Rabarisoa wrote: Hi all, I got java.lang.ClassNotFoundException even with addJar called. The jar file is present in each node. I use the version of spark from github master. Any ideas ? Jaonary
Re: YARN problem using an external jar in worker nodes
Hi Sung, Are you using yarn-standalone mode? Have you specified the --addJars option with your external jars? -Sandy On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung coded...@cs.stanford.eduwrote: Hello, (this is Yarn related) I'm able to load an external jar and use its classes within ApplicationMaster. I wish to use this jar within worker nodes, so I added sc.addJar(pathToJar) and ran. I get the following exception: org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor) Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor) org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) scala.Option.foreach(Option.scala:236) org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) akka.actor.ActorCell.invoke(ActorCell.scala:456) akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) akka.dispatch.Mailbox.run(Mailbox.scala:219) akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) And in worker node containers' stderr log (nothing in stdout log), I don't see any reference to loading jars: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/03/26 13:12:18 INFO Remoting: Starting remoting 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] - [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! Shutting down. Any idea what's going on?
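One thing that can help with this kind of NoClassDefFoundError on 0.9-era jobs is to hand the dependency jars to the SparkContext at creation time so they are shipped to every executor, rather than relying on sc.addJar after the fact; for yarn-standalone mode, the --addJars flag Sandy mentions is the equivalent knob on the client command line. A minimal sketch with illustrative paths and app name (note also that a JNI-backed library like OpenCV needs its native .so present on every node, which shipping a jar alone does not provide):

  import org.apache.spark.{SparkConf, SparkContext}

  // Illustrative only: list the dependency jars when the context is created so
  // they are distributed to every executor, instead of calling sc.addJar later.
  val conf = new SparkConf()
    .setAppName("HOGDescriptorJob")
    .setJars(Seq("/path/to/opencv-java.jar", "/path/to/my-app.jar"))
  val sc = new SparkContext(conf)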
Re: java.lang.ClassNotFoundException
context.objectFile[ReIdDataSetEntry]("data") - I'm not sure how this is compiled in Scala. But if it uses some sort of ObjectInputStream, you need to be careful: ObjectInputStream uses the root classloader to load classes and does not work with jars that are added to the thread context classloader (TCCL). Apache Commons has ClassLoaderObjectInputStream to work around this. On Wed, Mar 26, 2014 at 1:38 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: It seems to be an old problem: http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I Has anyone found a solution? On Wed, Mar 26, 2014 at 5:50 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I might be way off here but are you looking at the logs on the worker machines? I am running an older version (0.8) and when I look at the error log for the executor process I see the exact location where the executor process tries to load the jar from...with a line like this: 14/03/26 13:57:11 INFO executor.Executor: Adding file:/dirs/dirs/spark/work/app-20140326135710-0029/0/./spark-test.jar to class loader You said "The jar file is present in each node" -- do you see any information on the executor indicating that it's trying to load the jar or where it's loading it from? I can't tell for sure by looking at your logs but they seem to be logs from the master and driver, not from the executor itself? On Wed, Mar 26, 2014 at 11:46 AM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked through the logs fully? I have seen this (in my limited experience) pop up as a result of previous exceptions/errors, also as a result of being unable to serialize objects etc. Ognen On 3/26/14, 10:39 AM, Jaonary Rabarisoa wrote: I notice that I get this error when I'm trying to load an objectFile with val viperReloaded = context.objectFile[ReIdDataSetEntry]("data") On Wed, Mar 26, 2014 at 3:58 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is the output that I get: [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 4 times (most recent failure: Exception failure in TID 6 on host 172.166.86.36: java.lang.ClassNotFoundException: value.models.ReIdDataSetEntry) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1011) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1009) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1009) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:596) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:596) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:146) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at 
akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Spark says that the jar is added : 14/03/26 15:49:18 INFO SparkContext: Added JAR target/scala-2.10/value-spark_2.10-1.0.jar On Wed, Mar 26, 2014 at 3:34 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Have you looked at the individual nodes logs? Can you post a bit more of the exception's output? On 3/26/14, 8:42 AM, Jaonary Rabarisoa wrote: Hi all, I got java.lang.ClassNotFoundException even with addJar called. The jar file is present in each node. I use the version of spark from github master. Any ideas ? Jaonary -- ...:::Aniket:::... Quetzalco@tl
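To make the ObjectInputStream workaround above concrete, a small sketch of deserializing through the thread's context classloader using Apache Commons IO's ClassLoaderObjectInputStream; the method and parameter names are illustrative, and the byte array stands in for whatever serialized payload is being read:

  import java.io.ByteArrayInputStream
  import org.apache.commons.io.input.ClassLoaderObjectInputStream

  // Deserialize using the current thread's context classloader, which (unlike
  // the root classloader a plain ObjectInputStream falls back to) can see jars
  // added at runtime.
  def deserializeWithContextClassLoader(bytes: Array[Byte]): AnyRef = {
    val in = new ClassLoaderObjectInputStream(
      Thread.currentThread().getContextClassLoader,
      new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }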
Announcing Spark SQL
Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
All pairs shortest paths?
No idea how feasible this is. Has anyone done it?
Re: Announcing Spark SQL
This is so, so COOL. YES. I'm excited about using this once I'm a bit more comfortable with Spark. Nice work, people! On Wed, Mar 26, 2014 at 5:58 PM, Michael Armbrust mich...@databricks.comwrote: Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
RE: Announcing Spark SQL
Fantastic! Although, I think they missed an obvious name choice: SparkQL (pronounced sparkle) :) Skyler From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Wednesday, March 26, 2014 3:58 PM To: user@spark.apache.org Subject: Announcing Spark SQL Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
Re: All pairs shortest paths?
To clarify: I don't need the actual paths, just the distances. On Wed, Mar 26, 2014 at 3:04 PM, Ryan Compton compton.r...@gmail.com wrote: No idea how feasible this is. Has anyone done it?
Re: coalescing RDD into equally sized partitions
For the record, I tried this, and it worked. On Wed, Mar 26, 2014 at 10:51 AM, Walrus theCat walrusthe...@gmail.com wrote: Oh, so if I had something more reasonable, like RDDs full of tuples of, say, (Int, Set, Set), I could expect a more uniform distribution? Thanks On Mon, Mar 24, 2014 at 11:11 PM, Matei Zaharia matei.zaha...@gmail.com wrote: This happened because they were integers equal to 0 mod 5, and we used the default hashCode implementation for integers, which will map them all to 0. There's no API method that will look at the resulting partition sizes and rebalance them, but you could use another hash function. Matei On Mar 24, 2014, at 5:20 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, sc.parallelize(Array.tabulate(100)(i => i)).filter(_ % 20 == 0).coalesce(5, true).glom.collect yields Array[Array[Int]] = Array(Array(0, 20, 40, 60, 80), Array(), Array(), Array(), Array()) How do I get something more like: Array(Array(0), Array(20), Array(40), Array(60), Array(80)) Thanks
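A sketch of the "use another hash function" suggestion: key the values and supply a custom Partitioner whose getPartition spreads these particular keys evenly. The partitioner here is hypothetical and tailored to this toy data (multiples of 20), purely to illustrate the mechanism:

  import org.apache.spark.{Partitioner, SparkContext}
  import org.apache.spark.SparkContext._   // pair-RDD functions such as partitionBy

  val sc = new SparkContext("local", "EvenPartitions")

  // Hypothetical partitioner: spread multiples of 20 across partitions instead of
  // relying on Int.hashCode, which sends 0, 20, 40, 60, 80 all to partition 0 mod 5.
  class SpreadPartitioner(val numPartitions: Int) extends Partitioner {
    def getPartition(key: Any): Int = key match {
      case i: Int => (i / 20) % numPartitions
      case _      => 0
    }
  }

  val data = sc.parallelize(Array.tabulate(100)(i => i)).filter(_ % 20 == 0)
  val spread = data.map(x => (x, x)).partitionBy(new SpreadPartitioner(5)).values
  println(spread.glom.collect.map(_.mkString("[", ",", "]")).mkString(" "))
  // expected layout: [0] [20] [40] [60] [80]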
Re: Announcing Spark SQL
Congrats Michael & co for putting this together — this is probably the neatest piece of technology added to Spark in the past few months, and it will greatly change what users can do as more data sources are added. Matei On Mar 26, 2014, at 3:22 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Wow! Ognen On 3/26/14, 4:58 PM, Michael Armbrust wrote: Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
Re: Announcing Spark SQL
+1 Michael, Reynold et al. This is key to some of the things we're doing. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Wed, Mar 26, 2014 at 2:58 PM, Michael Armbrust mich...@databricks.comwrote: Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
Re: All pairs shortest paths?
Yeah, if you’re just worried about statistics, maybe you can do sampling (do single-pair paths from 100 random nodes and you get an idea of what percentage of nodes have what distribution of neighbors in a given distance). Matei On Mar 26, 2014, at 5:55 PM, Ryan Compton compton.r...@gmail.com wrote: Much thanks, I suspected this would be difficult. I was hoping to generate some 4 degrees of separation-like statistics. Looks like I'll just have to work with a subset of my graph. On Wed, Mar 26, 2014 at 5:20 PM, Matei Zaharia matei.zaha...@gmail.com wrote: All-pairs distances is tricky for a large graph because you need O(V^2) storage. Do you want to just quickly query the distance between two vertices? In that case you can do single-source shortest paths, which I believe exists in GraphX, or at least is very quick to implement on top of its Pregel API. If your graph is small enough that storing all-pairs is feasible, you can probably run this as an iterative algorithm: http://en.wikipedia.org/wiki/Floyd–Warshall_algorithm, though I haven’t tried it. It may be tough to do with GraphX. Matei On Mar 26, 2014, at 3:51 PM, Ryan Compton compton.r...@gmail.com wrote: To clarify: I don't need the actual paths, just the distances. On Wed, Mar 26, 2014 at 3:04 PM, Ryan Compton compton.r...@gmail.com wrote: No idea how feasible this is. Has anyone done it?
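For the "degrees of separation"-style statistics, a rough sketch of the sampling idea with plain RDDs (no GraphX): pick a handful of source vertices and run a breadth-first expansion over an edge list, giving hop distances from each sampled source. The edge list, master, and names are illustrative; each round is a full shuffle join, so on a large graph only a modest number of rounds and sources is practical:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // pair-RDD functions (join, reduceByKey)

  val sc = new SparkContext("local[2]", "SampledHopDistances")

  // Illustrative edge list; each undirected edge is stored in both directions.
  val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (1L, 5L)))
    .flatMap { case (a, b) => Seq((a, b), (b, a)) }
    .cache()

  val source = 1L                                     // one sampled source vertex
  var distances = sc.parallelize(Seq((source, 0)))    // (vertex, hops from source)

  for (hop <- 1 to 4) {                               // a few BFS rounds
    val frontier = distances.join(edges)              // (v, (dist, neighbor))
      .map { case (_, (dist, neighbor)) => (neighbor, dist + 1) }
    distances = distances.union(frontier)
      .reduceByKey((a, b) => math.min(a, b))          // keep the shortest hop count
      .cache()
  }

  println(distances.collect().mkString(" "))          // e.g. (1,0) (2,1) (5,1) (3,2) (4,3)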
Re: Spark Streaming + Kafka + Mesos/Marathon strangeness
The web-ui shows 3 executors, the driver and one spark task on each worker. I do see that there were 8 successful tasks and the ninth failed like so... java.lang.Exception (java.lang.Exception: Could not compute split, block input-0-1395860790200 not found) org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:45) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) org.apache.spark.rdd.RDD.iterator(RDD.scala:232) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) org.apache.spark.scheduler.Task.run(Task.scala:53) org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) Why would that happen? The two tasks that are running are ones that never successfully received messages from kafka, whereas the one that did was killed for some reason after working fine for a few minutes. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3312.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Announcing Spark SQL
Very nice. Any plans to make the SQL typesafe using something like Slick ( http://slick.typesafe.com/) Thanks ! On Wed, Mar 26, 2014 at 5:58 PM, Michael Armbrust mich...@databricks.comwrote: Hey Everyone, This already went out to the dev list, but I wanted to put a pointer here as well to a new feature we are pretty excited about for Spark 1.0. http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html Michael
Spark preferred compression format
Hi, What's the splittable compression format that works with Spark right now? We are looking into bzip2 / lzo / gzip...gzip is not splittable, so not a good option. Within bzip2/lzo I am confused. Thanks. Deb
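One quick way to see the splittability difference empirically, assuming input files already exist at these illustrative paths: the number of partitions Spark reports for a gzip file stays at one per file, while a splittable or uncompressed input fans out with the number of HDFS blocks.

  import org.apache.spark.SparkContext

  val sc = new SparkContext("local", "SplittabilityCheck")

  // Illustrative paths. A .gz file comes back as one partition per file (no
  // parallelism within a file), whereas an uncompressed or splittable input is
  // broken into roughly one partition per HDFS block.
  val gz    = sc.textFile("hdfs:///data/big.gz")
  val plain = sc.textFile("hdfs:///data/big.txt")
  println("gzip partitions:  " + gz.partitions.size)
  println("plain partitions: " + plain.partitions.size)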
Re: Announcing Spark SQL
Any plans to make the SQL typesafe using something like Slick ( http://slick.typesafe.com/) I would really like to do something like that, and maybe we will in a couple of months. However, in the near term, I think the top priorities are going to be performance and stability. Michael
Not getting it
Hi all, I've got something which I think should be straightforward, but it's not, so I'm not getting it. I have an 8 node Spark 0.9.0 cluster also running HDFS. Workers have 16g of memory using 8 cores. In HDFS I have a CSV file of 110M lines of 9 columns (e.g., [key,a,b,c...]). I have another file of 25K lines containing some number of keys which might be in my CSV file. (Yes, I know I should use an RDBMS or Shark or something. I'll get to that, but this is a toy problem that I'm using to get some intuition with Spark.) Working on each file individually, Spark has no problem manipulating the files. If I try to join or union+filter, though, I can't seem to get the join of the two files to work. Code is along the lines of val fileA = sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)} val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x} And trying things like fileA.join(fileB) gives me heap OOM. Trying (fileA ++ fileB.map{case (k,v) => (k, Array(v))}).groupBy{_._1}.filter{case (k, (_, xs)) => xs.exists{_.length == 1}} just causes Spark to freeze. (In all the cases I'm trying I just use a final .count to force the results.) I suspect I'm missing something fundamental about bringing the keyed data together into the same partitions so it can be efficiently joined, but I've given up for now. If anyone can shed some light (beyond "No, really. Use Shark.") on what I'm not understanding, it would be most helpful. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
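Since fileB holds only ~25K keys, one standard way around the OOM is to avoid the shuffle join entirely: collect the small key set to the driver, broadcast it, and filter the big file against it. A rough sketch under those assumptions; the master URL is a placeholder and the paths are the same elided placeholders as in the message above:

  import org.apache.spark.SparkContext

  val sc = new SparkContext("spark://master:7077", "BroadcastFilter")

  val fileA = sc.textFile("hdfs://.../fileA_110M.csv").map(_.split(",")).keyBy(_(0))

  // The small side easily fits on the driver: collect its ~25K keys into a Set
  // and broadcast that to every executor.
  val smallKeys = sc.textFile("hdfs://.../fileB_25k.csv").collect().toSet
  val keysBc = sc.broadcast(smallKeys)

  // Keep only the rows of the big file whose key appears in the small file.
  // No shuffle is involved, so nothing resembling the full join has to fit in memory.
  val matched = fileA.filter { case (k, _) => keysBc.value.contains(k) }
  println(matched.count())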