Re: Kmeans example reduceByKey slow
Hi Tsai,

Could you share more information about the machines you used and the training parameters (runs, k, and iterations)? That would help in diagnosing the issue. Thanks!

Best, Xiangrui

On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
Hi, At the reduceByKey stage, it takes a few minutes before the tasks start working. I have -Dspark.default.parallelism=127, which matches my total core count minus one (n-1). CPU/network/IO are idle across all nodes while this is happening, and there is nothing notable in the master log file. From the spark-shell:

    14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 on executor 2: XXX (PROCESS_LOCAL)
    14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 38765155 bytes in 193 ms
    14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 on executor 1: XXX (PROCESS_LOCAL)
    14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 38765155 bytes in 96 ms
    14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 on executor 0: XXX (PROCESS_LOCAL)
    14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 38765155 bytes in 100 ms

But it stops there for a significant time before any movement. In the stage detail of the UI, I can see that there are 127 tasks running, but the duration of each is at least a few minutes. I'm working off local storage (not HDFS) and the k-means data is about 6.5GB (50M rows). Is this normal behaviour? Thanks!
Re: Kmeans example reduceByKey slow
Hi,

This is on a 4-node cluster, each node with 32 cores and 256GB RAM. Spark 0.9.0 is deployed in standalone mode. Each worker is configured with 192GB, and Spark executor memory is also 192GB.

This is on the first iteration, with K=500000. Here's the code I use: http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example.

Thanks!

On 24 Mar, 2014, at 2:46 pm, Xiangrui Meng men...@gmail.com wrote:
[earlier messages quoted in full above; trimmed]
spark executor/driver log files management
Hi,

I have a few questions regarding log file management in Spark:

1. Currently I did not find any way to modify the log file names for executors/drivers; they are hardcoded as stdout and stderr. Also, there is no log rotation. For a streaming application these files will grow forever and become unmanageable. Is there any way to overcome this?

Thanks,
--
Sourav Chandra
Senior Software Engineer
sourav.chan...@livestream.com
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034
www.livestream.com
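[Editor's note: the stdout/stderr file names and their capture are fixed by the standalone Worker in this version, so renaming them isn't configurable. One common workaround (a sketch; the path, size, and count values are illustrative) is to route Spark's own log4j logging to a rolling file via a log4j.properties on the executor classpath. Note this rotates only what goes through log4j, not raw writes to stdout/stderr.]

    # log4j.properties on the executor classpath -- illustrative values
    log4j.rootLogger=INFO, rolling
    log4j.appender.rolling=org.apache.log4j.RollingFileAppender
    log4j.appender.rolling.File=/var/log/spark/executor.log
    log4j.appender.rolling.MaxFileSize=50MB
    log4j.appender.rolling.MaxBackupIndex=5
    log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
    log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n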
GC overhead limit exceeded in Spark-interactive shell
Hi All !!

I am getting the following error in the interactive spark-shell [0.8.1]:

    org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed more than 0 times; aborting job java.lang.OutOfMemoryError: GC overhead limit exceeded

But I had set the following in spark-env.sh and hadoop-env.sh:

    export SPARK_DEAMON_MEMORY=8g
    export SPARK_WORKER_MEMORY=8g
    export SPARK_DEAMON_JAVA_OPTS="-Xms8g -Xmx8g"
    export SPARK_JAVA_OPTS="-Xms8g -Xmx8g"
    export HADOOP_HEAPSIZE=4000

Any suggestions ??

--
Sai Prasanna. AN
II M.Tech (CS), SSSIHL
Re: Kmeans example reduceByKey slow
K = 500000 is certainly a large number for k-means. If there is no particular reason to have 500000 clusters, could you try to reduce it to, e.g., 100 or 1000? Also, the example code is not for large-scale problems. You should use the KMeans algorithm in mllib.clustering for your problem.

-Xiangrui

On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com wrote:
[earlier messages quoted in full above; trimmed]
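[Editor's note: a minimal sketch of calling the MLlib implementation directly, against the 0.9-era API where KMeans.train takes an RDD[Array[Double]]; the path and parameter values are illustrative.]

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.clustering.KMeans

    val sc = new SparkContext("spark://master:7077", "KMeansExample")
    // Parse whitespace-delimited numeric rows into Array[Double].
    val data = sc.textFile("/data/kmeans.txt")
                 .map(_.split(' ').map(_.toDouble))
                 .cache()
    // train(data, k, maxIterations, runs) -- note that a very large k
    // makes the initialization step, which runs on the driver, expensive.
    val model = KMeans.train(data, 1000, 20, 1)
    println("Within-set sum of squared error: " + model.computeCost(data))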
Re: Kmeans example reduceByKey slow
Thanks, let me try with a smaller K.

Does the size of the input data matter for the example? Currently I have 50M rows. What is a reasonable size to demonstrate the capability of Spark?

On 24 Mar, 2014, at 3:38 pm, Xiangrui Meng men...@gmail.com wrote:
[earlier messages quoted in full above; trimmed]
Re: GC overhead limit exceeded in Spark-interactive shell
To be clear on what your configuration will do:

- SPARK_DAEMON_MEMORY=8g will make your standalone master and worker schedulers have a lot of memory. These do not impact the actual amount of useful memory given to executors or your driver, however, so you probably don't need to set this.
- SPARK_WORKER_MEMORY=8g allows each worker to provide up to 8g worth of executors. In itself, this does not actually give executors more memory, just allows them to get more. This is a necessary setting.
- *_JAVA_OPTS should not be used to set memory parameters, as they may or may not override their *_MEMORY counterparts.

The two things you are not configuring are the amount of memory for your driver (for a 0.8.1 spark-shell, you must use SPARK_MEM) and the amount of memory given to each executor (spark.executor.memory). By default, Spark executors are only 512MB in size, so you will probably want to increase this up to the value of SPARK_WORKER_MEMORY. This will provide you with 1 executor per worker that uses all available memory, which is probably what you want for testing purposes (it is less ideal for sharing a cluster).

In case the distinction between workers/masters (collectively, daemons), executors, and drivers is not clear to you, please check out the corresponding documentation on Spark clusters: https://spark.incubator.apache.org/docs/0.8.1/cluster-overview.html

On Mon, Mar 24, 2014 at 12:24 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:
[original message quoted in full above; trimmed]
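[Editor's note: a concrete sketch of the 0.8.1-era settings, values illustrative. SPARK_MEM sets the spark-shell driver heap, while spark.executor.memory is a Java system property, set either in SPARK_JAVA_OPTS for the shell or programmatically in a standalone app before the SparkContext is created.]

    // Standalone app: set executor memory before constructing the context.
    // (For spark-shell, the equivalent would be
    //   SPARK_MEM=8g SPARK_JAVA_OPTS="-Dspark.executor.memory=8g" ./spark-shell)
    System.setProperty("spark.executor.memory", "8g")
    val sc = new org.apache.spark.SparkContext("spark://master:7077", "MyApp")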
Re: GC overhead limit exceeded in Spark-interactive shell
PS you have a typo in DEAMON - it's DAEMON. Thanks, Latin.

On Mar 24, 2014 7:25 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:
[original message quoted in full above; trimmed]
Re: Java API - Serialization Issue
I am also facing the same problem. I have implemented Serializable for my own code, but the exception is thrown from third-party libraries over which I have no control:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: (lib class name here)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

Is it mandatory that Serializable be implemented for classes in dependent jars as well?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-API-Serialization-Issue-tp1460p3086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
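[Editor's note: one pattern that often side-steps this (a sketch; ThirdPartyClient stands in for the non-serializable library class) is to construct the object inside mapPartitions, so it is created on the executor once per partition and never has to be serialized from the driver.]

    import org.apache.spark.rdd.RDD

    // Stand-in for a non-serializable class from a dependent jar.
    class ThirdPartyClient { def lookup(s: String): String = s.toUpperCase }

    def enrich(input: RDD[String]): RDD[String] =
      input.mapPartitions { iter =>
        val client = new ThirdPartyClient  // built on the executor, not shipped
        iter.map(client.lookup)
      }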
Re: java.io.NotSerializableException Of dependent Java lib.
Can someone answer this question please? Specifically, about the Serializable requirement for classes in dependent jars?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-NotSerializableException-Of-dependent-Java-lib-tp1973p3087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark worker threads waiting
Yes, my input data is partitioned in a completely random manner, so each worker that produces shuffle data processes only a part of it. The way I understand it, before each stage each worker needs to distribute the correct partitions (based on hash key ranges?) to the other workers, and this is where I would expect network input/output to spike. After that the processing should occur, where I would expect CPU to spike. If you check the image I attached earlier in this thread, you can see that, for example, between 8:25 and 8:30 disk, network and CPU are almost entirely unutilized.

I would be very happy to receive some suggestions on how to debug this, or where you think would be a good place to start looking.

Kind regards, Domen

On Fri, Mar 21, 2014 at 6:58 PM, Mayur Rustagi [via Apache Spark User List] ml-node+s1001560n3006...@n3.nabble.com wrote:
In your task details I don't see a large skew in tasks, so the low-CPU-usage period occurs between stages or during stage execution. One possible issue: your shuffle read is 89GB; if the machine producing the shuffle data is not the one processing it, shuffling data across machines may be causing the delay. Can you look at your network traffic during that period to check performance?

Regards, Mayur
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Mar 21, 2014 at 8:33 AM, sparrow wrote:
Here is the stage overview: [image: Inline image 2]
and here are the stage details for stage 0: [image: Inline image 1]
Transformations from the first stage to the second one are trivial, so that should not be the bottleneck (apart from keyBy().groupByKey(), which causes the shuffle write/read).
Kind regards, Domen

On Thu, Mar 20, 2014 at 8:38 PM, Mayur Rustagi [via Apache Spark User List] wrote:
I would have preferred the stage window details & aggregate task details (above the task list). Basically, when you run a job it translates to multiple stages, and each stage translates to multiple tasks (each run on a worker core). So give some breakup like: my job is taking 16 min & 3 stages; stage 1: 5 min, stage 2: 10 min, stage 3: 1 min. For stage 2, give the aggregate task screenshot, which shows the 50th, 75th, and 100th percentiles.

Regards, Mayur
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 9:55 AM, sparrow wrote:
This is what the web UI looks like: [image: Inline image 1]
I also tail all the worker logs, and these are the last entries before the waiting begins:

    14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
    14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 29853 non-zero-bytes blocks out of 37714 blocks
    14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 5 remote gets in 62 ms
    [PSYoungGen: 12464967K->3767331K(10552192K)] 36074093K->29053085K(44805696K), 0.6765460 secs] [Times: user=5.35 sys=0.02, real=0.67 secs]
    [PSYoungGen: 10779466K->3203826K(9806400K)] 35384386K->31562169K(44059904K), 0.6925730 secs] [Times: user=5.47 sys=0.00, real=0.70 secs]

From the screenshot above you can see that tasks take ~6 minutes to complete. The amount of time it takes the tasks to complete seems to depend on the amount of input data. If the s3 input string captures 2.5 times less data (less data to shuffle-write and later read), the same tasks take 1 minute. Any idea how to debug what the workers are doing?
Domen

On Wed, Mar 19, 2014 at 5:27 PM, Mayur Rustagi [via Apache Spark User List] wrote:
You could have some outlier task that is preventing the next set of stages from launching. Can you check the stages' state in the Spark web UI - is any task running, or is everything halted?
Regards, Mayur
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi
Re: Spark Java example using external Jars
Hello,

Has anyone got any ideas? I am not quite sure my problem is an exact fit for Spark, since in this section of my program I am not really doing a reduce job, simply a group-by and partition. Would calling pipe on the partitioned JavaRDD do the trick? Are there any examples using pipe?

Thanks, Dimitri

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-example-using-external-Jars-tp2647p3092.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
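[Editor's note: a minimal sketch of pipe in Scala (the external command is illustrative). Each partition's elements are written to the external process's stdin, one per line, and the process's stdout lines become the resulting RDD.]

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "PipeExample")
    val lines = sc.parallelize(Seq("a 1", "b 2", "a 3"))
    // One external process per partition; elements go to its stdin,
    // its stdout lines come back as the new RDD.
    val piped = lines.pipe(Seq("tr", "a-z", "A-Z"))
    piped.collect().foreach(println)  // A 1, B 2, A 3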
Re: No space left on device exception
Patrick, correct. I have a 16-node cluster. On 14 machines out of 16, the inode usage was about 50%. Of the two remaining slaves, one had inode usage of 96% and the other 100%. When I went into /tmp on these two nodes, there were a bunch of /tmp/spark* subdirectories, which I deleted. This resulted in the inode consumption falling back down to 50% and the job running successfully to completion. The slave with the 100% inode usage had a spark/work/app/number/stdout with the message that the filesystem is running out of disk space (which I posted in the original email that started this thread). What is interesting is that only two out of the 16 slaves had this problem :)

Ognen

On 3/24/14, 12:57 AM, Patrick Wendell wrote:
Ognen - just so I understand: the issue is that there weren't enough inodes, and this was causing a "No space left on device" error? Is that correct? If so, that's good to know, because it's definitely counter-intuitive.

On Sun, Mar 23, 2014 at 8:36 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote:
I would love to work on this (and other) stuff if I can bother someone with questions offline or on a dev mailing list. Ognen

On 3/23/14, 10:04 PM, Aaron Davidson wrote:
Thanks for bringing this up. 100% inode utilization is an issue I haven't seen raised before, and it points at another issue which is not on our current roadmap for state cleanup: cleaning up data which was not fully cleaned up from a crashed process.

On Sun, Mar 23, 2014 at 7:57 PM, Ognen Duzlevski og...@plainvanillagames.com wrote:
Bleh, strike that - one of my slaves was at 100% inode utilization on the file system. It was /tmp/spark* leftovers that apparently did not get cleaned up properly after failed or interrupted jobs. Mental note: run a cron job on all slaves and the master to clean up /tmp/spark* regularly. Thanks (and sorry for the noise)! Ognen

On 3/23/14, 9:52 PM, Ognen Duzlevski wrote:
Aaron, thanks for replying. I am very much puzzled as to what is going on. A job that used to run on the same cluster is failing with this mysterious message about not having enough disk space, when in fact I can see through "watch df -h" that the free space is always hovering around 3+GB on the disk and the free inodes are at 50% (this is on the master). I went through each slave's spark/work/app*/stderr and stdout and spark/logs/*out files, and there is no mention of "too many open files" failures on any of the slaves or on the master :( Thanks, Ognen

On 3/23/14, 8:38 PM, Aaron Davidson wrote:
By default, with P partitions (for both the pre-shuffle stage and post-shuffle), there are P^2 shuffle files created. With spark.shuffle.consolidateFiles turned on, we would instead create only P files. Disk space consumption, however, is largely unaffected by the number of partitions unless each partition is particularly small. You might look at the actual executors' logs, as it's possible that this error was caused by an earlier exception, such as "too many open files".

On Sun, Mar 23, 2014 at 4:46 PM, Ognen Duzlevski og...@plainvanillagames.com wrote:
On 3/23/14, 5:49 PM, Matei Zaharia wrote:
You can set spark.local.dir to put this data somewhere other than /tmp if /tmp is full. Actually, it's recommended to have multiple local disks and set it to a comma-separated list of directories, one per disk.
Matei, does the number of tasks/partitions in a transformation influence something in terms of disk space consumption? Or inode consumption?
Thanks, Ognen

--
"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable" -- Leslie Lamport

--
"No matter what they ever do to us, we must always act for the love of our people and the earth. We must not react out of hatred against those who have no sense." -- John Trudell
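[Editor's note: a sketch of the spark.local.dir setting Matei mentions earlier in this thread (paths illustrative). In this era it is a Java system property, set before the SparkContext is created, so shuffle and spill files land on dedicated disks instead of /tmp.]

    // One directory per physical disk, comma-separated.
    System.setProperty("spark.local.dir", "/disk1/spark,/disk2/spark")
    val sc = new org.apache.spark.SparkContext("spark://master:7077", "App")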
Re: How many partitions is my RDD split into?
Oh, glad to know it's that simple!

Patrick, in your last comment did you mean "filter in"? As in, I start with one year of data and filter it so I have one day left? I'm assuming in that case the empty partitions would be for all the days that got filtered out.

Nick

On Monday, March 24, 2014, Patrick Wendell pwend...@gmail.com wrote:
As Mark said, you can actually access this easily. The main issue I've seen from a performance perspective is people having a bunch of really small partitions. This will still work, but the performance will improve if you coalesce the partitions using rdd.coalesce(). This can happen, for example, if you do a highly selective filter on an RDD - for instance, you filter out one day of data from a dataset of a year.

- Patrick

On Sun, Mar 23, 2014 at 9:53 PM, Mark Hamstra m...@clearstorydata.com wrote:
It's much simpler: rdd.partitions.size

On Sun, Mar 23, 2014 at 9:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
Hey there fellow Dukes of Data,

How can I tell how many partitions my RDD is split into? I'm interested in knowing because, from what I gather, having a good number of partitions is good for performance. If I'm looking to understand how my pipeline is performing, say for a parallelized write out to HDFS, knowing how many partitions an RDD has would be a good thing to check. Is that correct?

I could not find an obvious method or property to see how my RDD is partitioned. Instead, I devised the following thingy:

    def f(idx, itr): yield idx
    rdd = sc.parallelize([1, 2, 3, 4], 4)
    rdd.mapPartitionsWithIndex(f).count()

Frankly, I'm not sure what I'm doing here, but this seems to give me the answer I'm looking for. Derp. :)

So in summary, should I care about how finely my RDDs are partitioned? And how would I check on that?

Nick

View this message in context: How many partitions is my RDD split into? Sent from the Apache Spark User List mailing list archive at Nabble.com.
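[Editor's note: a sketch in Scala combining both answers; the path and counts are illustrative.]

    val year = sc.textFile("hdfs:///logs/2013/*")        // hypothetical dataset
    val oneDay = year.filter(_.startsWith("2013-06-01")) // highly selective
    println(oneDay.partitions.size)  // same count as `year`, most now empty
    val compact = oneDay.coalesce(16)
    println(compact.partitions.size) // 16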
Re: java.io.NotSerializableException Of dependent Java lib.
We now have a way to work around this. For the classes that can't easily implement Serializable, we wrap the class in a Scala object. For example:

    class A {}  // This class is not serializable.

    object AHolder {
      private val a: A = new A()
      def get: A = a
    }

This works because Spark serializes the instances that are used in the closure, but the Scala object won't be serialized and shipped - it's loaded locally in each worker JVM.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-NotSerializableException-Of-dependent-Java-lib-tp1973p3095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
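[Editor's note: a sketch of using such a holder from a closure; the `process` method is illustrative, and `lazy` defers construction until first use on the worker.]

    class A { def process(s: String): String = s.reverse }  // not Serializable

    object AHolder {
      private lazy val a: A = new A()
      def get: A = a
    }

    // AHolder is not shipped with the closure; each executor JVM
    // initializes its own instance on first use.
    val results = sc.parallelize(Seq("ab", "cd")).map(x => AHolder.get.process(x))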
mapPartitions use case
Dear all,

Sorry for asking such a basic question, but can someone explain when one should use mapPartitions instead of map?

Thanks, Jaonary
Akka error with largish job (works fine for smaller versions)
What does this error mean:

    @hadoop-s2.oculus.local:45186]: Error [Association failed with [akka.tcp://spark@hadoop-s2.oculus.local:45186]] [
    akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@hadoop-s2.oculus.local:45186]
    Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: hadoop-s2.oculus.local/192.168.0.47:45186
    ]

?

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
Re: No space left on device exception
Another thing I have noticed is that out of my master + 15 slaves, two slaves always carry a higher inode load. For example, right now I am running an intensive job that takes about an hour to finish, and two slaves have been showing an increase in inode consumption (they are about 10% above the rest of the slaves and master) and still climbing.

Ognen

On 3/24/14, 7:00 AM, Ognen Duzlevski wrote:
[earlier messages in this thread quoted in full above; trimmed]

--
"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable" -- Leslie Lamport
Re: mapPartitions use case
I've seen two cases most commonly.

The first is when I need to create some processing object to process each record. If that object creation is expensive, creating one per record becomes prohibitive. So instead we use mapPartitions, create one object per partition, and use it on each record in the partition.

The other: I've often found it much more efficient, when summarizing data, to use a mutable form of the summary object, run it over each record in a partition, and then reduce those per-partition results, rather than to create a summary object per record and reduce that much larger set of summary objects. Again, it saves a lot of object creation.

On Mon, Mar 24, 2014 at 8:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
[original question quoted above; trimmed]

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
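[Editor's note: a sketch of the first pattern; ExpensiveParser is illustrative.]

    import org.apache.spark.rdd.RDD

    class ExpensiveParser { def parse(line: String): Int = line.trim.toInt }

    def parseAll(lines: RDD[String]): RDD[Int] =
      lines.mapPartitions { iter =>
        val parser = new ExpensiveParser  // one per partition, not per record
        iter.map(parser.parse)
      }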
Re: Sliding Window operations do not work as documented
Hi All,

I found out why this problem exists. Consider the following scenario:
- a DStream is created from any source (I've checked with file and socket)
- no actions are applied to this DStream
- a sliding window operation is applied to this DStream, and an action is applied to the sliding window

In this case, Spark will not even read the input stream in the batches where no window interval falls due. Put another way, it won't read the input when it doesn't have to apply the window function. This is happening because all transformations in Spark are lazy.

How to fix or work around this (see line 3):

    JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
    JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
    inputStream.print(); // This is the workaround
    JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
    objWindow.dstream().saveAsTextFiles("/Output", "");

The "Window Operations" example in the streaming guide implies that Spark will read the stream in every batch, which is not happening because of the lazy transformations. Wherever a sliding window is used, in most cases no actions will be taken on the pre-window batches, hence my gut feeling was that streaming would read every batch if any actions are being taken on the windowed stream.

Regards, Sanjay

On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote:
Hi,

I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the sliding window pictorial example under the "Window Operations" section on http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html .

The RDDs returned by the window transformation function are incorrect in my case. To investigate this further, I ran a series of examples with varying values of window length and slide interval. Summary of the test results, as (window length, slide interval) -> result:

(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the doc is that the two values (5 & 4) should be multiples of the batch interval (1 in my case), and obviously I get a runtime error if I attempt to violate this condition. Looking at my results, it seems that failures result whenever the window length isn't a multiple of the slide interval.
My code:

    JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
    JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
    JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
    objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:

(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3] // (expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5] // (expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5] // (expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7] // (expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5] // (expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7] // (expected RDD@t_3,4,5,6,7)

I have run all the above examples twice to be sure! I believe either my understanding of the sliding window mechanism is incorrect or there is a problem in the sliding window mechanism.

Regards, Sanjay
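[Editor's note: a Scala rendering of the workaround described at the top of this thread (a sketch; windowLen and slideInt in minutes as above). Any output operation on the source stream forces it to be materialized every batch; foreachRDD with a no-op is a quieter alternative to print().]

    import org.apache.spark.streaming.{Minutes, StreamingContext}

    val ssc = new StreamingContext("local[2]", "WindowTest", Minutes(1))
    val windowLen = 3
    val slideInt = 2
    val input = ssc.textFileStream("/Input")
    // Force the source DStream to be computed in every batch interval,
    // even in batches where no window falls due.
    input.foreachRDD(rdd => rdd.foreach(_ => ()))
    val windowed = input.window(Minutes(windowLen), Minutes(slideInt))
    windowed.saveAsTextFiles("/Output", "")
    ssc.start()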
Re: How many partitions is my RDD split into?
Mark, This appears to be a Scala-only feature. :(

Patrick, Are we planning to add this to PySpark?

Nick

On Mon, Mar 24, 2014 at 12:53 AM, Mark Hamstra m...@clearstorydata.com wrote:
It's much simpler: rdd.partitions.size

On Sun, Mar 23, 2014 at 9:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
[original question quoted above; trimmed]

--
View this message in context: How many partitions is my RDD split into? http://apache-spark-user-list.1001560.n3.nabble.com/How-many-partitions-is-my-RDD-split-into-tp3072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Problem starting worker processes in standalone mode
Oh, I also forgot to mention: I start the master and workers (./sbin/start-all.sh), and then start the shell:

    MASTER=spark://localhost:7077 ./bin/spark-shell

Then I get the exceptions... Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-starting-worker-processes-in-standalone-mode-tp3102p3103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
remove duplicates
I have a DStream like this: ...RDD[a,b], RDD[b,c]...

Is there a way to remove duplicates across the entire DStream? I.e., I would like the output to be (by removing one of the b's):

...RDD[a], RDD[b,c]... or ...RDD[a,b], RDD[c]...

Thanks, -Adrian
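[Editor's note: one possible approach (a sketch, not from this thread) is to track the elements already seen with updateStateByKey and emit an element only in the first batch where it appears. This requires checkpointing, and the state grows with the number of distinct elements ever seen.]

    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.dstream.DStream

    // State per element: (appearances before this batch, appearances this batch).
    def dedupe(stream: DStream[String]): DStream[String] = {
      val seen = stream.map(x => (x, 1)).updateStateByKey[(Int, Int)] {
        (vals: Seq[Int], state: Option[(Int, Int)]) =>
          val before = state.map { case (b, cur) => b + cur }.getOrElse(0)
          Some((before, vals.sum))
      }
      // Keep only elements appearing now for the first time ever.
      seen.filter { case (_, (before, cur)) => before == 0 && cur > 0 }
          .map(_._1)
    }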
Re: sbt/sbt assembly fails with ssl certificate error
After a long time (maybe a month) I could finally do a fresh build for 2.0.0-mr1-cdh4.5.0; until now I was using the cached files in .ivy2/cache. My case is especially painful since I have to build behind a firewall...

@Sean thanks for the fix... I think we should add a test for https/firewall compilation as well...
Re: Kmeans example reduceByKey slow
Number of rows doesn't matter much as long as you have enough workers to distribute the work. K-means has complexity O(n * d * k), where n is the number of points, d is the dimension, and k is the number of clusters. If you use the KMeans implementation from MLlib, the initialization stage is done on the master, so a large k would slow down the initialization stage. If your data is sparse, the latest change to KMeans will help with the speed, depending on how sparse your data is.

-Xiangrui

On Mon, Mar 24, 2014 at 12:44 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
[earlier messages quoted in full above; trimmed]
distinct in data frame in spark
Hi,

I have a very simple use case. I have an RDD like the following:

    d = [[1,2,3,4],[1,5,2,3],[2,3,4,5]]

Now I want to remove all the duplicates from a column and return the remaining rows. For example, if I want to remove duplicates based on column 1, then I would drop either row 1 or row 2 in my final result, because column 1 of both the first and second rows is the same element (1), and hence a duplicate. So a possible result is:

    output = [[1,2,3,4],[2,3,4,5]]

How do I do this in Spark?

Thanks
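[Editor's note: one way to do this (a sketch in Scala; the same keyBy/reduceByKey pattern works in PySpark) is to key each row by the dedup column and keep one arbitrary row per key.]

    import org.apache.spark.SparkContext._  // pair-RDD functions

    val d = sc.parallelize(Seq(
      Array(1, 2, 3, 4), Array(1, 5, 2, 3), Array(2, 3, 4, 5)))
    // Keep one row per distinct value of column 0; which duplicate
    // survives is arbitrary.
    val output = d.map(row => (row(0), row)).reduceByKey((a, b) => a).values
    output.collect().foreach(r => println(r.mkString(",")))  // 1,2,3,4 / 2,3,4,5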
Re: GC overhead limit exceeded in Spark-interactive shell
1. Not sure on this; I don't believe we change the defaults from Java.
2. SPARK_JAVA_OPTS can be used to set the various Java properties (other than memory heap size itself).
3. If you want 8 GB executors then, yes, at most two can run on each 16 GB node. (In fact, you should also keep a significant amount of memory free for the OS to use for buffer caching and such.) An executor may use many cores, though, so this shouldn't be an issue.

On Mon, Mar 24, 2014 at 2:44 AM, Sai Prasanna ansaiprasa...@gmail.com wrote:
Thanks Aaron and Sean... Setting SPARK_MEM finally worked. But I have a small doubt:

1) What is the default value that is allocated for the JVM and for heap space for the garbage collector?
2) Usually we set 1/3 of total memory for heap. What should the practice be for Spark processes? Where and how should we set them, and what default value is assumed?
3) Moreover, if we set SPARK_MEM to say 8g and I have 16g of RAM, can only two executors max run on a node of a cluster?

Thanks Again !!

On Mon, Mar 24, 2014 at 2:13 PM, Sean Owen so...@cloudera.com wrote:
[earlier messages quoted in full above; trimmed]

--
Sai Prasanna. AN
II M.Tech (CS), SSSIHL
"Entire water in the ocean can never sink a ship, unless it gets inside. All the pressures of life can never hurt you, unless you let them in."
Comparing GraphX and GraphLab
Hello,

I'm interested in extending the comparison between GraphX and GraphLab presented in Xin et al. (2013). The evaluation presented there is rather limited, as it only compares the frameworks on one algorithm (PageRank) on a cluster with a fixed number of nodes. Are there any graph algorithms where one might expect GraphX to perform better than GraphLab? Do you expect the scaling properties (i.e. performance as a function of # of worker nodes) to differ?

Thanks for your help. I'd be happy to share the results of this study with the community, if there's sufficient interest.

Regards, Niko
Re: Comparing GraphX and GraphLab
Niko,

Comparing some other components will be very useful as well: svd++ from GraphX vs. the same algorithm in GraphLab, and also mllib.recommendation.ALS (implicit/explicit) compared to the collaborative filtering toolkit in GraphLab... To stress test, what's the biggest sparse dataset that you have in mind?

On Mar 24, 2014 11:00 AM, Niko Stahl r.niko.st...@gmail.com wrote:
[original message quoted above; trimmed]
Re: Transitive dependency incompatibility
Just found this issue and wanted to link it here, in case somebody finds this thread later: https://spark-project.atlassian.net/browse/SPARK-939

On Thursday, March 20, 2014 at 11:14 AM, Matei Zaharia wrote:
Hi Jaka,

I'd recommend rebuilding Spark with a new version of the HTTPClient dependency. In the future we want to add a "put the user's classpath first" option to let users override dependencies.

Matei

On Mar 20, 2014, at 8:42 AM, Jaka Jančar j...@kubje.org wrote:
Could the executor use isolated classloaders, in order not to pollute the environment with its own stuff?

On Wednesday, March 19, 2014 at 8:30 AM, Jaka Jančar wrote:
Hi,

I'm getting the following error:

    java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
        at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:93)
        at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
        at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
        at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:155)
        at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
        at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:103)
        at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:334)
        at com.celtra.analyzer.TrackingLogRDD.createClient(TrackingLogRDD.scala:131)
        at com.celtra.analyzer.TrackingLogRDD.compute(TrackingLogRDD.scala:117)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

My app uses the AWS SDK, which requires org.apache.httpcomponents:httpclient:4.2. I believe the error is caused by the fact that an older version of the package is already present on the classpath:

Spark -> Akka -> sjson -> org.apache.httpcomponents:httpclient:4.1
Spark -> jets3t -> commons-httpclient:commons-httpclient:3.1

What are my options if I need to use a newer version of the library in my app?

Thanks, Jaka
quick start guide: building a standalone scala program
Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious!

I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt, so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.)
2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy & paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my Spark home path.
3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy & paste in the sbt file from the instructions.
4. From $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end it says "Done packaging." Unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there.

(Just for fun, I also did what I thought was more logical, which is to set my working directory to $SPARK_HOME/mysparktest and run $SPARK_HOME/sbt/sbt package, but that was even less successful. I got an error:

    awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory)
    Attempting to fetch sbt
    /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
    /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
    Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/

)

So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do.

Thanks! Diana
Re: quick start guide: building a standalone scala program
Hi, Diana,

See my inlined answers.

-- Nan Zhu

On Monday, March 24, 2014 at 3:44 PM, Diana Carroll wrote:

> 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME.

You can create your application in any directory, just follow the sbt project dir structure.

> 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy & paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path.

Should be correct.

> 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy & paste in the sbt file from the instructions.

Should be correct.

> 4. From the $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project!

Because you are in the Spark directory - you don't need to do that. The dependency on Spark is actually resolved by sbt.
Re: quick start guide: building a standalone scala program
I am able to run standalone apps. I think you are making one mistake that throws you off from there onwards: you don't need to put your app under SPARK_HOME. I would create it in its own folder somewhere; it follows the rules of any standalone Scala program (including the layout). In the guide, $SPARK_HOME is only relevant to find the README file which they are parsing/word-counting. Otherwise the compile-time dependencies on Spark are resolved via the sbt file (or the pom file if you look at the Java example).

So, for example, I put my app under C:\Source\spark-code and the jar gets created in C:\Source\spark-code\target\scala-2.9.3 (or 2.10 if you're running with Scala 2.10 as the example shows). But for that part of the guide, it's not any different than building any standalone Scala app.

On Mon, Mar 24, 2014 at 3:44 PM, Diana Carroll dcarr...@cloudera.com wrote:
[original message quoted above; trimmed]
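[Editor's note: for concreteness, a sketch of the expected standalone layout and build file. The contents follow the 0.9-era quick start; the exact version strings are illustrative - check the guide for your release.]

    // Layout:
    //   mysparktest/
    //     simple.sbt
    //     src/main/scala/SimpleApp.scala
    //
    // simple.sbt:
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Running sbt package from inside mysparktest/ (with sbt itself on the PATH, rather than Spark's bundled sbt/sbt) should then produce the application jar under mysparktest/target/scala-2.10/.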
Re: Comparing GraphX and GraphLab
Hi Niko,

The GraphX team recently wrote a longer paper with more benchmarks and optimizations: http://arxiv.org/abs/1402.2394

Regarding the performance of GraphX vs. GraphLab, I believe GraphX currently outperforms GraphLab only in end-to-end benchmarks of pipelines involving both graph-parallel operations (e.g. PageRank) and data-parallel operations (e.g. ETL and data cleaning). This is due to the overhead of moving data between GraphLab and a data-parallel system like Spark. There's an example of a pipeline in Section 5.2 of the linked paper, and the results are in Figure 10 on page 11.

GraphX has a very similar architecture to GraphLab, so I wouldn't expect it to have better performance on pure graph algorithms. GraphX may actually be slower when Spark is configured to launch many tasks per machine, because shuffle communication between Spark tasks on the same machine still occurs by reading and writing from disk, while GraphLab uses shared memory for same-machine communication.

I've CC'd Joey and Reynold as well.

Ankur http://www.ankurdave.com/

On Mar 24, 2014 11:00 AM, Niko Stahl r.niko.st...@gmail.com wrote:
[original message quoted above; trimmed]
Re: quick start guide: building a standalone scala program
Yana: Thanks. Can you give me a transcript of the actual commands you are running? Thanks! Diana
[bug?] streaming window unexpected behaviour
I have what I would call unexpected behaviour when using window on a stream. I have 2 windowed streams with a 5s batch interval. One window stream is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow. What I've noticed is that the 1st RDD produced by bigWindow is incorrect and is of the size 5s not 10s. So instead of waiting 10s and producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s. Why is this happening? To me it looks like a bug; Matei or TD can you verify that this is correct behaviour? I have the following code:
val ssc = new StreamingContext(conf, Seconds(5))
val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
-Adrian
Re: quick start guide: building a standalone scala program
Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory):
mkdir myproject
cd myproject
mkdir project
mkdir target
mkdir -p src/main/scala
cp $mypath/$mymysource.scala src/main/scala/
cp $mypath/myproject.sbt .
Make sure that myproject.sbt has the following in it:
name := "I NEED A NAME!"
version := "I NEED A VERSION!"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"
If you will be using Hadoop/HDFS functionality you will need the below line also:
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"
The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately. That's it. Now you can do sbt compile, sbt run arguments if any. You can also do sbt package to produce a jar file of your code which you can then add to the SparkContext at runtime. In a more complicated project you may need to have a bit more involved hierarchy like com.github.dianacarroll which will then translate to src/main/scala/com/github/dianacarroll/ where you can put your multiple .scala files which will then have to be a part of a package com.github.dianacarroll (you can just put that as your first line in each of these scala files). I am new to Java/Scala so this is how I do it. More educated Java/Scala programmers may tell you otherwise ;) You can get more complicated with the sbt project subdirectory but you can read independently about sbt and what it can do; above is the bare minimum. Let me know if that helped. Ognen
Re: quick start guide: building a standalone scala program
Thanks, Nan Zhu. You say that my problems are because "you are in the Spark directory; you don't need to do that actually, the dependency on Spark is resolved by sbt". I did try it initially in what I thought was a much more typical place, e.g. ~/mywork/sparktest1. But as I said in my email: (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and ran $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:00 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Diana, See my inlined answer -- Nan Zhu On Monday, March 24, 2014 at 3:44 PM, Diana Carroll wrote: Has anyone successfully followed the instructions on the Quick Start page of the Spark home page to run a standalone Scala application? I can't, and I figure I must be missing something obvious! I'm trying to follow the instructions here as close to word for word as possible: http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala 1. The instructions don't say what directory to create my test application in, but later I'm instructed to run sbt/sbt so I conclude that my working directory must be $SPARK_HOME. (Temporarily ignoring that it is a little weird to be working directly in the Spark distro.) You can create your application in any directory; just follow the sbt project dir structure. 2. Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala. Copy/paste in the code from the instructions exactly, replacing YOUR_SPARK_HOME with my spark home path. should be correct 3. Create $SPARK_HOME/mysparktest/simple.sbt. Copy/paste in the sbt file from the instructions. should be correct 4. From the $SPARK_HOME I run sbt/sbt package. It runs through the ENTIRE Spark project! This takes several minutes, and at the end, it says Done packaging. Unfortunately, there's nothing in the $SPARK_HOME/mysparktest/ folder other than what I already had there. because you are in the Spark directory; you don't need to do that actually, the dependency on Spark is resolved by sbt (Just for fun, I also did what I thought was more logical, which is set my working directory to $SPARK_HOME/mysparktest, and ran $SPARK_HOME/sbt/sbt package, but that was even less successful: I got an error: awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ So, help? I'm sure these instructions work because people are following them every day, but I can't tell what they are supposed to do. Thanks! Diana
Re: quick start guide: building a standalone scala program
Thanks, Ognen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile sbt run arguments if any This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana
Re: How many partitions is my RDD split into?
There is no direct way to get this in pyspark, but you can get it from the underlying java rdd. For example a = sc.parallelize([1,2,3,4], 2) a._jrdd.splits().size() On Mon, Mar 24, 2014 at 7:46 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Mark, This appears to be a Scala-only feature. :( Patrick, Are we planning to add this to PySpark? Nick On Mon, Mar 24, 2014 at 12:53 AM, Mark Hamstra m...@clearstorydata.com wrote: It's much simpler: rdd.partitions.size On Sun, Mar 23, 2014 at 9:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hey there fellow Dukes of Data, How can I tell how many partitions my RDD is split into? I'm interested in knowing because, from what I gather, having a good number of partitions is good for performance. If I'm looking to understand how my pipeline is performing, say for a parallelized write out to HDFS, knowing how many partitions an RDD has would be a good thing to check. Is that correct? I could not find an obvious method or property to see how my RDD is partitioned. Instead, I devised the following thingy: def f(idx, itr): yield idx rdd = sc.parallelize([1, 2, 3, 4], 4) rdd.mapPartitionsWithIndex(f).count() Frankly, I'm not sure what I'm doing here, but this seems to give me the answer I'm looking for. Derp. :) So in summary, should I care about how finely my RDDs are partitioned? And how would I check on that? Nick
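A small Scala sketch tying the thread's answers together - rdd.partitions.size for the direct count, plus a tidier version of Nick's mapPartitionsWithIndex probe to see how records are spread (the data here is hypothetical):

val rdd = sc.parallelize(1 to 1000, 8)

// Mark's answer: the partitions array is part of the RDD API.
println(rdd.partitions.size) // 8

// Probe each partition: emit (partition index, record count).
val perPartition = rdd.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, it.size))
}.collect()
perPartition.foreach { case (idx, n) => println("partition " + idx + ": " + n + " records") }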
question about partitions
Hi, Quick question about partitions. If my RDD is partitioned into 5 partitions, does that mean that I am constraining it to exist on at most 5 machines? Thanks
Re: quick start guide: building a standalone scala program
Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run arguments Or you can just add that to your PATH by: export PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen
Re: How many partitions is my RDD split into?
Ah we should just add this directly in pyspark - it's as simple as the code Shivaram just wrote. - Patrick
Re: quick start guide: building a standalone scala program
Ah crud, I guess you are right, I am using the sbt I installed manually with my Scala installation. Well, here is what you can do:
mkdir ~/bin
cd ~/bin
wget http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.1/sbt-launch.jar
vi sbt
Put the following contents into your new file:
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
:wq!
chmod u+x sbt
Now you can do ~/bin/sbt compile, ~/bin/sbt package, run etc. Ognen
Re: quick start guide: building a standalone scala program
Hi, Diana, You don't need to use the Spark-distributed sbt; just download sbt from its official website and set your PATH to the right place. Best, -- Nan Zhu
Re: question about partitions
For instance, I need to work with an RDD in terms of N parts. Will calling RDD.coalesce(N) possibly cause processing bottlenecks?
Re: quick start guide: building a standalone scala program
Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working *as written on the Spark website*. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine) I ran the command exactly as Ognen suggested and see Set current project to Simple Project (do you see this -- you should at least be seeing this) and then a bunch of Resolving ... messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue...Hopefully some of all this helps. On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work: $SPARK_HOME/sbt/sbt package awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory) Attempting to fetch sbt /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/ On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with spark. For example, try: ~/path_to_spark/sbt/sbt compile ~/path_to_spark/sbt/sbt run arguments Or you can just add that to your PATH by: export $PATH=$PATH:~/path_to_spark/sbt To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ongen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile sbt run arguments if any This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. 
Diana On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory): mkdir myproject cd myproject mkdir project mkdir target mkdir -p src/main/scala cp $mypath/$mymysource.scala src/main/scala/ cp $mypath/myproject.sbt . Make sure that myproject.sbt has the following in it: name := I NEED A NAME! version := I NEED A VERSION! scalaVersion := 2.10.3 libraryDependencies += org.apache.spark % spark-core_2.10 % 0.9.0-incubating If you will be using Hadoop/HDFS functionality you will need the below line also libraryDependencies += org.apache.hadoop % hadoop-client % 2.2.0 The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately. That's it. Now you can do sbt compile sbt run arguments if any You can also do sbt package to produce a jar file of your code which you can then add to the SparkContext at runtime. In a more complicated project you may need to have a bit more involved hierarchy like com.github.dianacarroll which will then translate to src/main/scala/com/github/dianacarroll/ where you can put your multiple .scala files which will then have to be a part of a package com.github.dianacarroll (you can just put
Re: question about partitions
RDD.coalesce should be fine for rebalancing data across all RDD partitions. Coalesce is pretty handy in situations where you have sparse data and want to compact it (e.g. data after applying a strict filter) OR you know the magic number of partitions according to your cluster which will be optimal. One point to watch out for though is that if N is greater than your current number of partitions, you need to pass shuffle=true to coalesce. If N is less than your current number of partitions (i.e. you are shrinking partitions), do not set shuffle=true; otherwise it will cause additional unnecessary shuffle overhead.
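A sketch of both directions of that advice (the sizes are hypothetical):

// Shrinking: after a strict filter most partitions are nearly empty,
// and coalesce can merge them locally - no shuffle needed.
val dense = sc.parallelize(1 to 1000000, 100)
val sparse = dense.filter(_ % 1000 == 0)
val compacted = sparse.coalesce(10)          // shuffle defaults to false
println(compacted.partitions.size)           // 10

// Growing: coalesce cannot split partitions without a shuffle,
// so shuffle = true has to be passed explicitly.
val widened = compacted.coalesce(50, shuffle = true)
println(widened.partitions.size)             // 50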
Re: Writing RDDs to HDFS
Ognen: I don't know why your process is hanging, sorry. But I do know that the way saveAsTextFile works is that you give it a path to a directory, not a file. The file is saved in multiple parts, corresponding to the partitions. (part-0, part-1 etc.) (Presumably it does this because it allows each partition to be saved on the local disk, to minimize network traffic. It's how Hadoop works, too.) On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote: Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt") supposed to work? Meaning, can I save files to the HDFS fs this way? I tried: val r = sc.parallelize(List(1,2,3,4,5,6,7,8)) r.saveAsTextFile("hdfs://ip:port/path/file.txt") and it is just hanging. At the same time on my HDFS it created file.txt but as a directory which has subdirectories (the final one is empty). Thanks! Ognen
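A sketch of what Diana describes (the path is hypothetical):

val r = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8), 4)

// The argument names a directory; each of the 4 partitions is written
// by its own task as a separate part file inside it.
r.saveAsTextFile("hdfs://namenode:8020/user/me/numbers")

// Expected layout on HDFS (roughly):
//   /user/me/numbers/part-00000
//   /user/me/numbers/part-00001
//   /user/me/numbers/part-00002
//   /user/me/numbers/part-00003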
Re: Writing RDDs to HDFS
Diana, thanks. I am not very well acquainted with HDFS. I use hdfs -put to put things as files into the filesystem (and sc.textFile to get stuff out of them in Spark) and I see that they appear to be saved as files that are replicated across 3 out of the 16 nodes in the hdfs cluster (which in my case is also my Spark cluster) - hence, I was puzzled why a directory this time. What you are saying makes sense, I suppose. As for the hanging - I am curious about that myself. Ognen
Re: error loading large files in PySpark 0.9.0
Thanks Matei, unfortunately that doesn't seem to fix it. I tried batchSize = 10, 100, as well as 1 (which should reproduce the 0.8.1 behavior?), and it stalls at the same point in each case. -- Jeremy - jeremy freeman, phd neuroscientist @thefreemanlab On Mar 23, 2014, at 9:56 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Jeremy, what happens if you pass batchSize=10 as an argument to your SparkContext? It tries to serialize that many objects together at a time, which might be too much. By default the batchSize is 1024. Matei On Mar 23, 2014, at 10:11 AM, Jeremy Freeman freeman.jer...@gmail.com wrote: Hi all, Hitting a mysterious error loading large text files, specific to PySpark 0.9.0. In PySpark 0.8.1, this works: data = sc.textFile("path/to/myfile") data.count() But in 0.9.0, it stalls. There are indications of completion up to: 14/03/17 16:54:24 INFO TaskSetManager: Finished TID 4 in 1699 ms on X.X.X.X (progress: 15/537) 14/03/17 16:54:24 INFO DAGScheduler: Completed ResultTask(5, 4) And then this repeats indefinitely 14/03/17 16:54:24 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144 14/03/17 16:54:25 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_5, runningTasks: 144 Always stalls at the same place. There's nothing in stderr on the workers, but in stdout there are several of these messages: INFO PythonRDD: stdin writer to Python finished early So perhaps the real error is being suppressed as in https://spark-project.atlassian.net/browse/SPARK-1025 Data is just rows of space-separated numbers, ~20GB, with 300k rows and 50k characters per row. Running on a private cluster with 10 nodes, 100GB / 16 cores each, Python v 2.7.6. I doubt the data is corrupted as it works fine in Scala in 0.8.1 and 0.9.0, and in PySpark in 0.8.1. Happy to post the file, but it should repro for anything with these dimensions. It *might* be specific to long strings: I don't see it with fewer characters (10k) per row, but I also don't see it with many fewer rows but the same number of characters per row. Happy to try and provide more info / help debug! -- Jeremy
Re: Sliding Window operations do not work as documented
Hello Sanjay, Yes, your understanding of lazy semantics is correct. But ideally every batch should be read based on the batch interval provided in the StreamingContext. Can you open a JIRA on this? On Mon, Mar 24, 2014 at 7:45 AM, Sanjay Awatramani sanjay_a...@yahoo.com wrote: Hi All, I found out why this problem exists. Consider the following scenario: - a DStream is created from any source. (I've checked with file and socket) - No actions are applied to this DStream - Sliding Window operation is applied to this DStream and an action is applied to the sliding window. In this case, Spark will not even read the input stream in a batch in which the sliding interval isn't a multiple of the batch interval. Put another way, it won't read the input when it doesn't have to apply the window function. This is happening because all transformations in Spark are lazy. How to fix this or work around it (see line 3, the print() call):
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
inputStream.print(); // This is the workaround
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
objWindow.dstream().saveAsTextFiles("/Output", "");
The Window operations example in the streaming guide implies that Spark will read the stream in every batch, which is not happening because of the lazy transformations. Wherever a sliding window would be used, in most of the cases, no actions will be taken on the pre-window batch, hence my gut feeling was that Streaming would read every batch if any actions are being taken in the windowed stream. Regards, Sanjay On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote: Hi, I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the sliding window pictorial example under the Window Operations section on http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html . The RDDs returned by the window transformation function are incorrect in my case. To investigate this further, I ran a series of examples with varying values of window length and slide interval. Summary of the test results: (window length, slide interval) - result (3,1) - success (4,2) - success (3,2) - fail (4,3) - fail (5,4) - fail (5,2) - fail The only condition mentioned in the doc is that the two values (5 and 4) should be multiples of the batch interval (1 in my case), and obviously, I get a runtime error if I attempt to violate this condition. Looking at my results, it seems that failures result when the window length isn't a multiple of the slide interval.
My code:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
objWindow.dstream().saveAsTextFiles("/Output", "");
Detailed results:
(3,1) - success: @t_0: [inputStream's RDD@t_0] @t_1: [inputStream's RDD@t_0,1] @t_2: [inputStream's RDD@t_0,1,2] @t_3: [inputStream's RDD@t_1,2,3] @t_4: [inputStream's RDD@t_2,3,4] @t_5: [inputStream's RDD@t_3,4,5]
(4,2) - success: @t_0: nothing @t_1: [inputStream's RDD@t_0,1] @t_2: nothing @t_3: [inputStream's RDD@t_0,1,2,3] @t_4: nothing @t_5: [inputStream's RDD@t_2,3,4,5]
(3,2) - fail: @t_0: nothing @t_1: [inputStream's RDD@t_0,1] @t_2: nothing @t_3: [inputStream's RDD@t_2,3] (expected RDD@t_1,2,3) @t_4: nothing @t_5: [inputStream's RDD@t_4,5] (expected RDD@t_3,4,5)
(4,3) - fail: @t_0: nothing @t_1: nothing @t_2: [inputStream's RDD@t_0,1,2] @t_3: nothing @t_4: nothing @t_5: [inputStream's RDD@t_3,4,5] (expected RDD@t_2,3,4,5)
(5,4) - fail: @t_0: nothing @t_1: nothing @t_2: nothing @t_3: [inputStream's RDD@t_0,1,2,3] @t_4: nothing @t_5: nothing @t_6: nothing @t_7: [inputStream's RDD@t_4,5,6,7] (expected RDD@t_3,4,5,6,7)
(5,2) - fail: @t_0: nothing @t_1: [inputStream's RDD@t_0,1] @t_2: nothing @t_3: [inputStream's RDD@t_0,1,2,3] @t_4: nothing @t_5: [inputStream's RDD@t_2,3,4,5] (expected RDD@t_1,2,3,4,5) @t_6: nothing @t_7: [inputStream's RDD@t_4,5,6,7] (expected RDD@t_3,4,5,6,7)
I have run all the above examples twice to be sure! I believe either my understanding of the sliding window mechanism is incorrect or there is a problem in the sliding window mechanism. Regards, Sanjay
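A Scala rendering of Sanjay's workaround (his snippet is Java); the window and slide lengths here are hypothetical, and the key line is the print() on the raw stream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("window-workaround")
val ssc = new StreamingContext(conf, Seconds(60))     // 1-minute batches
val input = ssc.textFileStream("/Input")

// Workaround: register an output on the un-windowed stream so every
// batch is materialized, not only the batches a window output needs.
input.print()

val (windowLen, slideInt) = (3, 2)                    // minutes, hypothetical
val windowed = input.window(Seconds(windowLen * 60), Seconds(slideInt * 60))
windowed.saveAsTextFiles("/Output", "")

ssc.start()
ssc.awaitTermination()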
Re: [bug?] streaming window unexpected behaviour
Yes, I believe that is current behavior. Essentially, the first few RDDs will be partial windows (assuming window duration > sliding interval). TD
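An annotated sketch of that behavior, using Adrian's intervals; here `stream` stands for any DStream on a 5-second batch interval:

val bigWindow = stream.window(Seconds(10), Seconds(5))
// What bigWindow emits:
//   t =  5s -> RDD covering [ 0s,  5s)   partial window (only one batch exists yet)
//   t = 10s -> RDD covering [ 0s, 10s)   first full window
//   t = 15s -> RDD covering [ 5s, 15s)   full window, and so on
// Ignoring output until one full window duration has elapsed is one way
// to avoid acting on the partial windows.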
Re: question about partitions
Syed, Thanks for the tip. I'm not sure if coalesce is doing what I'm intending to do, which is, in effect, to subdivide the RDD into N parts (by calling coalesce and doing operations on the partitions). It sounds like, however, this won't bottleneck my processing power. If this sets off any alarms for anyone, feel free to chime in.
Re: Writing RDDs to HDFS
Just so I can close this thread (in case anyone else runs into this stuff) - I did sleep through the basics of Spark ;). The answer on why my job is in waiting state (hanging) is here: http://spark.incubator.apache.org/docs/latest/spark-standalone.html#resource-scheduling Ognen -- A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable -- Leslie Lamport
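For anyone landing here later: the linked resource-scheduling section describes how, in standalone mode, an application grabs all available cores by default, so a second application submitted while the first holds them waits indefinitely. A sketch of the cap that matches that description (the value 8 is arbitrary):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.cores.max", "8")   // cap this app so other apps can get cores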
Re: combining operations elegantly
Hi guys, thanks for the information, I'll give it a try with Algebird. Thanks again, Richard @Patrick, thanks for the release calendar On Mon, Mar 24, 2014 at 12:16 AM, Patrick Wendell pwend...@gmail.com wrote: Hey All, I think the old thread is here: https://groups.google.com/forum/#!msg/spark-users/gVtOp1xaPdU/Uyy9cQz9H_8J The method proposed in that thread is to create a utility class for doing single-pass aggregations. Using Algebird is a pretty good way to do this and is a bit more flexible since you don't need to create a new utility each time you want to do this. In Spark 1.0 and later you will be able to do this more elegantly with the schema support: myRDD.groupBy('user).select(Sum('clicks) as 'clicks, Average('duration) as 'duration) and it will use a single pass automatically... but that's not quite released yet :) - Patrick On Sun, Mar 23, 2014 at 1:31 PM, Koert Kuipers ko...@tresata.com wrote: I currently typically do something like this:
scala> val rdd = sc.parallelize(1 to 10)
scala> import com.twitter.algebird.Operators._
scala> import com.twitter.algebird.{Max, Min}
scala> rdd.map{ x => (
     |   1L,
     |   Min(x),
     |   Max(x),
     |   x
     | )}.reduce(_ + _)
res0: (Long, com.twitter.algebird.Min[Int], com.twitter.algebird.Max[Int], Int) = (10,Min(1),Max(10),55)
however for this you need the twitter algebird dependency. without that you have to code the reduce function on the tuples yourself... another example with 2 columns, where i do a conditional count for the first column, and a simple sum for the second:
scala> sc.parallelize((1 to 10).zip(11 to 20)).map{ case (x, y) => (
     |   if (x > 5) 1 else 0,
     |   y
     | )}.reduce(_ + _)
res3: (Int, Int) = (5,155)
On Sun, Mar 23, 2014 at 2:26 PM, Richard Siebeling rsiebel...@gmail.com wrote: Hi Koert, Patrick, do you already have an elegant solution to combine multiple operations on a single RDD? Say for example that I want to do a sum over one column, a count and an average over another column, thanks in advance, Richard On Mon, Mar 17, 2014 at 8:20 AM, Richard Siebeling rsiebel...@gmail.com wrote: Patrick, Koert, I'm also very interested in these examples, could you please post them if you find them? thanks in advance, Richard On Thu, Mar 13, 2014 at 9:39 PM, Koert Kuipers ko...@tresata.com wrote: not that long ago there was a nice example on here about how to combine multiple operations on a single RDD. so basically if you want to do a count() and something else, how to roll them into a single job. i think patrick wendell gave the examples. i cant find them anymore patrick can you please repost? thanks!
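Koert's pattern without the Algebird dependency, as a sketch: one pass with rdd.aggregate, carrying (count, min, max, sum) in a single accumulator:

val rdd = sc.parallelize(1 to 10)

val zero = (0L, Int.MaxValue, Int.MinValue, 0L)
val (count, min, max, sum) = rdd.aggregate(zero)(
  // fold one element into the accumulator
  (acc, x) => (acc._1 + 1, math.min(acc._2, x), math.max(acc._3, x), acc._4 + x),
  // merge two partial accumulators from different partitions
  (a, b) => (a._1 + b._1, math.min(a._2, b._2), math.max(a._3, b._3), a._4 + b._4)
)
// count = 10, min = 1, max = 10, sum = 55 -- matching the Algebird result above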
Splitting RDD and Grouping together to perform computation
Hi, I have a large data set of numbers (i.e. an RDD) and wanted to perform a computation only on groups of two values at a time. For example, 1,2,3,4,5,6,7... is an RDD. Can I group the RDD into (1,2),(3,4),(5,6)... and perform the respective computations in an efficient manner? As we don't have a way to index elements directly using for loops etc. ((i, i+1)...), is there a way to resolve this problem? Please suggest something; I would be really thankful to you.
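One way to do it, as a sketch: number the elements, then bucket consecutive pairs by index / 2. Note that RDD.zipWithIndex arrived after Spark 0.9, so this assumes a newer version, and it assumes an even number of elements:

val rdd = sc.parallelize(1 to 8)

val pairs = rdd.zipWithIndex()
  .map { case (v, i) => (i / 2, (i, v)) }   // bucket id -> (position, value)
  .groupByKey()
  .map { case (_, members) =>
    val ordered = members.toList.sortBy(_._1).map(_._2)
    (ordered(0), ordered(1))                // the two consecutive values
  }
// pairs contains (1,2), (3,4), (5,6), (7,8); each pair can now be
// processed independently with a normal map.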
Re: quick start guide: building a standalone scala program
Diana, I think you are correct - I just installed spark-0.9.0-incubating-bin-cdh4 (wget http://mirror.symnds.com/software/Apache/incubator/spark/spark-0.9.0-incubating/spark-0.9.0-incubating-bin-cdh4.tgz) and indeed I see the same error that you see. It looks like in previous versions sbt-launch used to just come down in the package, but now they try to get it for you -- and that code seems to have some assumptions on where it is being invoked from.
In particular: sbt compile, sbt run arguments if any. This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt. I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana

On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory):

mkdir myproject
cd myproject
mkdir project
mkdir target
mkdir -p src/main/scala
cp $mypath/$mymysource.scala src/main/scala/
cp $mypath/myproject.sbt .

Make sure that myproject.sbt has the following in it:

name := "I NEED A NAME!"
version := "I NEED A VERSION!"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"

If you will be using Hadoop/HDFS functionality you will need the line below as well:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately.
Re: quick start guide: building a standalone scala program
I found that I never read the document carefully, and I never noticed that the Spark documentation suggests using the Spark-distributed sbt…… Best, -- Nan Zhu

On Monday, March 24, 2014 at 5:47 PM, Diana Carroll wrote: Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working as written on the Spark website. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana

On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine). I ran the command exactly as Ognen suggested and see "Set current project to Simple Project" (do you see this -- you should at least be seeing this) and then a bunch of "Resolving ..." messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue... Hopefully some of all this helps.

On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work:

$SPARK_HOME/sbt/sbt package
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/

On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with Spark. For example, try:

~/path_to_spark/sbt/sbt compile
~/path_to_spark/sbt/sbt run arguments

Or you can just add that to your PATH by:

export PATH=$PATH:~/path_to_spark/sbt

To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen

On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ognen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile, sbt run arguments if any. This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt.
I don't have any other executable on my system called sbt. Did you download and install sbt separately? In following the Quick Start guide, that was not stated as a requirement, and I'm trying to run through the guide word for word. Diana

On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Diana, Anywhere on the filesystem you have read/write access (you need not be in your spark home directory):

mkdir myproject
cd myproject
mkdir project
mkdir target
mkdir -p src/main/scala
cp $mypath/$mymysource.scala src/main/scala/
cp $mypath/myproject.sbt .

Make sure that myproject.sbt has the following in it:

name := "I NEED A NAME!"
version := "I NEED A VERSION!"
scalaVersion := "2.10.3"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"

If you will be using Hadoop/HDFS functionality you will need the line below as well:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are using 0.8.1 - adjust appropriately.
Re: Splitting RDD and Grouping together to perform computation
We need someone who can explain, with a short code snippet on the given example, so that we get a clear-cut idea of RDD indexing. Please help us. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153p3158.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
coalescing RDD into equally sized partitions
Hi,

sc.parallelize(Array.tabulate(100)(i => i)).filter(_ % 20 == 0).coalesce(5, true).glom.collect

yields

Array[Array[Int]] = Array(Array(0, 20, 40, 60, 80), Array(), Array(), Array(), Array())

How do I get something more like:

Array(Array(0), Array(20), Array(40), Array(60), Array(80))

Thanks
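One way to get the even spread asked for above is to key each element by its position and repartition with a custom Partitioner. A minimal sketch, not from the thread -- it assumes a Spark release that has RDD.zipWithIndex (newer than the 0.9 discussed here) and that round-robin placement is acceptable:

import org.apache.spark.Partitioner

// Round-robin partitioner: the element with index i goes to partition i % parts,
// so the 5 surviving elements land in 5 distinct partitions.
class RoundRobinPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = (key.asInstanceOf[Long] % parts).toInt
}

val data = sc.parallelize(Array.tabulate(100)(i => i)).filter(_ % 20 == 0)
val spread = data.zipWithIndex()              // pair each value with its index
  .map { case (v, i) => (i, v) }              // key by index
  .partitionBy(new RoundRobinPartitioner(5))  // spread keys round-robin
  .values
spread.glom().collect()  // Array(Array(0), Array(20), Array(40), Array(60), Array(80))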
Re: N-Fold validation and RDD partitions
There is also https://github.com/apache/spark/pull/18 against the current repo, which may be easier to apply. On Fri, Mar 21, 2014 at 8:58 AM, Hai-Anh Trinh a...@adatao.com wrote: Hi Jaonary, You can find the code for k-fold CV in https://github.com/apache/incubator-spark/pull/448. I have not found the time to resubmit the pull request against the latest master. On Fri, Mar 21, 2014 at 8:46 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote: Hi Jaonary, I believe the n folds should be mapped into n keys in Spark using a map function. You can reduce the returned PairRDD and you should get your metric. I don't understand partitions fully, but from whatever I understand of them, they aren't required in your scenario. Regards, Sanjay On Friday, 21 March 2014 7:03 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi, I need to partition my data, represented as an RDD, into n folds, run the metrics computation on each fold, and finally compute the mean of my metrics over all the folds. Can Spark do the data partitioning out of the box, or do I need to implement it myself? I know that RDD has a partitions method and mapPartitions, but I really don't understand the purpose and meaning of partitions here. Cheers, Jaonary -- Hai-Anh Trinh | Senior Software Engineer | http://adatao.com/ http://www.linkedin.com/in/haianh -- Cell : 425-233-8271
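A minimal sketch of Sanjay's fold-per-key idea, assuming data is your RDD of points, that RDD.zipWithIndex is available (it arrived after the 0.9-era releases this thread uses), and a hypothetical evaluate(train, test) function returning a Double metric -- none of these names come from the thread itself:

val k = 10  // number of folds
// Tag each point with a fold id in 0 until k.
val withFold = data.zipWithIndex().map { case (p, i) => ((i % k).toInt, p) }
val scores = (0 until k).map { f =>
  val test  = withFold.filter(_._1 == f).values  // fold f is held out
  val train = withFold.filter(_._1 != f).values  // the other k-1 folds
  evaluate(train, test)                          // hypothetical metric function
}
val meanScore = scores.sum / k  // mean of the per-fold metrics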
Re: quick start guide: building a standalone scala program
It is suggested implicitly in giving you the command ./sbt/sbt. The separately installed sbt isn't in a folder called sbt, whereas Spark's version is. And more relevantly, just a few paragraphs earlier in the tutorial you execute the command sbt/sbt assembly, which definitely refers to the Spark install.

On Monday, March 24, 2014, Nan Zhu zhunanmcg...@gmail.com wrote: I found that I never read the document carefully, and I never noticed that the Spark documentation suggests using the Spark-distributed sbt.. Best, -- Nan Zhu

On Monday, March 24, 2014 at 5:47 PM, Diana Carroll wrote: Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working *as written on the Spark website*. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana

On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine). I ran the command exactly as Ognen suggested and see "Set current project to Simple Project" (do you see this -- you should at least be seeing this) and then a bunch of "Resolving ..." messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue... Hopefully some of all this helps.

On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work:

$SPARK_HOME/sbt/sbt package
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/

On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with Spark. For example, try:

~/path_to_spark/sbt/sbt compile
~/path_to_spark/sbt/sbt run arguments

Or you can just add that to your PATH by:

export PATH=$PATH:~/path_to_spark/sbt

To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen

On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ognen. Unfortunately I'm not able to follow your instructions either.
In particular: sbt compile, sbt run arguments if any. This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt.
Re: Splitting RDD and Grouping together to perform computation
Partition your input so that each partition holds an even number of elements, then use mapPartitions to operate on each Iterator[Int]. Maybe there are more efficient ways…. Best, -- Nan Zhu

On Monday, March 24, 2014 at 7:59 PM, yh18190 wrote: Hi, I have a large data set of numbers as an RDD and want to perform a computation on a group of two values at a time. For example, 1,2,3,4,5,6,7... is an RDD. Can I group the RDD into (1,2),(3,4),(5,6)... and perform the respective computations in an efficient manner? Since we don't have a way to index elements directly, e.g. with a for loop over (i, i+1), is there a way to resolve this problem? Please suggest something.. I would be really thankful to you View this message in context: Splitting RDD and Grouping together to perform computation (http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153.html) Sent from the Apache Spark User List mailing list archive (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com (http://Nabble.com).
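A minimal sketch of that suggestion, assuming the per-pair computation is a sum and that every partition holds an even number of elements:

val rdd = sc.parallelize(1 to 8, 2)  // two partitions: (1,2,3,4) and (5,6,7,8)
// grouped(2) walks each partition's iterator two elements at a time.
val pairSums = rdd.mapPartitions(_.grouped(2).map(_.sum))
pairSums.collect()  // Array(3, 7, 11, 15)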
Re: quick start guide: building a standalone scala program
Yes, actually even for Spark I mostly use the sbt I installed myself….. so I always missed this issue…. If you can reproduce the problem with a Spark-distributed sbt… I suggest proposing a PR to fix the document before 0.9.1 is officially released. Best, -- Nan Zhu

On Monday, March 24, 2014 at 8:34 PM, Diana Carroll wrote: It is suggested implicitly in giving you the command ./sbt/sbt. The separately installed sbt isn't in a folder called sbt, whereas Spark's version is. And more relevantly, just a few paragraphs earlier in the tutorial you execute the command sbt/sbt assembly, which definitely refers to the Spark install.

On Monday, March 24, 2014, Nan Zhu zhunanmcg...@gmail.com wrote: I found that I never read the document carefully, and I never noticed that the Spark documentation suggests using the Spark-distributed sbt…… Best, -- Nan Zhu

On Monday, March 24, 2014 at 5:47 PM, Diana Carroll wrote: Thanks for your help, everyone. Several folks have explained that I can surely solve the problem by installing sbt. But I'm trying to get the instructions working as written on the Spark website. The instructions not only don't have you install sbt separately...they actually specifically have you use the sbt that is distributed with Spark. If it is not possible to build your own Spark programs with Spark-distributed sbt, then that's a big hole in the Spark docs that I shall file. And if the sbt that is included with Spark is MEANT to be able to compile your own Spark apps, then that's a product bug. But before I file the bug, I'm still hoping I'm missing something, and someone will point out that I'm missing a small step that will make the Spark distribution of sbt work! Diana

On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8 (since like other folks I had sbt preinstalled on my usual machine). I ran the command exactly as Ognen suggested and see "Set current project to Simple Project" (do you see this -- you should at least be seeing this) and then a bunch of "Resolving ..." messages. I did get an error there, saying it can't find javax.servlet.orbit. I googled the error and found this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E adding the IvyXML fragment they suggested helped in my case (but again, the build pretty clearly complained). If you're still having no luck, I suggest installing sbt and setting SBT_HOME... http://www.scala-sbt.org/ In either case though, it's not a Spark-specific issue... Hopefully some of all this helps.

On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com wrote: Yeah, that's exactly what I did. Unfortunately it doesn't work:

$SPARK_HOME/sbt/sbt package
awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for reading (No such file or directory)
Attempting to fetch sbt
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
/usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or directory
Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please install sbt manually from http://www.scala-sbt.org/

On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: You can use any sbt on your machine, including the one that comes with Spark.
For example, try:

~/path_to_spark/sbt/sbt compile
~/path_to_spark/sbt/sbt run arguments

Or you can just add that to your PATH by:

export PATH=$PATH:~/path_to_spark/sbt

To make it permanent, you can add it to your ~/.bashrc or ~/.bash_profile or ??? depending on the system you are using. If you are on Windows, sorry, I can't offer any help there ;) Ognen

On 3/24/14, 3:16 PM, Diana Carroll wrote: Thanks Ognen. Unfortunately I'm not able to follow your instructions either. In particular: sbt compile, sbt run arguments if any. This doesn't work for me because there's no program on my path called sbt. The instructions in the Quick Start guide are specific that I should call $SPARK_HOME/sbt/sbt.
Re: Writing RDDs to HDFS
Ognen, can you comment on whether you were actually able to run two jobs concurrently with just restricting spark.cores.max? I run Shark on the same cluster and was not able to see a standalone job get in (since Shark is a long-running job) until I restricted both spark.cores.max _and_ spark.executor.memory. Just curious if I did something wrong.

On Mon, Mar 24, 2014 at 7:48 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: Just so I can close this thread (in case anyone else runs into this stuff) - I did sleep through the basics of Spark ;). The answer to why my job is in the waiting state (hanging) is here: http://spark.incubator.apache.org/docs/latest/spark-standalone.html#resource-scheduling Ognen

On 3/24/14, 5:01 PM, Diana Carroll wrote: Ognen: I don't know why your process is hanging, sorry. But I do know that the way saveAsTextFile works is that you give it a path to a directory, not a file. The file is saved in multiple parts, corresponding to the partitions (part-00000, part-00001, etc.). (Presumably it does this because it allows each partition to be saved on the local disk, to minimize network traffic. It's how Hadoop works, too.)

On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote: Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt") supposed to work? Meaning, can I save files to the HDFS fs this way? I tried:

val r = sc.parallelize(List(1,2,3,4,5,6,7,8))
r.saveAsTextFile("hdfs://ip:port/path/file.txt")

and it is just hanging. At the same time on my HDFS it created file.txt, but as a directory which has subdirectories (the final one is empty). Thanks! Ognen -- A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable -- Leslie Lamport
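For the concurrency question above, the standalone-mode resource-scheduling doc linked by Ognen boils down to capping each application so one job cannot hold every core. A minimal sketch, assuming the 0.9-era SparkConf API, with a placeholder master URL and placeholder sizes:

import org.apache.spark.{SparkConf, SparkContext}

// Cap this application's share of the cluster so a second app can be scheduled.
val conf = new SparkConf()
  .setMaster("spark://master:7077")     // placeholder master URL
  .setAppName("JobA")
  .set("spark.cores.max", "16")         // leave cores free for other applications
  .set("spark.executor.memory", "32g")  // leave worker memory free as well
val sc = new SparkContext(conf)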
Re: Splitting RDD and Grouping together to perform computation
I didn't group the integers, but processed them in groups of two. Partition the data:

scala> val a = sc.parallelize(List(1, 2, 3, 4), 2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

Then process each partition, taking its elements in groups of 2:

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> a.mapPartitions(p => { val l = p.toList;
     |   val ret = new ListBuffer[Int]
     |   for (i <- 0 until l.length by 2) {
     |     ret += l(i) + l(i + 1)
     |   }
     |   ret.toList.iterator
     | }
     | )
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at mapPartitions at <console>:16

scala> res7.collect
res10: Array[Int] = Array(3, 7)

Best, -- Nan Zhu

On Monday, March 24, 2014 at 8:40 PM, Nan Zhu wrote: Partition your input so that each partition holds an even number of elements, then use mapPartitions to operate on each Iterator[Int]. Maybe there are more efficient ways…. Best, -- Nan Zhu

On Monday, March 24, 2014 at 7:59 PM, yh18190 wrote: Hi, I have a large data set of numbers as an RDD and want to perform a computation on a group of two values at a time. For example, 1,2,3,4,5,6,7... is an RDD. Can I group the RDD into (1,2),(3,4),(5,6)... and perform the respective computations in an efficient manner? Since we don't have a way to index elements directly, e.g. with a for loop over (i, i+1), is there a way to resolve this problem? Please suggest something.. I would be really thankful to you View this message in context: Splitting RDD and Grouping together to perform computation (http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-and-Grouping-together-to-perform-computation-tp3153.html) Sent from the Apache Spark User List mailing list archive (http://apache-spark-user-list.1001560.n3.nabble.com/) at Nabble.com (http://Nabble.com).
Re: RDD usage
points.foreach(p => p.y = another_value) will return a new modified RDD. 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw: Dear all, I have a question about the usage of RDD. I implemented a class called AppDataPoint; it looks like:

case class AppDataPoint(input_y: Double, input_x: Array[Double]) extends Serializable {
  var y: Double = input_y
  var x: Array[Double] = input_x
  ..
}

Furthermore, I created the RDD with the following function.

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

The question is that I tried to modify the values of this RDD; the operation is:

points.foreach(p => p.y = another_value)

The operation works. There isn't any warning or error message shown by the system, and the results are right. I wonder whether this kind of modification of an RDD is a correct and in fact workable design. The usage web page said that the RDD is immutable; is there any suggestion? Thanks a lot. Chieh-Yen Lin
Re: RDD usage
Hi hequn, a related question: does that mean the memory usage will be doubled? And furthermore, if the compute function of an RDD is not idempotent, will the RDD change while the job is running? Is that right?

----- Original Message ----- From: hequn cheng chenghe...@gmail.com Sent: 2014/3/25 9:35 To: user@spark.apache.org Subject: Re: RDD usage

points.foreach(p => p.y = another_value) will return a new modified RDD. 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw: Dear all, I have a question about the usage of RDD. I implemented a class called AppDataPoint; it looks like:

case class AppDataPoint(input_y: Double, input_x: Array[Double]) extends Serializable {
  var y: Double = input_y
  var x: Array[Double] = input_x
  ..
}

Furthermore, I created the RDD with the following function.

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

The question is that I tried to modify the values of this RDD; the operation is:

points.foreach(p => p.y = another_value)

The operation works. There isn't any warning or error message shown by the system, and the results are right. I wonder whether this kind of modification of an RDD is a correct and in fact workable design. The usage web page said that the RDD is immutable; is there any suggestion? Thanks a lot. Chieh-Yen Lin
Re: RDD usage
No, it won't. The return type of RDD#foreach is Unit, so it doesn't return an RDD. The utility of foreach is purely the side effects it generates, not its return value -- and modifying an RDD in place via foreach is generally not a very good idea.

On Mon, Mar 24, 2014 at 6:35 PM, hequn cheng chenghe...@gmail.com wrote: points.foreach(p => p.y = another_value) will return a new modified RDD. 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw: Dear all, I have a question about the usage of RDD. I implemented a class called AppDataPoint; it looks like:

case class AppDataPoint(input_y: Double, input_x: Array[Double]) extends Serializable {
  var y: Double = input_y
  var x: Array[Double] = input_x
  ..
}

Furthermore, I created the RDD with the following function.

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

The question is that I tried to modify the values of this RDD; the operation is:

points.foreach(p => p.y = another_value)

The operation works. There isn't any warning or error message shown by the system, and the results are right. I wonder whether this kind of modification of an RDD is a correct and in fact workable design. The usage web page said that the RDD is immutable; is there any suggestion? Thanks a lot. Chieh-Yen Lin
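The idiomatic alternative to mutating in place is to derive a new RDD with map. A minimal sketch reusing the thread's AppDataPoint class, with another_value standing in for whatever update is needed:

// Build a new RDD rather than mutating the cached elements of points.
val updatedPoints = points.map(p => AppDataPoint(another_value, p.x))
// points is untouched; updatedPoints carries the new y values.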
Re: Kmeans example reduceByKey slow
Sorry, I meant the master branch of https://github.com/apache/spark. -Xiangrui

On Mon, Mar 24, 2014 at 6:27 PM, Tsai Li Ming mailingl...@ltsai.com wrote: Thanks again. "If you use the KMeans implementation from MLlib, the initialization stage is done on master" -- the master here is the app/driver/spark-shell? Thanks!

On 25 Mar, 2014, at 1:03 am, Xiangrui Meng men...@gmail.com wrote: The number of rows doesn't matter much as long as you have enough workers to distribute the work. K-means has complexity O(n * d * k), where n is the number of points, d is the dimension, and k is the number of clusters. If you use the KMeans implementation from MLlib, the initialization stage is done on master, so a large k would slow down the initialization stage. If your data is sparse, the latest change to KMeans will help with the speed, depending on how sparse your data is. -Xiangrui

On Mon, Mar 24, 2014 at 12:44 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Thanks, let me try with a smaller K. Does the size of the input data matter for the example? Currently I have 50M rows. What is a reasonable size to demonstrate the capability of Spark?

On 24 Mar, 2014, at 3:38 pm, Xiangrui Meng men...@gmail.com wrote: K = 50 is certainly a large number for k-means. If there is no particular reason to have 50 clusters, could you try to reduce it to, e.g., 100 or 1000? Also, the example code is not for large-scale problems. You should use the KMeans algorithm in mllib clustering for your problem. -Xiangrui

On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, This is on a 4-node cluster, each node with 32 cores/256GB RAM. Spark (0.9.0) is deployed in standalone mode. Each worker is configured with 192GB. Spark executor memory is also 192GB. This is on the first iteration. K=50. Here's the code I use: http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example. Thanks!

On 24 Mar, 2014, at 2:46 pm, Xiangrui Meng men...@gmail.com wrote: Hi Tsai, Could you share more information about the machine you used and the training parameters (runs, k, and iterations)? It can help solve your issues. Thanks! Best, Xiangrui

On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, At the reduceByKey stage, it takes a few minutes before the tasks start working. I have -Dspark.default.parallelism=127 cores (n-1). CPU/Network/IO is idling across all nodes when this is happening. And there is nothing particular on the master log file. From the spark-shell:

14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 on executor 2: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 38765155 bytes in 193 ms
14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 on executor 1: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 38765155 bytes in 96 ms
14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 on executor 0: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 38765155 bytes in 100 ms

But it stops there for some significant time before any movement. In the stage detail of the UI, I can see that there are 127 tasks running, but the duration of each is at least a few minutes. I'm working off local storage (not hdfs) and the kmeans data is about 6.5GB (50M rows). Is this normal behaviour? Thanks!
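Since the thread recommends the MLlib implementation over the example code, here is a minimal sketch of calling it, assuming the 0.9-era API (which takes RDD[Array[Double]]) and a placeholder input path:

import org.apache.spark.mllib.clustering.KMeans

// Parse whitespace-separated numeric rows into feature arrays.
val data = sc.textFile("hdfs:///path/to/kmeans_data.txt")  // placeholder path
  .map(_.split(' ').map(_.toDouble))
  .cache()
// k clusters, a cap on iterations, and a single run.
val model = KMeans.train(data, 50, 10, 1)
val cost = model.computeCost(data)  // sum of squared distances to nearest center

Per the complexity note above, each iteration costs O(n * d * k), so the runtime grows linearly in any one of the three factors.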
Re: Re: RDD usage
First question: If you save your modified RDD like this:

points.map(p => { p.y = another_value; p }).collect()

or

points.map(p => { p.y = another_value; p }).saveAsTextFile(...)

the modified RDD will be materialized, and this will not use any worker memory. If you have more transformations after the map(), Spark will pipeline all the transformations and build a DAG. Very little memory will be used in this stage, and that memory will be freed soon. Only cache() will persist your RDD in memory for a long time. Second question: Once an RDD is created, it cannot be changed, due to its immutability. You can only create a new RDD from an existing RDD or from the file system.

2014-03-25 9:45 GMT+08:00 林武康 vboylin1...@gmail.com: Hi hequn, a related question: does that mean the memory usage will be doubled? And furthermore, if the compute function of an RDD is not idempotent, will the RDD change while the job is running? Is that right? From: hequn cheng chenghe...@gmail.com Sent: 2014/3/25 9:35 To: user@spark.apache.org Subject: Re: RDD usage

points.foreach(p => p.y = another_value) will return a new modified RDD. 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw: Dear all, I have a question about the usage of RDD. I implemented a class called AppDataPoint; it looks like:

case class AppDataPoint(input_y: Double, input_x: Array[Double]) extends Serializable {
  var y: Double = input_y
  var x: Array[Double] = input_x
  ..
}

Furthermore, I created the RDD with the following function.

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

The question is that I tried to modify the values of this RDD; the operation is:

points.foreach(p => p.y = another_value)

The operation works. There isn't any warning or error message shown by the system, and the results are right. I wonder whether this kind of modification of an RDD is a correct and in fact workable design. The usage web page said that the RDD is immutable; is there any suggestion? Thanks a lot. Chieh-Yen Lin
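A short sketch of the cache() point above, using lines and parsePoint as defined earlier in the thread -- only an explicit cache()/persist() keeps the mapped RDD materialized across actions:

val points = lines.map(parsePoint _).cache()
points.count()  // first action computes the partitions and caches them
points.count()  // second action is served from the cache; no re-parsing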
Re: Re: RDD usage
Hi hequn, I dug into the Spark source a bit deeper and got some ideas. Firstly, immutability is a feature of RDDs, but not a hard rule; there are ways to change an RDD, for example a non-idempotent compute function, though it is really bad design to make that function non-idempotent, given the uncontrollable side effects. I agree with Mark that foreach can modify the elements of an RDD, but we should avoid this because it will affect all the RDDs generated from the changed RDD and make the whole process inconsistent and unstable. These are some rough opinions on the immutability of RDDs; a full discussion can make it clearer. Any ideas?

----- Original Message ----- From: hequn cheng chenghe...@gmail.com Sent: 2014/3/25 10:40 To: user@spark.apache.org Subject: Re: Re: RDD usage

First question: If you save your modified RDD like this:

points.map(p => { p.y = another_value; p }).collect()

or

points.map(p => { p.y = another_value; p }).saveAsTextFile(...)

the modified RDD will be materialized, and this will not use any worker memory. If you have more transformations after the map(), Spark will pipeline all the transformations and build a DAG. Very little memory will be used in this stage, and that memory will be freed soon. Only cache() will persist your RDD in memory for a long time. Second question: Once an RDD is created, it cannot be changed, due to its immutability. You can only create a new RDD from an existing RDD or from the file system.

2014-03-25 9:45 GMT+08:00 林武康 vboylin1...@gmail.com: Hi hequn, a related question: does that mean the memory usage will be doubled? And furthermore, if the compute function of an RDD is not idempotent, will the RDD change while the job is running? Is that right? From: hequn cheng Sent: 2014/3/25 9:35 To: user@spark.apache.org Subject: Re: RDD usage

points.foreach(p => p.y = another_value) will return a new modified RDD. 2014-03-24 18:13 GMT+08:00 Chieh-Yen r01944...@csie.ntu.edu.tw: Dear all, I have a question about the usage of RDD. I implemented a class called AppDataPoint; it looks like:

case class AppDataPoint(input_y: Double, input_x: Array[Double]) extends Serializable {
  var y: Double = input_y
  var x: Array[Double] = input_x
  ..
}

Furthermore, I created the RDD with the following function.

def parsePoint(line: String): AppDataPoint = {
  /* Some related works for parsing */
  ..
}

Assume the RDD is called points:

val lines = sc.textFile(inputPath, numPartition)
var points = lines.map(parsePoint _).cache()

The question is that I tried to modify the values of this RDD; the operation is:

points.foreach(p => p.y = another_value)

The operation works. There isn't any warning or error message shown by the system, and the results are right. I wonder whether this kind of modification of an RDD is a correct and in fact workable design. The usage web page said that the RDD is immutable; is there any suggestion? Thanks a lot. Chieh-Yen Lin
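To make the instability concrete, a sketch (not from the thread) of how an in-place mutation of a cached RDD can silently vanish:

// Placeholder path; parsePoint is the thread's parsing function.
val points = sc.textFile("points.txt").map(parsePoint _).cache()
points.foreach(p => p.y = 0.0)  // mutates only the currently cached copies
// If memory pressure evicts a cached partition, Spark recomputes it from
// points.txt via parsePoint, and those elements revert to their original y.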