Re: partitions, coalesce() and parallelism
Thanks Daniel and Nicholas for the helpful responses. I'll go with coalesce(shuffle = true) and see how things go.

On Wed, Jun 25, 2014 at 8:19 AM, Daniel Siegmann daniel.siegm...@velos.io wrote:

The behavior you're seeing is by design, and it is VERY IMPORTANT to understand why it happens, because it can cause unexpected behavior in various ways. I learned that the hard way. :-)

Spark collapses multiple transforms into a single stage wherever possible (presumably for performance). The boundary between stages is a shuffle. In your example there is no shuffle, so all transforms are collapsed into a single stage. Since you coalesce at the end into two partitions, and there is only one stage, that stage contains only two tasks.

It is important to note that coalesce will not cause a shuffle by default (repartition will always cause a shuffle). However, you can force a shuffle by passing true as the second (optional) parameter, like so:

    val rdd4 = rdd3.coalesce(2, true)

Try this in the Spark shell and you should see 100 tasks for the first stage and 2 tasks for the second.

On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

Ah, here's a better hypothesis. Everything you are doing minus the save() is a transformation, not an action. Since nothing is actually triggered until the save(), Spark may see that the lineage of operations ends with 2 partitions anyway and simplify accordingly.

Two suggestions you can try:

1. Remove the coalesce(2) and concatenate the output files in a post-processing step to get the number of files you want. This also ensures the save() operation can be fully parallelized. I think this is the preferable approach, since it does not artificially reduce the parallelism of your job at any stage.

2. Another thing you can try is the following:

    val rdd1 = sc.sequenceFile(...)
    val rdd2 = rdd1.coalesce(100)
    val rdd3 = rdd2.map(...).cache() // cache this RDD
    val some_count = rdd3.count()    // force the map() to run and materialize the result
    val rdd4 = rdd3.coalesce(2)
    val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
    rdd3.unpersist()

This should let the map() run as 100 tasks in parallel while giving you only 2 output files. You get this at the cost of materializing rdd3 in memory by running the count().

Nick

On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert alex.boisv...@gmail.com wrote:

For the skeptics :), here's a version you can easily reproduce at home:

    val rdd1 = sc.parallelize(1 to 1000, 100) // force 100 partitions
    val rdd2 = rdd1.coalesce(100)
    val rdd3 = rdd2 map { _ + 1000 }
    val rdd4 = rdd3.coalesce(2)
    rdd4.collect()

You can see that everything runs as only 2 tasks ...
:-/

2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job: collect at <console>:48
2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got job 0 (collect at <console>:48) with 2 output partitions (allowLocal=false)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:48)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List()
2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List()
2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler: Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which has no missing parents
2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at <console>:45)
2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:0 as 6632 bytes in 16 ms
2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:1 as 6632 bytes in 8 ms
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 0)
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler
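[Editorial aside, not part of the thread: a minimal Spark-shell sketch of Alex's reproduction with the shuffle-forcing coalesce that Daniel describes; it should split the job into a 100-task stage followed by a 2-task stage.]

    // Sketch only: passing shuffle = true to coalesce inserts a stage boundary,
    // so the map runs as 100 tasks and the final collect as 2.
    val rdd1 = sc.parallelize(1 to 1000, 100)
    val rdd2 = rdd1 map { _ + 1000 }
    val rdd3 = rdd2.coalesce(2, true) // shuffle = true
    rdd3.collect()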
partitions, coalesce() and parallelism
With the following pseudo-code,

    val rdd1 = sc.sequenceFile(...) // has 100 partitions
    val rdd2 = rdd1.coalesce(100)
    val rdd3 = rdd2 map { ... }
    val rdd4 = rdd3.coalesce(2)
    val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

I would expect the parallelism of the map() operation to be 100 concurrent tasks, and the parallelism of the save() operation to be 2. However, it appears the parallelism of the entire chain is 2: I only see two tasks created for the save() operation, and those tasks appear to execute the map() operation as well.

Assuming what I'm seeing is as specified (meaning, how things are meant to be), what's the recommended way to force a parallelism of 100 on the map() operation?

thanks!
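[Editorial aside, not part of the original post: the partition counts themselves behave as expected; it is the stage boundaries that determine how many tasks actually run. A minimal Spark-shell sketch of the distinction, using parallelize in place of the elided sequenceFile input:]

    // Sketch only: partition count is a property of each RDD,
    // but without a shuffle the whole chain is one stage sized by the last RDD.
    val rdd1 = sc.parallelize(1 to 1000, 100)
    val rdd3 = rdd1.map(_ + 1)
    val rdd4 = rdd3.coalesce(2)
    println(rdd3.partitions.size) // 100
    println(rdd4.partitions.size) // 2
    rdd4.collect() // runs as a single 2-task stage; the map is folded into it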
Re: what is the best way to do cartesian
You might want to try the built-in RDD.cartesian() method.

On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei wei@dewmobile.net wrote:

Hi All, I have a problem with the item-based collaborative filtering recommendation algorithm in Spark. The basic flow is as below:

RDD1:
(Item1, (User1, Score1))
(Item2, (User2, Score2))
(Item1, (User2, Score3))
(Item2, (User1, Score4))

RDD1.groupByKey ==> RDD2:
(Item1, ((User1, Score1), (User2, Score3)))
(Item2, ((User1, Score4), (User2, Score2)))

The similarity of the vectors ((User1, Score1), (User2, Score3)) and ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records and my Spark program is extremely slow. The source code is as below:

    val conf = new SparkConf()
      .setMaster("spark://211.151.121.184:7077")
      .setAppName("Score Calcu Total")
      .set("spark.executor.memory", "20g")
      .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    val sc = new SparkContext(conf)

    val mongoRDD = sc.textFile(args(0).toString, 400)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
    val newRDD = jsonRDD.map(arg => {
      var score = haha(arg.get("a").asInstanceOf[JSONObject])
      // set score to 0.5 for testing
      arg.put("score", 0.5)
      arg
    })

    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong,
       (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))
    ).groupByKey().cache()

    val resourceScores = resourceScoresRDD.collect()
    val bcResourceScores = sc.broadcast(resourceScores)

    val simRDD = resourceScoresRDD.mapPartitions({ iter =>
      val m = bcResourceScores.value
      for {
        (r1, v1) <- iter
        (r2, v2) <- m
        if r1 > r2
      } yield (r1, r2, cosSimilarity(v1, v2))
    }, true).filter(arg => arg._3 > 0.1)

    println(simRDD.count)

And I saw this in the Spark Web UI:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png

My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the machines in the cluster are under heavy load while the Spark program is running.

Is there any better way to do this algorithm? Thanks!
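[Editorial aside, not part of the thread: a hedged sketch of the RDD.cartesian() suggestion above, reusing the resourceScoresRDD and cosSimilarity names from Qin Wei's code as assumptions. Whether this actually outperforms the broadcast-based loop at 20 million records is not established here.]

    // Sketch only: all item pairs via the built-in cartesian(),
    // keeping each unordered pair exactly once.
    val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }                       // drop self-pairs and duplicates
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }
      .filter(_._3 > 0.1)
    println(simRDD.count)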
Re: Spark - ready for prime time?
I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).

On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote:

I. Is it too much magic? Lots of things just work right in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?

I think it goes back to understanding Spark's architecture, its design constraints, and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your customizations to go with (not against) the grain of the architecture.

II. Is it mature enough? E.g. we've created a pull request https://github.com/apache/spark/pull/181 which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug-free and reliable?

There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.

We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since. I would characterize our use of Spark as "a better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark didn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).

So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...

1. Memory management

The general direction of these questions is whether it's possible to take RDD-caching-related memory management more into our own hands, as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.

A. Somehow prioritize cached RDDs, e.g. mark some as essential that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.

B. Memory reflection: can you programmatically get the memory size of a cached RDD and the memory sizes available in total/per executor? If we could do this, we could indirectly avoid automatic evictions of things we might really want to keep in memory.

C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory, I started to see evictions while there was still plenty of space on the workers. This resulted in lengthy recomputations. Can this be avoided somehow?

D. Broadcasts.
Is it possible to get rid of a broadcast manually, without waiting for LRU eviction to take care of it? Can you tell the size of a broadcast programmatically?

2. Akka lost connections

We have quite often experienced lost executors due to Akka exceptions, mostly "connection lost" or similar. It seems to happen when an executor gets extremely busy with some CPU-intensive work. Our hypothesis is that the Akka network threads get starved and the executor fails to respond within the timeout limits. Is this plausible? If yes, what can we do about it?

We've seen these as well. In our case, increasing the Akka timeouts and frame size helped a lot, e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}.

In general, these are scary errors in the sense that they come from the very core of the framework, and it's hard to link them to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where the Spark magic doesn't work perfectly?

For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course, because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs. It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read Spark source to formulate hypotheses about what went wrong, and then trial
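[Editorial aside, not part of the thread: on the broadcast question, a hedged sketch assuming Spark 1.0+, where Broadcast exposes unpersist(); the lookup table and names are hypothetical placeholders, not from the original emails.]

    // Sketch only: drop a broadcast's cached copies from the executors explicitly
    // instead of waiting for LRU eviction.
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))  // hypothetical small lookup table
    val labelled = sc.parallelize(1 to 100).map(i => lookup.value.getOrElse(i, "other"))
    labelled.count()
    lookup.unpersist() // or unpersist(blocking = true) to wait until the blocks are removed

The Akka settings mentioned above are ordinary SparkConf entries, e.g. conf.set("spark.akka.frameSize", "128") (value in MB), though the right values are workload-dependent.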