Re: partitions, coalesce() and parallelism

2014-06-25 Thread Alex Boisvert
Thanks Daniel and Nicholas for the helpful responses.  I'll go with
coalesce(shuffle = true) and see how things go.


On Wed, Jun 25, 2014 at 8:19 AM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 The behavior you're seeing is by design, and it is VERY IMPORTANT to
 understand why this happens because it can cause unexpected behavior in
 various ways. I learned that the hard way. :-)

 Spark collapses multiple transforms into a single stage wherever
 possible (presumably for performance). The boundary between stages is a
 shuffle. In your example there's no shuffle, so all transforms are being
 collapsed into a single stage. Since you coalesce at the end into two
 partitions, and there is only one stage, that stage must contain two tasks.

 It is important to note that coalesce will not cause a shuffle by default
 (repartition will always cause a shuffle). However, you can force a shuffle
 by passing true as the second (optional) parameter, like so:

 val rdd4 = rdd3.coalesce(2, true)

 Try this in Spark shell and you should see 100 tasks for the first stage
 and 2 tasks for the second.
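
 As a concrete sketch you can paste into the shell (borrowing the 100-partition
 setup from Alex's repro below; the numbers are only illustrative):

     val rdd1 = sc.parallelize(1 to 1000, 100)    // 100 partitions
     val rdd2 = rdd1.map(_ + 1000)                // first stage: 100 tasks
     val rdd3 = rdd2.coalesce(2, shuffle = true)  // shuffle boundary -> second stage: 2 tasks
     rdd3.collect()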



 On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Ah, here's a better hypothesis. Everything you are doing minus the save() is
 a transformation, not an action. Since nothing is actually triggered until
 the save(), Spark may be seeing that the lineage of operations ends with
 2 partitions anyway and simplifies accordingly.

 Two suggestions you can try:

   1. Remove the coalesce(2) and concatenate the files in post-processing
      to get the number of files you want. This will also ensure the save()
      operation can be parallelized fully. I think this is the preferable
      approach since it does not artificially reduce the parallelism of your
      job at any stage.
   2. Another thing you can try is the following:

      val rdd1 = sc.sequenceFile(...)
      val rdd2 = rdd1.coalesce(100)

      val rdd3 = rdd2.map(...).cache()  // cache this RDD
      val some_count = rdd3.count()     // force the map() to run and materialize the result

      val rdd4 = rdd3.coalesce(2)
      rdd4.saveAsTextFile(...)          // want only two output files

      rdd3.unpersist()

   This should let the map() run 100 tasks in parallel while giving you
   only 2 output files. You'll get this at the cost of materializing rdd3
   in memory when you run the count().

 Nick


  On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert alex.boisv...@gmail.com
 wrote:

 For the skeptics :), here's a version you can easily reproduce at home:

 val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
 val rdd2 = rdd1.coalesce(100)
 val rdd3 = rdd2 map { _ + 1000 }
 val rdd4 = rdd3.coalesce(2)
 rdd4.collect()

 You can see that everything runs as only 2 tasks ... :-/

 2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting
 job: collect at <console>:48
 2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler:
 Got job 0 (collect at <console>:48) with 2 output partitions
 (allowLocal=false)
 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
 Final stage: Stage 0 (collect at <console>:48)
 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
 Parents of final stage: List()
 2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler:
 Missing parents: List()
 2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler:
 Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which
 has no missing parents
 2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler:
 Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at
 <console>:45)
 2014-06-25 00:43:20,901 INFO
 org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
 tasks
 2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager:
 Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal
 (PROCESS_LOCAL)
 2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager:
 Serialized task 0.0:0 as 6632 bytes in 16 ms
 2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager:
 Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal
 (PROCESS_LOCAL)
 2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager:
 Serialized task 0.0:1 as 6632 bytes in 8 ms
 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
 Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
 Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
 2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler:
 Completed ResultTask(0, 0)
 2014-06-25 00:43:21,607 INFO
 org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
 tasks have all completed, from pool
 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler

partitions, coalesce() and parallelism

2014-06-24 Thread Alex Boisvert
With the following pseudo-code,

val rdd1 = sc.sequenceFile(...) // has > 100 partitions
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2 map { ... }
val rdd4 = rdd3.coalesce(2)
val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

I would expect the parallelism of the map() operation to be 100 concurrent
tasks, and the parallelism of the save() operation to be 2.

However, it appears the parallelism of the entire chain is 2 -- I only see
two tasks created for the save() operation and those tasks appear to
execute the map() operation as well.

Assuming what I'm seeing is the intended behavior, what's the recommended way
to force a parallelism of 100 on the map() operation?

thanks!


Re: what is the best way to do cartesian

2014-04-25 Thread Alex Boisvert
You might want to try the built-in RDD.cartesian() method.
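
For what it's worth, here is a rough, self-contained sketch (not from the
original thread) of what that could look like for the item-similarity step in
your code below; the sample data, the cosSimilarity helper and the 0.1
threshold are placeholders mirroring your program:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical cosine similarity over (user, score) vectors.
    def cosSimilarity(v1: Seq[(String, Double)], v2: Seq[(String, Double)]): Double = {
      val (m1, m2) = (v1.toMap, v2.toMap)
      val dot  = (m1.keySet intersect m2.keySet).toSeq.map(k => m1(k) * m2(k)).sum
      val norm = math.sqrt(m1.values.map(x => x * x).sum) *
                 math.sqrt(m2.values.map(x => x * x).sum)
      if (norm == 0.0) 0.0 else dot / norm
    }

    val sc = new SparkContext(new SparkConf().setAppName("cartesian-similarity"))

    // (itemId, user/score vector) -- same shape as RDD2 in your message.
    val itemVectors: RDD[(Long, Seq[(String, Double)])] = sc.parallelize(Seq(
      (1L, Seq(("User1", 1.0), ("User2", 3.0))),
      (2L, Seq(("User1", 4.0), ("User2", 2.0)))
    ))

    // All item pairs via the built-in cartesian(); keep each unordered pair once.
    val simRDD = itemVectors.cartesian(itemVectors)
      .filter { case ((id1, _), (id2, _)) => id1 < id2 }
      .map { case ((id1, v1), (id2, v2)) => (id1, id2, cosSimilarity(v1, v2)) }
      .filter(_._3 > 0.1)

    println(simRDD.count())

Keep in mind that a full cartesian product of 20 million items is enormous, so
in practice you would still want to block or pre-filter the candidate pairs.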


On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei wei@dewmobile.net wrote:

 Hi All,

 I have a problem with an item-based collaborative filtering recommendation
 algorithm in Spark.
 The basic flow is as below:
 RDD1 ==>  (Item1, (User1, Score1))
           (Item2, (User2, Score2))
           (Item1, (User2, Score3))
           (Item2, (User1, Score4))

 RDD1.groupByKey ==> RDD2
           (Item1, ((User1, Score1), (User2, Score3)))
           (Item2, ((User1, Score4), (User2, Score2)))

 The similarity of Vector  ((User1,   Score1),   (User2,   Score3)) and
 ((User1,   Score4),   (User2,   Score2)) is the similarity of Item1 and
 Item2.

 In my situation, RDD2 contains 20 million records and my Spark program is
 extremely slow. The source code is as below:
  val conf = new SparkConf()
    .setMaster("spark://211.151.121.184:7077")
    .setAppName("Score Calcu Total")
    .set("spark.executor.memory", "20g")
    .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
  val sc = new SparkContext(conf)

  val mongoRDD = sc.textFile(args(0).toString, 400)
  val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

  val newRDD = jsonRDD.map(arg => {
    var score = haha(arg.get("a").asInstanceOf[JSONObject])

    // set score to 0.5 for testing
    arg.put("score", 0.5)
    arg
  })

  val resourceScoresRDD = newRDD.map(arg =>
    (arg.get("rid").toString.toLong,
     (arg.get("zid").toString,
      arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
  val resourceScores = resourceScoresRDD.collect()
  val bcResourceScores = sc.broadcast(resourceScores)

  val simRDD = resourceScoresRDD.mapPartitions({ iter =>
    val m = bcResourceScores.value
    for { (r1, v1) <- iter
          (r2, v2) <- m
          if r1 > r2
    } yield (r1, r2, cosSimilarity(v1, v2))
  }, true).filter(arg => arg._3 > 0.1)

  println(simRDD.count)

 And I saw this in the Spark Web UI:
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png

 My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and
 the machines in the cluster are under heavy load while the Spark program is
 running.

 Is there a better way to implement this algorithm?

 Thanks!






Re: Spark - ready for prime time?

2014-04-10 Thread Alex Boisvert
I'll provide answers from our own experience at Bizo.  We've been using
Spark for 1+ year now and have found it generally better than previous
approaches (Hadoop + Hive mostly).


On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
andras.nem...@lynxanalytics.com wrote:

 I. Is it too much magic? Lots of things just work right in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?


I think it goes back to understanding Spark's architecture, its design
constraints and the problems it explicitly set out to address.   If the
solution to your problems can be easily formulated in terms of the
map/reduce model, then it's a good choice.  You'll want your
customizations to go with (not against) the grain of the architecture.


 II. Is it mature enough? E.g. we've created a pull request
 (https://github.com/apache/spark/pull/181) which fixes a problem that
 we were very surprised no one ever stumbled upon before.
 So that's why I'm asking: is Spark already being used in
 professional settings? Can one already trust it being reasonably bug free
 and reliable?


There are lots of ways to use Spark, and not all of the features are
necessarily at the same level of maturity.   For instance, we put all the
jars on the main classpath so we've never run into the issue your pull
request addresses.

We definitely use and rely on Spark on a professional basis.  We have 5+
Spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
Once we got them working with the proper configuration settings, they have
been running reliably since.

I would characterize our use of Spark as "a better Hadoop", in the sense
that we use it for batch processing only, no streaming yet.   We're happy
it performs better than Hadoop but we don't require/rely on its memory
caching features.  In fact, for most of our jobs it would simplify our
lives if Spark didn't cache so many things in memory, since it would make
configuration/tuning a lot simpler and jobs would run successfully on the
first try instead of having to tweak things (# of partitions and such).

So, to the concrete issues. Sorry for the long mail, and let me know if I
 should break this out into more threads or if there is some other way to
 have this discussion...

 1. Memory management
 The general direction of these questions is whether it's possible to take
 RDD caching related memory management more into our own hands as LRU
 eviction is nice most of the time but can be very suboptimal in some of our
 use cases.
 A. Somehow prioritize cached RDDs, e.g. mark some as essential that one
 really wants to keep. I'm fine with going down in flames if I mark too much
 data as essential.
 B. Memory reflection: can you programmatically get the memory size of a
 cached RDD and the memory available in total/per executor? If we could do
 this we could indirectly avoid automatic evictions of things we might
 really want to keep in memory.
 C. Evictions caused by RDD partitions on the driver. I had a setup with
 huge worker memory and smallish memory on the driver JVM. To my surprise,
 the system started to cache RDD partitions on the driver as well. As the
 driver ran out of memory I started to see evictions while there was still
 plenty of space on the workers. This resulted in lengthy recomputations. Can
 this be avoided somehow?
 D. Broadcasts. Is it possible to get rid of a broadcast manually, without
 waiting for LRU eviction to take care of it? Can you tell the size of a
 broadcast programmatically?


 2. Akka lost connections
 We have quite often experienced lost executors due to akka exceptions -
 mostly connection lost or similar. It seems to happen when an executor gets
 extremely busy with some CPU intensive work. Our hypothesis is that akka
 network threads get starved and the executor fails to respond within
 timeout limits. Is this plausible? If yes, what can we do about it?


We've seen these as well.  In our case, increasing the akka timeouts and
framesize helped a lot.

e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
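
For instance, with SparkConf it might look like the following (the values
below are illustrative, not the ones we actually run with):

    import org.apache.spark.SparkConf

    // Illustrative values only -- tune for your cluster and workload.
    val conf = new SparkConf()
      .set("spark.akka.timeout", "300")        // seconds
      .set("spark.akka.askTimeout", "120")     // seconds
      .set("spark.akka.lookupTimeout", "120")  // seconds
      .set("spark.akka.frameSize", "100")      // MB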



 In general, these are scary errors in the sense that they come from the
 very core of the framework and it's hard to link it to something we do in
 our own code, and thus hard to find a fix. So a question more for the
 community: how often do you end up scratching your head about cases where
 spark magic doesn't work perfectly?


For us, this happens most often for jobs processing TBs of data (instead of
GBs)... which is frustrating of course because these jobs cost a lot more
in $$$ + time to run/debug/diagnose than smaller jobs.

It means we have to comb the logs to understand what happened, interpret
stack traces, dump memory / object allocations, read Spark source to
formulate hypotheses about what went wrong, and then trial