Re: getting different results from same line of code repeated

2015-11-20 Thread Walrus theCat
I'm running into all kinds of problems with Spark 1.5.1 -- does anyone have
a version that's working smoothly for them?

On Fri, Nov 20, 2015 at 10:50 AM, Dean Wampler wrote:

> I didn't expect that to fail. I would call it a bug for sure, since it's
> practically useless if this method doesn't work.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
>
> On Fri, Nov 20, 2015 at 12:45 PM, Walrus theCat wrote:
>
>> Dean,
>>
>> What's the point of Scala without magic? :-)
>>
>> Thanks for your help.  It's still giving me unreliable results.  There
>> just has to be a way to do this in Spark.  It's a pretty fundamental thing.
>>
>> scala> targets.takeOrdered(1) // imported as implicit here
>> res23: Array[(String, Int)] = Array()
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res24: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res25: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res26: Array[(String, Int)] = Array((\bguns?\b,1253))
>>
>> scala> targets.takeOrdered(1)(CountOrdering)
>> res27: Array[(String, Int)] = Array((\bmurders?\b,717))
>>
>>
>>
>> On Wed, Nov 18, 2015 at 6:20 PM, Dean Wampler wrote:
>>
>>> You don't have to use sortBy (although that would be better...). You
>>> have to define an Ordering object and pass it as the second argument list
>>> to takeOrdered()(), or declare it as an implicit. This is fancier Scala
>>> than Spark should require here. Here's an example I've used:
>>>
>>>   // schema with (String,Int). Order by the Int descending
>>>   object CountOrdering extends Ordering[(String,Int)] {
>>>     def compare(a: (String, Int), b: (String, Int)) =
>>>       -(a._2 compare b._2)  // - so that it sorts descending
>>>   }
>>>
>>>   myRDD.takeOrdered(100)(CountOrdering)
>>>
>>>
>>> Or, if you add the keyword "implicit" before "object CountOrdering
>>> {...}", then you can omit the second argument list. That's more magic than
>>> is justified. ;)
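>>>
>>> As a rough sketch (untested here), that implicit version of the same
>>> ordering would look something like this:
>>>
>>>   // Marking the object implicit lets takeOrdered find it automatically,
>>>   // so the explicit second argument list can be dropped.
>>>   implicit object CountOrdering extends Ordering[(String,Int)] {
>>>     def compare(a: (String, Int), b: (String, Int)) =
>>>       -(a._2 compare b._2)  // negate so it sorts descending
>>>   }
>>>
>>>   myRDD.takeOrdered(100)  // CountOrdering is resolved implicitly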
>>>
>>> HTH,
>>>
>>> dean
>>>
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition (O'Reilly)
>>> Typesafe
>>> @deanwampler
>>> http://polyglotprogramming.com
>>>
>>> On Wed, Nov 18, 2015 at 6:37 PM, Walrus theCat wrote:
>>>
 Dean,

 Thanks a lot.  Very helpful.  How would I use takeOrdered to order by
 the second member of the tuple, as I am attempting to do with
 rdd.sortBy(_._2).first?

 On Wed, Nov 18, 2015 at 4:24 PM, Dean Wampler wrote:

> Someone please correct me if I'm wrong, but I think the answer is
> actually "it's not implemented that way" in the sort methods, and it should
> either be documented more explicitly or fixed.
>
> Reading the Spark source code, it looks like each partition is sorted
> internally, and each partition holds a contiguous range of keys in the RDD.
> So, if you know which order the partitions should be in, you can produce a
> total order and hence allow take(n) to do what you expect.
>
> take(n) appears to walk the list of partitions in order, but it's that
> list that's not deterministic. I can't find any evidence that the RDD
> output by sortBy has this list of partitions in the correct order. So, each
> time you ran your job, the "targets" RDD had sorted partitions, but the
> list of partitions itself was not properly ordered globally. When you got
> an exception, probably the first partition happened to be empty.
>
> Now, you could argue that take(n) is a "debug" method and that the
> performance implications of getting the RDD.partitions list into total
> order are not justified. There is a takeOrdered(n) method that is both
> much more efficient than sortBy(...).take(n) and does the correct thing.
> Still, at the very least, the documentation for take(n) should tell you
> what to expect.
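>
> For the descending-by-count case in this thread, that would look roughly
> like the following (just a sketch; "targets" is assumed to be an
> RDD[(String, Int)]):
>
>   // Ordering.by(_._2) orders ascending on the count, so reverse it to
>   // put the largest count first.
>   targets.takeOrdered(1)(Ordering.by[(String, Int), Int](_._2).reverse)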
>
> Hope I'm right and this helps!
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
>
> On Wed, Nov 18, 2015 at 5:53 PM, Walrus theCat wrote:
>
>> Dean,
>>
>> Thanks for the insight.  Shouldn't take(n) or first return the same
>> result, provided that the RDD is sorted?  If I specify that the RDD is
>> ordered, I need 

Re: getting different results from same line of code repeated

2015-11-20 Thread Ted Yu
Mind trying 1.5.2 release ?

Thanks


getting different results from same line of code repeated

2015-11-18 Thread Walrus theCat
Hi,

I'm launching a Spark cluster with the spark-ec2 script and playing around
in spark-shell. I'm running the same line of code over and over again, and
getting different results, and sometimes exceptions.  Towards the end,
after I cache the first RDD, it gives me the correct result multiple times
in a row before throwing an exception.  How can I get correct behavior out
of these operations on these RDDs?

scala> val targets =
data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[116] at
sortBy at <console>:36

scala> targets.first
res26: (String, Int) = (\bguns?\b,1253)

scala> val targets = data map {_.REGEX} groupBy{identity} map {
Function.tupled(_->_.size)} sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[125] at
sortBy at <console>:36

scala> targets.first
res27: (String, Int) = (nika,7)


scala> val targets =
data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[134] at
sortBy at <console>:36

scala> targets.first
res28: (String, Int) = (\bcalientes?\b,6)

scala> targets.sortBy(_._2,false).first
java.lang.UnsupportedOperationException: empty collection

scala> val targets =
data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[283] at
sortBy at <console>:36

scala> targets.first
res46: (String, Int) = (\bhurting\ yous?\b,8)

scala> val targets =
data.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false).cache
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[292] at
sortBy at <console>:36

scala> targets.first
java.lang.UnsupportedOperationException: empty collection

scala> val targets =
data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[301] at
sortBy at <console>:36

scala> targets.first
res48: (String, Int) = (\bguns?\b,1253)

scala> val targets =
data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at
sortBy at <console>:36

scala> targets.first
res49: (String, Int) = (\bguns?\b,1253)

scala> val targets =
data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[319] at
sortBy at <console>:36

scala> targets.first
res50: (String, Int) = (\bguns?\b,1253)

scala> val targets =
data.cache.map(_.REGEX).groupBy(identity).map(Function.tupled(_->_.size)).sortBy(_._2,false)
targets: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[328] at
sortBy at <console>:36

scala> targets.first
java.lang.UnsupportedOperationException: empty collection


Re: getting different results from same line of code repeated

2015-11-18 Thread Dean Wampler
Methods like first() and take(n) aren't guaranteed to return the same result
in a distributed context, because Spark grabs data from one or more
partitions by running a distributed job over the cluster, with tasks on the
nodes where the chosen partitions are located. You can follow the logic in
the Spark code base: RDD.scala (the first method calls take) and
SparkContext.scala (the runJob method, which take calls).

However, the exceptions definitely look like bugs to me. There must be some
empty partitions.
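
A quick way to check for empty partitions (a sketch; "targets" here stands in
for the sorted RDD from your shell session) is to count the elements in each
partition:

  targets.mapPartitionsWithIndex { (i, iter) =>
    Iterator((i, iter.size))  // partition index -> number of elements
  }.collect().foreach(println)

A partition reporting 0 elements would be consistent with the intermittent
"empty collection" exceptions you're seeing.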

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com
