Mind trying the 1.5.2 release? Thanks
On Fri, Nov 20, 2015 at 10:56 AM, Walrus theCat <walrusthe...@gmail.com> wrote:

> I'm running into all kinds of problems with Spark 1.5.1 -- does anyone
> have a version that's working smoothly for them?
>
> On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler <deanwamp...@gmail.com> wrote:
>
>> I didn't expect that to fail. I would call it a bug for sure, since the
>> method is practically useless if it doesn't work.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>
>>> Dean,
>>>
>>> What's the point of Scala without magic? :-)
>>>
>>> Thanks for your help. It's still giving me unreliable results. There
>>> just has to be a way to do this in Spark. It's a pretty fundamental thing.
>>>
>>> scala> targets.takeOrdered(1) // imported as implicit here
>>> res23: Array[(String, Int)] = Array()
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res24: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res25: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res26: Array[(String, Int)] = Array((\bguns?\b,1253))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res27: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>
>>>> You don't have to use sortBy (although that would be better...). You
>>>> have to define an Ordering object and pass it as the second argument
>>>> list to takeOrdered()(), or declare it "implicit". This is more fancy
>>>> Scala than Spark should require here. Here's an example I've used:
>>>>
>>>> // Schema is (String, Int). Order by the Int, descending.
>>>> object CountOrdering extends Ordering[(String, Int)] {
>>>>   def compare(a: (String, Int), b: (String, Int)) =
>>>>     -(a._2 compare b._2)  // negated so that it sorts descending
>>>> }
>>>>
>>>> myRDD.takeOrdered(100)(CountOrdering)
>>>>
>>>> Or, if you add the keyword "implicit" before "object CountOrdering
>>>> {...}", you can omit the second argument list. That's more magic than
>>>> is justified. ;)
>>>>
>>>> HTH,
>>>>
>>>> dean
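A minimal sketch of the implicit variant described above, plus an equivalent one-liner using the standard library's Ordering.by (this reuses Dean's placeholder name myRDD for any RDD[(String, Int)]; it is not a definitive fix for the behavior reported in this thread):

implicit object CountOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) =
    -(a._2 compare b._2)  // negated so larger counts sort first
}

myRDD.takeOrdered(100)  // the implicit fills in the second argument list

// Equivalent without a custom object: derive the Ordering from the
// second tuple element, then reverse it to sort descending.
myRDD.takeOrdered(100)(Ordering.by[(String, Int), Int](_._2).reverse)

The locally declared implicit takes precedence over the standard library's tuple Ordering, so the one-argument call resolves to CountOrdering.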
>>>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>>
>>>>> Dean,
>>>>>
>>>>> Thanks a lot. Very helpful. How would I use takeOrdered to order by
>>>>> the second member of the tuple, as I am attempting to do with
>>>>> rdd.sortBy(_._2).first?
>>>>>
>>>>> On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>>
>>>>>> Someone please correct me if I'm wrong, but I think the answer is
>>>>>> actually "it's not implemented that way" in the sort methods, and it
>>>>>> should either be documented more explicitly or fixed.
>>>>>>
>>>>>> Reading the Spark source code, it looks like each partition is sorted
>>>>>> internally, and each partition holds a contiguous range of keys in the
>>>>>> RDD. So, if you know which order the partitions should be in, you can
>>>>>> produce a total order and hence allow take(n) to do what you expect.
>>>>>>
>>>>>> take(n) appears to walk the list of partitions in order, but it's that
>>>>>> list that's not deterministic. I can't find any evidence that the RDD
>>>>>> output by sortBy has this list of partitions in the correct order. So,
>>>>>> each time you ran your job, the "targets" RDD had sorted partitions,
>>>>>> but the list of partitions itself was not properly ordered globally.
>>>>>> When you got an exception, probably the first partition happened to
>>>>>> be empty.
>>>>>>
>>>>>> Now, you could argue that take(n) is a "debug" method and the
>>>>>> performance implications of getting the RDD.partitions list in total
>>>>>> order are not justified. There is a takeOrdered(n) method that is much
>>>>>> more efficient than sort().take(n), and it does the correct thing.
>>>>>> Still, at the very least, the documentation for take(n) should tell
>>>>>> you what to expect.
>>>>>>
>>>>>> Hope I'm right and this helps!
>>>>>>
>>>>>> dean
>>>>>>
>>>>>> On Wed, Nov 18, 2015 at 5:53 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>>>>
>>>>>>> Dean,
>>>>>>>
>>>>>>> Thanks for the insight. Shouldn't take(n) or first return the same
>>>>>>> result, provided that the RDD is sorted? If I specify that the RDD is
>>>>>>> ordered, I need guarantees, as I reason about it, that the first item
>>>>>>> is in fact the first and the last is the last.
>>>>>>>
>>>>>>> On Wed, Nov 18, 2015 at 3:16 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Methods like first() and take(n) can't guarantee to return the same
>>>>>>>> result in a distributed context, because Spark grabs data from one
>>>>>>>> or more partitions by running a distributed job over the cluster,
>>>>>>>> with tasks on the nodes where the chosen partitions are located. You
>>>>>>>> can look at the logic in the Spark code base: RDD.scala (the first
>>>>>>>> method calls take) and SparkContext.scala (the runJob method, which
>>>>>>>> take calls).
>>>>>>>>
>>>>>>>> However, the exceptions definitely look like bugs to me. There must
>>>>>>>> be some empty partitions.
>>>>>>>>
>>>>>>>> dean
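To make the sort-versus-takeOrdered point above concrete, a sketch under the same (String, Int) schema (rdd stands in for any such RDD): sortBy followed by take relies on the sorted RDD's partition list being globally ordered, while takeOrdered merges each partition's local top n on the driver. Neither call can repair nondeterminism introduced upstream of the sort, though.

// Depends on the partition list being in total order (see discussion above):
rdd.sortBy(_._2, ascending = false).take(1)

// Selects the global top element regardless of partition layout:
rdd.takeOrdered(1)(Ordering.by[(String, Int), Int](_._2).reverse)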
>>>>>>>> On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm launching a Spark cluster with the spark-ec2 script and playing
>>>>>>>>> around in spark-shell. I'm running the same line of code over and
>>>>>>>>> over again, and getting different results, and sometimes exceptions.
>>>>>>>>> Towards the end, after I cache the first RDD, it gives me the
>>>>>>>>> correct result multiple times in a row before throwing an exception.
>>>>>>>>> How can I get correct behavior out of these operations on these RDDs?
>>>>>>>>>
>>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res26: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets = data map {_.REGEX} groupBy{identity} map { Function.tupled(_->_.size)} sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res27: (String, Int) = (nika,7)
>>>>>>>>>
>>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res28: (String, Int) = (\bcalientes?\b,6)
>>>>>>>>>
>>>>>>>>> scala> targets.sortBy(_._2,false).first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>>
>>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res46: (String, Int) = (\bhurting\ yous?\b,8)
>>>>>>>>>
>>>>>>>>> scala> val targets = data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>>
>>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res48: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res49: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res50: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets = data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
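A final note on the pipeline itself: groupBy(identity) followed by taking each group's size materializes every group just to count it, whereas reduceByKey computes the same counts without building the groups, and pairs naturally with takeOrdered. A sketch reusing data and .REGEX from the post above (this sidesteps the sortBy partition-ordering issue but cannot fix nondeterminism in data itself, which may be why caching data appeared to stabilize the results for a while):

// Count occurrences of each regex without materializing groups.
val counts = data.map(r => (r.REGEX, 1)).reduceByKey(_ + _)

// Global top 10 by count, independent of partition layout.
val top10 = counts.takeOrdered(10)(Ordering.by[(String, Int), Int](_._2).reverse)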