Hey Mark, I didn't mean to say that the information isn't out there -- just that when something goes wrong with Spark, the scope of what could be wrong is so large: a bad JVM setting, the serializer, Akka, badly written Scala code, a wrong algorithm, check the worker logs, check the executor stderrs, ...
When I looked at this post this morning, my initial thought wasn't that countByValue would be at fault -- probably since I've only been using Scala/Spark for a month or so. It was just a suggestion to help newbies come up to speed more quickly and gain insights into how to debug issues.

On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra <m...@clearstorydata.com> wrote:

> There's no need to guess at that. The docs tell you directly:
>
>     def countByValue(): Map[T, Long]
>
>     Return the count of each unique value in this RDD as a map of
>     (value, count) pairs. The final combine step happens locally on the
>     master, equivalent to running a single reduce task.
>
> On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <s...@1618labs.com> wrote:
>
>> Hi Matei,
>>
>> I've seen several memory-tuning queries on this mailing list, and also
>> heard the same kinds of queries at the Spark meetup. In fact, the last
>> bullet point in Josh Carver's(?) slides, the guy from Bizo, was "memory
>> tuning is still a mystery".
>>
>> I certainly had lots of issues when I first started. From memory issues
>> to GC issues, things seem to run fine until you try something with
>> 500GB of data, etc.
>>
>> I was wondering if you could write up a little white paper or some
>> guidelines on how to set memory values, and what to look at when
>> something goes wrong? E.g., I would never have guessed that countByValue
>> happens on a single machine, etc.
>>
>> On Oct 21, 2013 6:18 PM, "Matei Zaharia" <matei.zaha...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> The problem is that countByValue happens in only a single reduce task --
>>> this is probably something we should fix, but it's basically not
>>> designed for lots of values. Instead, do the count in parallel as
>>> follows:
>>>
>>>     val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>>>
>>> If this still has trouble, you can also increase the level of
>>> parallelism of reduceByKey by passing it a second parameter for the
>>> number of tasks (e.g. 100).
>>>
>>> BTW, one other small thing with your code: flatMap should actually work
>>> fine if your function returns an Iterator or Traversable, so there's no
>>> need to call toList and return a Seq in ngrams; you can just return an
>>> Iterator[String].
>>>
>>> Matei
>>>
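A minimal sketch of both of Matei's suggestions, assuming a spark-shell session and the "mapped" RDD defined in Tim's message quoted below (the figure of 100 reduce tasks is illustrative, not a tuned value):

    // ngrams can return an Iterator[String] directly; flatMap accepts any
    // TraversableOnce, so toList is unnecessary. split("\\s+") yields
    // whitespace-free tokens, so the trim step can be dropped as well.
    def ngrams(s: String, n: Int = 3): Iterator[String] =
      s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" "))

    // Count in parallel; reduceByKey's optional second parameter sets the
    // number of reduce tasks.
    val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)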
>>> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tperr...@gmail.com> wrote:
>>>
>>> > Hi everyone,
>>> > I am very new to Spark, so as a learning exercise I've set up a small
>>> > cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves),
>>> > which I'm hoping to use to calculate ngram frequencies from text files
>>> > of various sizes (I'm not doing anything with them; I just thought
>>> > this would be slightly more interesting than the usual 'word count'
>>> > example). Currently, I'm trying to work with a 1GB text file but am
>>> > running into memory issues. I'm wondering what parameters I should be
>>> > setting (in spark-env.sh) in order to properly utilize the cluster.
>>> > Right now, I'd be happy just to have the process complete successfully
>>> > with the 1 gig file, so I'd really appreciate any suggestions you all
>>> > might have.
>>> >
>>> > Here's a summary of the code I'm running through the spark shell on
>>> > the master:
>>> >
>>> >     def ngrams(s: String, n: Int = 3): Seq[String] = {
>>> >       s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
>>> >     }
>>> >
>>> >     val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
>>> >
>>> >     val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
>>> >
>>> > So far so good; the problems come during the reduce phase. With small
>>> > files, I was able to issue the following to calculate the most
>>> > frequently occurring trigram:
>>> >
>>> >     val topNgram = (mapped countByValue) reduce((a: (String, Long), b: (String, Long)) => if (a._2 > b._2) a else b)
>>> >
>>> > With the 1 gig file, though, I've been running into OutOfMemory
>>> > errors, so I decided to split the reduction into several steps,
>>> > starting with simply issuing countByValue on my "mapped" RDD, but I
>>> > have yet to get it to complete successfully.
>>> >
>>> > SPARK_MEM is currently set to 6154m. I also bumped up the
>>> > spark.akka.frameSize setting to 500 (though at this point I was
>>> > grasping at straws; I'm not sure what a "proper" value would be).
>>> > What properties should I be setting for a job of this size on a
>>> > cluster of 3 m1.large slaves? (The cluster was initially configured
>>> > using the spark-ec2 scripts.) Also, programmatically, what should I
>>> > be doing differently? (For example, should I be setting the minimum
>>> > number of splits when reading the text file? If so, what would be a
>>> > good default?)
>>> >
>>> > I apologize for what I'm sure are very naive questions. I think Spark
>>> > is a fantastic project and have enjoyed working with it, but I'm
>>> > still very much a newbie and would appreciate any help you all can
>>> > provide (as well as any rules-of-thumb or best practices I should be
>>> > following).
>>> >
>>> > Thanks,
>>> > Tim Perrigo
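To round out the thread, a short sketch of the two remaining pieces: setting a minimum number of splits when reading the file (one of Tim's questions), and computing the top ngram without countByValue, reusing the counts RDD from the sketch above. The path is Tim's placeholder, and both 100s are illustrative guesses rather than tuned values:

    // The optional second argument to textFile asks for a minimum number
    // of input splits; 100 here is a placeholder, not a recommended default.
    val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", 100)

    // counts pairs each distinct ngram with its total, so finding the most
    // frequent one is a cheap distributed reduce; unlike countByValue,
    // nothing large is collected on the master.
    val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)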