Hi Subash,

A stream is infinite, so it has no notion of a "final" count. To get one count per key, you need to define a period (a window [1]) over which elements are counted and at the end of which a result is emitted, by adding a window operator before the reduce. For example, the following emits one count per key every 10 minutes, covering the preceding 10 minutes:

    stream.keyBy(2)
          .timeWindow(Time.minutes(10))
          .reduce(new GridPointsCount())

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html

On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello Kostas,
>
> Sorry for the late reply. I couldn't understand how to apply the split to a
> DataStream, such as the one below, to get distinct output stream elements
> with their count after applying the group-by and reduce.
>
> DataStream<Tuple2<String, Long>> gridWithDensity =
>     pointsWithGridCoordinates.map(new AddCountAppender())
>         .keyBy(2)
>         .reduce(new GridPointsCount())
>         .map(new RetrieveGridWithCount());
> gridWithDensity.print();
>
> Current Output:       Required Output:
> (33330,1)             (33330,3)
> (33330,2)             (00000,4)
> (00000,1)
> (00000,2)
> (00000,3)
> (33330,3)
> (00000,4)
>
> public static final class GridPointsCount
>         implements ReduceFunction<Tuple4<Point, Grid, String, Long>> {
>     @Override
>     public Tuple4<Point, Grid, String, Long> reduce(
>             Tuple4<Point, Grid, String, Long> val1,
>             Tuple4<Point, Grid, String, Long> val2) {
>         // Keep the fields of the first element and add up the counts.
>         return new Tuple4<Point, Grid, String, Long>(
>                 val1.f0, val1.f1, val1.f2, val1.f3 + val2.f3);
>     }
> }
>
> Regards,
> Subash Basnet
>
> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Subash,
>>
>> You should also split your elements into windows.
>> If not, Flink emits an element for each incoming record.
>> That is why you have:
>>
>> (1,1)
>> (1,2)
>> (1,3)
>>
>> …
>>
>> Kostas
>>
>> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasub...@gmail.com> wrote:
>> >
>> > Hello all,
>> >
>> > I grouped the input by its id to count the number of elements in each group.
>> > DataStream<Tuple2<String, Long>> gridWithCount;
>> > Printing the above datastream shows duplicate rows:
>> > Output:
>> > (1,1)
>> > (1,2)
>> > (2,1)
>> > (1,3)
>> > (2,2).......
>> >
>> > Whereas I wanted the distinct rows with the final count:
>> > Needed Output:
>> > (1,3)
>> > (2,2)..
>> >
>> > What could be the way to achieve this?
>> >
>> > Regards,
>> > Subash Basnet
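
To make the difference discussed in this thread concrete, here is a small plain-Java simulation. It is only a sketch and does not use Flink at all: the class `WindowedCountSketch` and its methods `runningCounts` and `windowedCounts` are hypothetical names invented for illustration, and the whole input list is treated as a single window. It shows why a reduce without a window prints one updated count per record, while a reduce over a window prints one count per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of the two behaviours in this thread.
final class WindowedCountSketch {

    // Mimics keyBy(...).reduce(...) without a window:
    // every incoming record triggers the emission of the updated count for its key.
    static List<String> runningCounts(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            emitted.add("(" + key + "," + updated + ")");
        }
        return emitted;
    }

    // Mimics a reduce over one window (assumption: the whole list is one window):
    // records are aggregated per key and a single result per key is emitted
    // only once the window is complete.
    static List<String> windowedCounts(List<String> keysInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keysInWindow) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            emitted.add("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same ids as in the original question.
        List<String> ids = Arrays.asList("1", "1", "2", "1", "2");
        System.out.println(runningCounts(ids));   // one element per incoming record
        System.out.println(windowedCounts(ids));  // one element per key
    }
}
```

In a real job the window boundaries come from the window operator (for example a 10-minute time window), so the "final" count is final only for that window, not for the stream as a whole.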