Hi Gyula,

I tried doing something like the following in the two flatMap methods, but I am not getting the desired results, and I am still confused about how the concept you put forward would work:

public static final class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // Local copy of the centroids held by this parallel instance.
    // flag, id, numofMC and timestamp are fields declared elsewhere (not shown here).
    Centroid[] centroids;

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        // Allocate the array on the first call.
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
        if (id < numofMC) {
            // The first numofMC points each seed a new centroid.
            System.out.println(id);
            Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Find the centroid closest to the incoming point.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double distance = distance(in.pt, mc.getCenter());
                if (distance < minDistance) {
                    closestMC = mc;
                    minDistance = distance;
                }
            }
            // Absorb the point only if it lies within the centroid's radius.
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
        // Overwrite the local copy with the updated centroid.
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}

As mentioned in my previous reply, I understand that each of the map functions in the co-flatMap receives one tuple at a time. So if I have a DataStream of centroids, they would arrive one at a time on the partitions, and that would defeat the purpose, because I need all of the centroids to compare the distance against. I tried storing the centroids in an array of Centroid, but I still don't understand how I can push all of the changes back.

A small example or code snippet would be really helpful.

Thanks a lot.

Regards,
Biplob

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-Broadcast-of-datasets-in-streaming-context-tp6456p6816.html