Mind trying the 1.5.2 release?

Thanks

On Fri, Nov 20, 2015 at 10:56 AM, Walrus theCat <walrusthe...@gmail.com>
wrote:

> I'm running into all kinds of problems with Spark 1.5.1 -- does anyone
> have a version that's working smoothly for them?
>
> On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler <deanwamp...@gmail.com>
> wrote:
>
>> I didn't expect that to fail. I would call it a bug for sure, since it's
>> practically useless if this method doesn't work.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat <walrusthe...@gmail.com>
>> wrote:
>>
>>> Dean,
>>>
>>> What's the point of Scala without magic? :-)
>>>
>>> Thanks for your help.  It's still giving me unreliable results.  There
>>> just has to be a way to do this in Spark.  It's a pretty fundamental thing.
>>>
>>> scala> targets.takeOrdered(1) // imported as implicit here
>>> res23: Array[(String, Int)] = Array()
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res24: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res25: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res26: Array[(String, Int)] = Array((\bguns?\b,1253))
>>>
>>> scala> targets.takeOrdered(1)(CountOrdering)
>>> res27: Array[(String, Int)] = Array((\bmurders?\b,717))
>>>
>>>
>>>
>>> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler <deanwamp...@gmail.com>
>>> wrote:
>>>
>>>> You don't have to use sortBy (although that would be better...). You
>>>> have to define an Ordering object and pass it as the second argument list
>>>> to takeOrdered()(), or declare it "implicitly". This is more fancy Scala
>>>> than Spark should require here. Here's an example I've used:
>>>>
>>>>   // schema with (String,Int). Order by the Int descending
>>>>   object CountOrdering extends Ordering[(String,Int)] {
>>>>     def compare(a:(String,Int), b:(String,Int)) =
>>>>       -(a._2 compare b._2)  // - so that it sorts descending
>>>>   }
>>>>
>>>>   myRDD.takeOrdered(100)(CountOrdering)
>>>>
>>>>
>>>> Or, if you add the keyword "implicit" before "object CountOrdering
>>>> {...}", then you can omit the second argument list. That's more magic than
>>>> is justified. ;)
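
For what it's worth, the same descending-by-count ordering can probably be built without a hand-written compare. This is only a sketch on a local collection: `OrderingSketch`, `byCountDesc` and `counts` are made-up names, and the sample tuples are not data from this thread.

```scala
object OrderingSketch {
  // Order (String, Int) pairs by the Int, largest first.
  val byCountDesc: Ordering[(String, Int)] =
    Ordering.by[(String, Int), Int](_._2).reverse

  // Illustrative sample data only.
  val counts: List[(String, Int)] = List(("a", 3), ("b", 10), ("c", 7))

  // On a local collection the Ordering is passed explicitly to sorted.
  val top2: List[(String, Int)] = counts.sorted(byCountDesc).take(2)

  def main(args: Array[String]): Unit =
    println(top2) // List((b,10), (c,7))
}
```

On an RDD[(String, Int)] the same value should be usable as `rdd.takeOrdered(2)(byCountDesc)`, since takeOrdered takes its Ordering in the second argument list.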
>>>>
>>>> HTH,
>>>>
>>>> dean
>>>>
>>>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat <walrusthe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dean,
>>>>>
>>>>> Thanks a lot.  Very helpful.  How would I use takeOrdered to order by
>>>>> the second member of the tuple, as I am attempting to do with
>>>>> rdd.sortBy(_._2).first?
>>>>>
>>>>> On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Someone please correct me if I'm wrong, but I think the answer is
>>>>>> actually "it's not implemented that way" in the sort methods, and it
>>>>>> should either be documented more explicitly or fixed.
>>>>>>
>>>>>> Reading the Spark source code, it looks like each partition is sorted
>>>>>> internally, and each partition holds a contiguous range of keys in the
>>>>>> RDD. So, if you know which order the partitions should be in, you can
>>>>>> produce a total order and hence allow take(n) to do what you expect.
>>>>>>
>>>>>> The take(n) appears to walk the list of partitions in order, but it's
>>>>>> that list that's not deterministic. I can't find any evidence that the
>>>>>> RDD output by sortBy has this list of partitions in the correct order.
>>>>>> So, each time you ran your job, the "targets" RDD had sorted partitions,
>>>>>> but the list of partitions itself was not properly ordered globally.
>>>>>> When you got an exception, probably the first partition happened to be
>>>>>> empty.
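
To make the hypothesis above concrete, here is a toy model in plain Scala. It is not Spark code: the nested Vectors merely stand in for partitions, and `take` below is a simplified stand-in that walks the partition list in order. All names are invented for illustration.

```scala
object PartitionOrderSketch {
  // Each "partition" is sorted internally (descending) and covers a
  // contiguous range of values, as described for the output of sortBy.
  val partitions: Vector[Vector[Int]] =
    Vector(Vector(9, 8, 7), Vector(6, 5, 4), Vector(3, 2, 1))

  // Simplified take(n): concatenate the partitions in list order and
  // keep the first n elements.
  def take(parts: Seq[Seq[Int]], n: Int): Seq[Int] =
    parts.flatten.take(n)

  def main(args: Array[String]): Unit = {
    println(take(partitions, 1))         // Vector(9): list in the right order
    println(take(partitions.reverse, 1)) // Vector(3): same partitions, wrong list order
  }
}
```

If the partition list really can come back in a different order, the second call is the kind of answer first would give, even though every partition is sorted. The toy model does not try to reproduce the "empty collection" exception.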
>>>>>>
>>>>>> Now, you could argue that take(n) is a "debug" method and that the
>>>>>> performance cost of getting the RDD.partitions list into total order
>>>>>> is not justified. There is a takeOrdered(n) method that is both much
>>>>>> more efficient than sort().take(n) and does the correct thing. Still,
>>>>>> at the very least, the documentation for take(n) should tell you what
>>>>>> to expect.
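
A rough sketch of why a takeOrdered-style approach does not depend on partition order, and why it can be cheaper than a full sort: keep at most n candidates per partition, then merge the small candidate lists. This is a conceptual model on plain Scala collections with invented names, not a copy of Spark's implementation.

```scala
object TakeOrderedSketch {
  // Partitions listed in an arbitrary order on purpose.
  val partitions: Vector[Vector[Int]] =
    Vector(Vector(3, 2, 1), Vector(9, 8, 7), Vector(6, 5, 4))

  // Keep the n smallest elements (under ord) of each partition, then
  // merge the candidates pairwise, trimming back to n after each merge.
  def takeOrdered(parts: Seq[Seq[Int]], n: Int)(implicit ord: Ordering[Int]): List[Int] =
    parts
      .map(_.sorted(ord).take(n))
      .foldLeft(List.empty[Int])((acc, p) => (acc ++ p).sorted(ord).take(n))

  def main(args: Array[String]): Unit = {
    println(takeOrdered(partitions, 2))                        // List(1, 2)
    println(takeOrdered(partitions, 2)(Ordering[Int].reverse)) // List(9, 8)
    println(takeOrdered(partitions.reverse, 2))                // List(1, 2) again
  }
}
```

Only n elements per partition survive the first step, so no global sort of the whole data set is needed, and the result is the same whichever way the partition list is ordered.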
>>>>>>
>>>>>> Hope I'm right and this helps!
>>>>>>
>>>>>> dean
>>>>>>
>>>>>> On Wed, Nov 18, 2015 at 5:53 PM, Walrus theCat <walrusthe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Dean,
>>>>>>>
>>>>>>> Thanks for the insight.  Shouldn't take(n) or first return the same
>>>>>>> result, provided that the RDD is sorted?  If I specify that the RDD is
>>>>>>> ordered, I need to have guarantees as I reason about it that the first
>>>>>>> item is in fact the first, and the last is the last.
>>>>>>>
>>>>>>> On Wed, Nov 18, 2015 at 3:16 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Methods like first() and take(n) can't guarantee to return the same
>>>>>>>> result in a distributed context, because Spark uses an algorithm to
>>>>>>>> grab data from one or more partitions that involves running a
>>>>>>>> distributed job over the cluster, with tasks on the nodes where the
>>>>>>>> chosen partitions are located.  You can look at the logic in the Spark
>>>>>>>> code base, RDD.scala (first method calls the take method) and
>>>>>>>> SparkContext.scala (runJob method, which take calls).
>>>>>>>>
>>>>>>>> However, the exceptions definitely look like bugs to me. There must
>>>>>>>> be some empty partitions.
>>>>>>>>
>>>>>>>> dean
>>>>>>>>
>>>>>>>> On Wed, Nov 18, 2015 at 4:52 PM, Walrus theCat <walrusthe...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm launching a Spark cluster with the spark-ec2 script and
>>>>>>>>> playing around in spark-shell. I'm running the same line of code over
>>>>>>>>> and over again, and getting different results, and sometimes exceptions.
>>>>>>>>> Towards the end, after I cache the first RDD, it gives me the correct
>>>>>>>>> result multiple times in a row before throwing an exception.  How can
>>>>>>>>> I get correct behavior out of these operations on these RDDs?
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[116] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res26: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets = data map {_.REGEX} groupBy{identity} map {
>>>>>>>>> Function.tupled(_->_.size)} sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[125] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res27: (String, Int) = (nika,7)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[134] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res28: (String, Int) = (\bcalientes?\b,6)
>>>>>>>>>
>>>>>>>>> scala> targets.sortBy(_._2,false).first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[283] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res46: (String, Int) = (\bhurting\ yous?\b,8)
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[292] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[301] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res48: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[310] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res49: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[319] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> res50: (String, Int) = (\bguns?\b,1253)
>>>>>>>>>
>>>>>>>>> scala> val targets =
>>>>>>>>> data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
>>>>>>>>> targets: org.apache.spark.rdd.RDD[(String, Int)] =
>>>>>>>>> MapPartitionsRDD[328] at sortBy at <console>:36
>>>>>>>>>
>>>>>>>>> scala> targets.first
>>>>>>>>> java.lang.UnsupportedOperationException: empty collection
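
One possible way to ask for the top count without relying on first, sketched as a standalone local-mode program (it creates its own SparkContext, so it is not meant to be pasted into spark-shell). Assumptions, all mine: the strings below stand in for data.map(_.REGEX), which is not shown in the thread; reduceByKey is swapped in for groupBy(identity) plus size; and `TopCountSketch`, `byCountDesc` and `run` are made-up names. It needs spark-core on the classpath, and it does not explain why the results above vary between runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopCountSketch {
  // Descending by count, like the CountOrdering object quoted above.
  val byCountDesc: Ordering[(String, Int)] =
    Ordering.by[(String, Int), Int](_._2).reverse

  def run(): Option[(String, Int)] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("top-count-sketch")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical stand-in for the matched patterns in the thread.
      val matches = sc.parallelize(
        Seq("guns", "murders", "guns", "nika", "guns", "murders"), 4)
      // Count occurrences per key without materialising the groups.
      val counts = matches.map(m => (m, 1)).reduceByKey(_ + _)
      // takeOrdered returns an Array; headOption yields None instead of
      // throwing when the result is empty.
      counts.takeOrdered(1)(byCountDesc).headOption
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Some((guns,3)) for the sample data above
}
```

Returning an Option makes the empty case explicit, whereas first throws UnsupportedOperationException("empty collection") as seen in the transcript.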
>>>>>>>>>
