That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey(
  (s: String) => 1,
  (agg: Int, s: String) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2)
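
For reference, here is the whole pipeline as a self-contained sketch (the
spark-shell's sc and the sample list from Marco's original message are
assumed):

// Sample data from the original question.
val input = sc.parallelize(Seq(
  (1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
  (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")))

// distinct() drops duplicate (key, value) pairs; combineByKey then counts the
// remaining values per key without buffering them the way groupByKey would.
val counts = input.distinct().combineByKey(
  (s: String) => 1,                       // createCombiner: first value seen for a key
  (agg: Int, s: String) => agg + 1,       // mergeValue: another value for the same key
  (agg1: Int, agg2: Int) => agg1 + agg2)  // mergeCombiners: merge per-partition counts

counts.foreach(println)
// (1,3), (2,2), (3,1) -- ordering may vary

An equivalent shortcut, if you don't need to spell out the three combiner
functions, is input.distinct().mapValues(_ => 1).reduceByKey(_ + _).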

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

> How about this?
>
> input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
> (agg1: Int, agg2: Int) => agg1 + agg2).collect()
>
> On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler <deanwamp...@gmail.com>
> wrote:
>
>> The problem with using collect is that it will fail for large data sets,
>> as you'll attempt to copy the entire RDD to the memory of your driver
>> program. The following works (Scala syntax, but similar to Python):
>>
>> scala> val i1 = input.distinct.groupByKey
>> scala> i1.foreach(println)
>> (1,CompactBuffer(beta, alpha, foo))
>> (3,CompactBuffer(foo))
>> (2,CompactBuffer(alpha, bar))
>>
>> scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
>> scala> i2.foreach(println)
>> (1,3)
>> (3,1)
>> (2,2)
>>
>> The "i2" line passes a function that takes a tuple argument, then
>> constructs a new output tuple with the first element and the size of the
>> second (each CompactBuffer). An alternative pattern match syntax would be.
>>
>> scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }
>>
>> This should work as long as no single CompactBuffer is too large to hold in
>> memory, which could happen for extremely large data sets.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw <marco.s...@gmail.com>
>> wrote:
>>
>>> **Learning the ropes**
>>>
>>> I'm trying to grasp the concept of using the pipeline in PySpark...
>>>
>>> Simplified example:
>>> >>>
>>> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
>>>
>>> Desired outcome:
>>> [(1,3),(2,2),(3,1)]
>>>
>>> Basically for each key, I want the number of unique values.
>>>
>>> I've tried different approaches, but am I really using Spark
>>> effectively? I wondered if I should do something like:
>>> >>> input=sc.parallelize(list)
>>> >>> input.groupByKey().collect()
>>>
>>> Then I wondered if I could do something like a foreach over each key, and
>>> then map the actual values and reduce them. Pseudo-code:
>>>
>>> input.groupbykey()
>>> .keys
>>> .foreach(_.values
>>> .map(lambda x: x,1)
>>> .reducebykey(lambda a,b:a+b)
>>> .count()
>>> )
>>>
>>> I was somehow hoping that each key would end up with the current value of
>>> count, and thus the count of its unique values, which is exactly what I
>>> think I'm looking for.
>>>
>>> Am I way off base on how I could accomplish this?
>>>
>>> Marco
>>>
>>
>>
>
