Hello all,

I need to collect the centroids in order to find the nearest center for each point.

    DataStream<Point> points = newDataStream.map(new getPoints());
    DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
    ConnectedIterativeStreams<Point, Centroid> loop = points.iterate().withFeedbackType(Centroid.class);
    DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter())
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids.broadcast());

But I get a NullPointerException on the centroids collection when I try to add a centroid in flatMap2. My code is below. How can I get rid of this NullPointerException? Any other ideas are welcome.

    public static final class SelectNearestCenter
            implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {

        private Collection<Centroid> centroids;

        @Override
        public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws Exception {
            if (centroids != null) {
                if (centroids.size() > 19) { // let's assume minimum size 20 for now
                    for (Centroid centroid : centroids) {
                        .....
                    }
                }
            }
            // emit a new record with the center id and the data point.
            out.collect(new Tuple2<String, Point>(closestCentroidId, p));
        }

        @Override
        public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out) throws Exception {
            centroids.add(value);
        }
    }

Instantiating the field as below is not allowed, so flatMap2 always throws a NullPointerException.

    private Collection<Centroid> centroids = new Collection<Centroid>();

Best Regards,
Subash Basnet
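
For reference, a minimal sketch of the instantiation issue, kept free of Flink so it runs on its own. Collection is an interface, so `new Collection<Centroid>()` does not compile, while a concrete class such as ArrayList can be assigned to the same field. The class NearestCenterSketch, its Centroid stand-in, and the methods addCentroid and closestCentroidId are hypothetical names for illustration only; they are not the types from the job above. In an actual Flink job, another option might be to extend RichCoFlatMapFunction and create the list in open().

```java
import java.util.ArrayList;
import java.util.Collection;

public class NearestCenterSketch {

    // Hypothetical stand-in for the Centroid type used in the post.
    static final class Centroid {
        final String id;
        final double x;
        final double y;

        Centroid(String id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Collection is an interface and cannot be instantiated directly.
    // Assigning a concrete implementation keeps the field from being null.
    private final Collection<Centroid> centroids = new ArrayList<>();

    // Plays the role of flatMap2: remember every centroid that arrives.
    void addCentroid(Centroid c) {
        centroids.add(c);
    }

    // Plays the role of the loop in flatMap1: returns the id of the closest
    // centroid by squared Euclidean distance, or null if none has arrived yet.
    String closestCentroidId(double px, double py) {
        String closestId = null;
        double best = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double dx = c.x - px;
            double dy = c.y - py;
            double dist = dx * dx + dy * dy;
            if (dist < best) {
                best = dist;
                closestId = c.id;
            }
        }
        return closestId;
    }

    public static void main(String[] args) {
        NearestCenterSketch sketch = new NearestCenterSketch();
        sketch.addCentroid(new Centroid("c1", 0.0, 0.0));
        sketch.addCentroid(new Centroid("c2", 10.0, 10.0));
        System.out.println(sketch.closestCentroidId(1.0, 2.0)); // prints "c1"
    }
}
```

With the field initialised this way, the `centroids != null` check in flatMap1 is no longer needed; an empty collection simply yields no closest centroid.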