That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey(
  (s: String) => 1,
  (agg: Int, s: String) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2)
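Since your original example was PySpark, here is a rough, untested sketch of the same idea in Python. Names are illustrative: it assumes sc is your SparkContext and pairs holds the sample data from your first message.

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]
pairs = sc.parallelize(data)

counts = pairs.distinct().combineByKey(
    lambda v: 1,                       # createCombiner: first unique value seen for a key
    lambda agg, v: agg + 1,            # mergeValue: another unique value for the same key
    lambda agg1, agg2: agg1 + agg2)    # mergeCombiners: combine per-partition counts

counts.collect()   # [(1, 3), (2, 2), (3, 1)] -- ordering may vary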
dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

> How about this?
>
> input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
> (agg1: Int, agg2: Int) => agg1 + agg2).collect()
>
> On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler <deanwamp...@gmail.com> wrote:
>
>> The problem with using collect is that it will fail for large data sets,
>> as you'll attempt to copy the entire RDD to the memory of your driver
>> program. The following works (Scala syntax, but similar to Python):
>>
>> scala> val i1 = input.distinct.groupByKey
>> scala> i1.foreach(println)
>> (1,CompactBuffer(beta, alpha, foo))
>> (3,CompactBuffer(foo))
>> (2,CompactBuffer(alpha, bar))
>>
>> scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
>> scala> i2.foreach(println)
>> (1,3)
>> (3,1)
>> (2,2)
>>
>> The "i2" line passes a function that takes a tuple argument, then
>> constructs a new output tuple from the first element and the size of the
>> second (each CompactBuffer). An alternative pattern-match syntax would be:
>>
>> scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }
>>
>> This should work as long as none of the CompactBuffers are too large,
>> which could happen for extremely large data sets.
>>
>> dean
>>
>> On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw <marco.s...@gmail.com> wrote:
>>
>>> **Learning the ropes**
>>>
>>> I'm trying to grasp the concept of using the pipeline in pySpark...
>>>
>>> Simplified example:
>>>
>>> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
>>>
>>> Desired outcome:
>>> [(1,3),(2,2),(3,1)]
>>>
>>> Basically, for each key I want the number of unique values.
>>>
>>> I've tried different approaches, but am I really using Spark
>>> effectively? I wondered if I would do something like:
>>>
>>> input=sc.parallelize(list)
>>> input.groupByKey().collect()
>>>
>>> Then I wondered if I could do something like a foreach over each key,
>>> and then map the actual values and reduce them. Pseudo-code:
>>>
>>> input.groupbykey()
>>>   .keys
>>>   .foreach(_.values
>>>     .map(lambda x: x,1)
>>>     .reducebykey(lambda a,b: a+b)
>>>     .count()
>>>   )
>>>
>>> I was somehow hoping that each key would get the current value of count,
>>> and thus be the count of its unique values, which is exactly what I think
>>> I'm looking for.
>>>
>>> Am I way off base on how I could accomplish this?
>>>
>>> Marco
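P.S. The groupByKey version from earlier in the thread translates to PySpark in much the same way. Again an untested sketch, reusing the pairs RDD from the example above:

grouped = pairs.distinct().groupByKey()   # one iterable of unique values per key
counts = grouped.mapValues(len)           # (key, number of unique values)
counts.collect()   # [(1, 3), (2, 2), (3, 1)] -- ordering may vary

As noted above, this holds all unique values for a key in memory at once, so combineByKey is the safer choice if some keys have very many distinct values.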