Hi Jerry,

In fact, the HashSet approach is what we took earlier. However, it does not
work on a windowed DStream once we supply both a forward and an inverse
reduce operation. The problem is that the inverse reduce removes values from
the set that may still exist elsewhere in the window and therefore should
not have been removed. We only discovered this logic error recently: set
union simply is not an invertible function.
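
For concreteness, this is roughly what that HashSet variant looked like. It is
only a sketch: the variable names, the 15-minute window and the 10-second slide
are illustrative, and "pairs" is assumed to be the JavaPairDStream<String,
String> of key/value records coming off the stream.

    // Per-key singleton sets of values.
    JavaPairDStream<String, HashSet<String>> valueSets =
        pairs.mapToPair(t -> {
            HashSet<String> s = new HashSet<String>();
            s.add(t._2());
            return new Tuple2<String, HashSet<String>>(t._1(), s);
        });

    // Forward reduce: union the sets of values seen for each key.
    Function2<HashSet<String>, HashSet<String>, HashSet<String>> merge =
        (a, b) -> { HashSet<String> s = new HashSet<String>(a); s.addAll(b); return s; };

    // "Inverse" reduce: remove the values of the batch that just left the window.
    // This is where the logic breaks: a value removed here may still occur in
    // other batches inside the window, so set difference is not a true inverse
    // of set union.
    Function2<HashSet<String>, HashSet<String>, HashSet<String>> unmerge =
        (acc, old) -> { HashSet<String> s = new HashSet<String>(acc); s.removeAll(old); return s; };

    JavaPairDStream<String, Integer> distinctCounts =
        valueSets.reduceByKeyAndWindow(merge, unmerge,
                                       Durations.minutes(15), Durations.seconds(10))
                 .mapValues(set -> set.size());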

I considered doing this without an inverse reduce function, but even over a
15-minute window that means re-reducing the entire window on every slide,
which is going to be an expensive operation.
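
That variant would keep the same valueSets and merge as in the sketch above
and simply drop the inverse function, e.g.:

    // No inverse reduce: every slide re-unions all the batches currently in
    // the 15-minute window, which is what makes this expensive.
    JavaPairDStream<String, Integer> distinctCountsNoInverse =
        valueSets.reduceByKeyAndWindow(merge, Durations.minutes(15), Durations.seconds(10))
                 .mapValues(set -> set.size());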

Instead, I have put a solution in place using a HashMap per key, where I keep
the actual occurrence count of each value: the forward reduce increments the
counts and the inverse reduce decrements them. At the end we remove the
entries whose count has dropped to 0 and take the size of the map, i.e. the
number of values that still have a non-zero count. That size is the distinct
count.
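
Roughly, again only as a sketch on the same assumed "pairs" stream, the
current version looks like this:

    // Per-key map of value -> occurrence count (each record contributes 1).
    JavaPairDStream<String, HashMap<String, Long>> valueCounts =
        pairs.mapToPair(t -> {
            HashMap<String, Long> m = new HashMap<String, Long>();
            m.put(t._2(), 1L);
            return new Tuple2<String, HashMap<String, Long>>(t._1(), m);
        });

    // Forward reduce: add the per-value counts together.
    Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> add =
        (a, b) -> {
            HashMap<String, Long> m = new HashMap<String, Long>(a);
            b.forEach((v, c) -> m.merge(v, c, Long::sum));
            return m;
        };

    // Inverse reduce: subtract the counts of the batch leaving the window and
    // drop any value whose count reaches 0. Unlike set difference, this is
    // invertible, because we know exactly how many occurrences left the window.
    Function2<HashMap<String, Long>, HashMap<String, Long>, HashMap<String, Long>> subtract =
        (acc, old) -> {
            HashMap<String, Long> m = new HashMap<String, Long>(acc);
            old.forEach((v, c) -> m.merge(v, -c, Long::sum));
            m.values().removeIf(c -> c <= 0L);
            return m;
        };

    // The number of entries left in each map is that key's distinct count.
    JavaPairDStream<String, Integer> windowedDistinctCounts =
        valueCounts.reduceByKeyAndWindow(add, subtract,
                                         Durations.minutes(15), Durations.seconds(10))
                   .mapValues(m -> m.size());

(If keys whose maps become empty start to linger in the state, there is also a
reduceByKeyAndWindow overload that takes a filter function to drop them.)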

I was hoping there was a more straightforward way of doing this within Spark
itself, without having to resort to a "hack" like this.

Thanks
Nikunj



On Sun, Jul 19, 2015 at 6:13 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Nikunj,
>
> Sorry, I totally misread your question.
> I think you need to first groupByKey (to get all the values of the same key
> together), then follow with mapValues (probably put the values into a set and
> take its size, since you want a distinct count)
>
> HTH,
>
> Jerry
>
> Sent from my iPhone
>
> On 19 Jul, 2015, at 8:48 pm, N B <nb.nos...@gmail.com> wrote:
>
> Hi Suyog,
>
> That code outputs the following:
>
> key2 val22 : 1
> key1 val1 : 2
> key2 val2 : 2
>
> while the output I want to achieve would have been (with your example):
>
> key1 : 2
> key2 : 2
>
> because there are 2 distinct values for each key (regardless of their
> actual duplicate counts, hence the use of the DISTINCT keyword in the
> SQL equivalent).
>
> Thanks
> Nikunj
>
>
> On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari <suyogchoudh...@gmail.com
> > wrote:
>
>> public static void main(String[] args) {
>>
>>   SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");
>>   JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>>
>>   List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
>>   list.add(new Tuple2<String, String>("key1", "val1"));
>>   list.add(new Tuple2<String, String>("key1", "val1"));
>>   list.add(new Tuple2<String, String>("key2", "val2"));
>>   list.add(new Tuple2<String, String>("key2", "val2"));
>>   list.add(new Tuple2<String, String>("key2", "val22"));
>>
>>   JavaPairRDD<String, Integer> rdd = jsc.parallelize(list)
>>       .mapToPair(t -> new Tuple2<String, Integer>(t._1 + " " + t._2, 1));
>>   JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2);
>>
>>   List<Tuple2<String, Integer>> output = rdd2.collect();
>>   for (Tuple2<?, ?> tuple : output) {
>>     System.out.println(tuple._1() + " : " + tuple._2());
>>   }
>> }
>>
>> On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> You mean this does not work?
>>>
>>> SELECT key, count(value) from table group by key
>>>
>>>
>>>
>>> On Sun, Jul 19, 2015 at 2:28 PM, N B <nb.nos...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> How do I go about performing the equivalent of the following SQL clause
>>>> in Spark Streaming? I will be using this on a Windowed DStream.
>>>>
>>>> SELECT key, count(distinct(value)) from table group by key;
>>>>
>>>> so for example, given the following dataset in the table:
>>>>
>>>>  key | value
>>>> -----+-------
>>>>  k1  | v1
>>>>  k1  | v1
>>>>  k1  | v2
>>>>  k1  | v3
>>>>  k1  | v3
>>>>  k2  | vv1
>>>>  k2  | vv1
>>>>  k2  | vv2
>>>>  k2  | vv2
>>>>  k2  | vv2
>>>>  k3  | vvv1
>>>>  k3  | vvv1
>>>>
>>>> the result will be:
>>>>
>>>>  key | count
>>>> -----+-------
>>>>  k1  |     3
>>>>  k2  |     2
>>>>  k3  |     1
>>>>
>>>> Thanks
>>>> Nikunj
>>>>
>>>>
>>>
>>
>
