Hi,

I am using a GroupReduceFunction to aggregate the contents of my objects,
and at the same time I need the function to return a unique counter. My
attempts are failing: the identifiers look random and some of them are
duplicated.

Below is the part of my code that is supposed to generate a unique
counter and emit it via out.collect.


        public static class sumReducer implements
        GroupReduceFunction<Tuple2<Integer, Point>, Tuple5<Integer, Point, Point, Long, Long>> {

                double sum[] = null;
                double sumOfSquare[] = null;
                long timestamp = 0;
                @Override
                public void reduce(Iterable<Tuple2<Integer, Point>> in,
                                Collector<Tuple5<Integer, Point, Point, Long, Long>> out)
                                throws Exception {
                        
                        int id = 0;
                        long count = 0;
                        boolean flag = true;
                        for(Tuple2<Integer, Point> i:in)
                        {
                                if(flag)
                                {
                                        timestamp++;
                                        System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp);
                                        sum = new double[i.f1.pt.length];
                                        sumOfSquare = new double[sum.length];
                                        id = i.f0;
                                        for(int j=0;j<sum.length;j++)
                                        {
                                                sum[j] = i.f1.pt[j];
                                                sumOfSquare[j] = i.f1.pt[j] * i.f1.pt[j];
                                        }
                                        flag = false;
                                }
                                else
                                {
                                        int len = i.f1.pt.length;
                                        for(int j=0;j<len;j++)
                                        {
                                                sum[j] += i.f1.pt[j];
                                                sumOfSquare[j] += (i.f1.pt[j] * i.f1.pt[j]);
                                        }
                                }
                                count++;
                        }
                        out.collect(new Tuple5<Integer, Point, Point, Long, Long>(
                                        id, new Point(sum), new Point(sumOfSquare), count, timestamp));
                }
        }

I want the timestamp to be unique. The statement
System.out.println("uniqueid: " + i.f0 + ", t: " + timestamp); executes
once per group, that is, once for each identifier (i.f0) by which the data
is grouped before the groupReduce function is called. Still, I get the
following output from that println statement:

uniqueid: 2, t: 1
uniqueid: 1, t: 1
uniqueid: 7, t: 2
uniqueid: 9, t: 3
uniqueid: 6, t: 2
uniqueid: 3, t: 1
uniqueid: 5, t: 2
uniqueid: 8, t: 3
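
To illustrate what I suspect might be going on, here is a minimal sketch in
plain Java without any Flink classes. It assumes, and I have not confirmed
this, that several parallel copies of sumReducer are running and that each
copy has its own timestamp field. ReducerInstance, subtaskIndex, parallelism
and the round-robin assignment of groups to copies are hypothetical names and
choices made up for the illustration; they are not Flink API. The second
method shows one possible way to make the values unique under that
assumption, by interleaving the per-copy counters.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class CounterSketch {

    // Stand-in for one parallel copy of the reducer (hypothetical, not Flink API).
    static class ReducerInstance {
        final int subtaskIndex; // assumed position of this copy: 0..parallelism-1
        final int parallelism;  // assumed total number of copies
        long timestamp = 0;     // plays the role of the field in sumReducer

        ReducerInstance(int subtaskIndex, int parallelism) {
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        // What my code does: a counter that is only unique within this copy.
        long nextLocal() {
            return ++timestamp;
        }

        // Possible workaround: copy k only ever emits k, k + p, k + 2p, ...
        long nextInterleaved() {
            return (timestamp++) * parallelism + subtaskIndex;
        }
    }

    // Feeds `groups` groups to `parallelism` copies and collects the emitted ids.
    static List<Long> run(int groups, int parallelism, boolean interleaved) {
        ReducerInstance[] instances = new ReducerInstance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new ReducerInstance(i, parallelism);
        }
        List<Long> ids = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            // Arbitrary round-robin assignment, only for the illustration.
            ReducerInstance r = instances[g % parallelism];
            ids.add(interleaved ? r.nextInterleaved() : r.nextLocal());
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Long> local = run(8, 3, false);
        List<Long> interleaved = run(8, 3, true);
        System.out.println("local:       " + local
                + " distinct=" + new HashSet<>(local).size());
        System.out.println("interleaved: " + interleaved
                + " distinct=" + new HashSet<>(interleaved).size());
    }
}
```

With 8 groups and 3 copies, the local counters repeat the values 1, 2 and 3
in the same way as my output above, while the interleaved variant yields 8
distinct values. Whether this matches what Flink actually does is exactly
what I am unsure about.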

I don't really understand why I am getting this discrepancy; I am probably
missing some Flink concept. I am relatively new to the Flink platform, and
any help is appreciated. Thanks a lot.

Thanks and Regards


