Hi Gyula,

I tried doing something like the following in the two flatMap functions,
but I am not getting the desired results, and I am still confused about how
the concept you put forward would work:

public static final class MyCoFlatmap
        implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // Local copy of all centroids held by this parallel instance.
    Centroid[] centroids;

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        // flag, id, numofMC and timestamp are assumed to be declared
        // outside this snippet.
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
        if (id < numofMC) {
            // The first numofMC points each seed a new centroid.
            System.out.println(id);
            Centroid generatedMC =
                    CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Find the centroid closest to the incoming point.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double distance = distance(in.pt, mc.getCenter());
                if (distance < minDistance) {
                    closestMC = mc;
                    minDistance = distance;
                }
            }
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out)
            throws Exception {
        // Overwrite the local copy with the centroid from the second input.
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}

As mentioned in my previous reply, I understand that each of the two
flatMap functions in the co-flatmap receives one tuple at a time. So if I
have a DataStream of centroids, they would arrive one at a time on each
partition, and that would defeat the purpose, because I need all of the
centroids to compare distances against.

I tried storing the centroids in an array of Centroid, but again I don't
understand how I can push all of the changes back.
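
To make the question concrete, here is a plain-Java sketch (no Flink
involved) of how I currently picture it. Each parallel instance keeps its
own copy of the centroids, fills it one centroid at a time through
flatMap2, and every updated centroid emitted by flatMap1 is sent back to
all instances. The names BroadcastCentroidSketch, Instance and broadcast
are hypothetical, and the simplified Centroid with a running mean is only
a stand-in for my real class. The broadcast method is my assumption about
what a broadcast feedback edge would do, not actual Flink behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BroadcastCentroidSketch {

    /** Hypothetical stand-in for Centroid: an id plus a running mean. */
    static final class Centroid {
        final int id;
        final double[] center;
        final long count;

        Centroid(int id, double[] center, long count) {
            this.id = id;
            this.center = center;
            this.count = count;
        }

        /** Returns a new centroid with the point folded into the mean. */
        Centroid insert(double[] pt) {
            double[] c = new double[center.length];
            for (int i = 0; i < c.length; i++) {
                c[i] = (center[i] * count + pt[i]) / (count + 1);
            }
            return new Centroid(id, c, count + 1);
        }
    }

    /** One parallel instance of the co-flatmap with its own centroid copy. */
    static final class Instance {
        final Map<Integer, Centroid> centroids = new HashMap<>();

        // Second input: centroids arrive one at a time and overwrite
        // the local copy, so the full set builds up over several calls.
        void flatMap2(Centroid in) {
            centroids.put(in.id, in);
        }

        // First input: assign the point to the nearest centroid known so
        // far and emit the updated centroid.
        void flatMap1(double[] pt, List<Centroid> out) {
            Centroid closest = null;
            double min = Double.MAX_VALUE;
            for (Centroid c : centroids.values()) {
                double d = distance(pt, c.center);
                if (d < min) {
                    min = d;
                    closest = c;
                }
            }
            if (closest != null) {
                out.add(closest.insert(pt));
            }
        }
    }

    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return Math.sqrt(sum);
    }

    /** Assumed feedback edge: every instance receives every update. */
    static void broadcast(List<Centroid> updates, List<Instance> instances) {
        for (Centroid c : updates) {
            for (Instance inst : instances) {
                inst.flatMap2(c);
            }
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = Arrays.asList(new Instance(), new Instance());
        // Seed both instances with two initial centroids.
        broadcast(Arrays.asList(
                new Centroid(0, new double[] {0, 0}, 1),
                new Centroid(1, new double[] {10, 10}, 1)), instances);

        // A point arrives at instance 0 only.
        List<Centroid> out = new ArrayList<>();
        instances.get(0).flatMap1(new double[] {2, 0}, out);
        broadcast(out, instances);

        // Instance 1 never saw the point but holds the updated centroid 0.
        System.out.println(instances.get(1).centroids.get(0).center[0]); // prints 1.0
    }
}
```

If this picture is right, then what I am missing is how to wire the
output of flatMap1 back into flatMap2 of every instance in the actual
streaming job.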

A small example or code snippet would really be helpful.

Thanks a lot

Regards
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6816.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
