Hi subash,

A stream is infinite, hence it has no notion of "final" count. To get
distinct counts you need to define a period (= a window [1] ) over which
you count elements and emit a result, by adding a winow operator before the
reduce.
For example the following will emit distinct counts every 10 minutes over
the last 10 minutes period:

*stream.keyby(2)*
*      .window(Time.minutes(10))*
*      .reduce(new GridPointsCount())*

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html


On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello Kostas,
>
> Sorry for late reply. But I couldn't understand how to apply split in
> datastream, such as in below to get the distinct output stream element with
> the count after applying group by and reduce.
>
> DataStream<Tuple2<String, Long>> gridWithDensity =
> pointsWithGridCoordinates.map(new AddCountAppender())
> .keyBy(2).reduce(*new GridPointsCount()*).map(new
> RetrieveGridWithCount());
> gridWithDensity.print();
>
> Current Output:
>       Required Output:
> (33330,1)
>          (33330,3)
> (33330,2)
>          (00000,4)
> (00000,1)
> (00000,2)
> (00000,3)
> (33330,3)
> (00000,4)
>
> public static final class GridPointsCount implements
> ReduceFunction<Tuple4<Point, Grid, String, Long>> {
> @Override
> public Tuple4<Point, Grid, String, Long> reduce(Tuple4<Point, Grid,
> String, Long> val1,
> Tuple4<Point, Grid, String, Long> val2) {
> return new Tuple4<Point, Grid, String, Long>(val1.f0, val1.f1, val1.f2,
> val1.f3 + val2.f3);
> }
> }
>
>
> Regards,
> Subash Basnet
>
> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> [image: Boxbe] <https://www.boxbe.com/overview> This message is eligible
>> for Automatic Cleanup! (k.klou...@data-artisans.com) Add cleanup rule
>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Fkey%3DDbXSEeCvlLA38dy4LWQ%252Bbi5EVsEyM7uPcveSQFq%252FvFY%253D%26token%3DiyAq2d4gLBvR1lxgjbsxqD%252BdBWvTfV7BV7%252BvSygyQXwgHoGt5X14QdpMF1iSW4G0Qw7Sb6h%252FaXTQuS4dPnyuWCemTmCcMq0fJSpZwsztLpp9PMU7tCLvpRqvo9N%252B9Aj7ixZD8zvIdLvXB2%252FQqkPEDw%253D%253D&tc_serial=26521059433&tc_rand=1244322567&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>> | More info
>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=26521059433&tc_rand=1244322567&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>
>> Hi Subash,
>>
>> You should also split your elements in windows.
>> If not, Flink emits an element for each incoming record.
>> That is why you have:
>>
>> (1,1)
>> (1,2)
>> (1,3)
>>
>> …
>>
>> Kostas
>>
>> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasub...@gmail.com> wrote:
>> >
>> > Hello all,
>> >
>> > I grouped by the input based on it's id to count the number of elements
>> in each group.
>> > DataStream<Tuple2<String, Long>> gridWithCount;
>> > Upon printing the above datastream it shows with duplicate rows:
>> > Output:
>> > (1, 1)
>> > (1,2)
>> > (2,1)
>> > (1,3)
>> > (2,2).......
>> >
>> > Whereas I wanted the distinct rows with final count:
>> > Needed Output:
>> > (1,3)
>> > (2,2)..
>> >
>> > What could be the way to achieve this.
>> >
>> >
>> > Regards,
>> > Subash Basnet
>>
>>
>>
>

Reply via email to