Hello all,

I need to collect the centroids in order to find the nearest center for each
point:
DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids =
    newCentroidDataStream.map(new TupleCentroidConverter());
ConnectedIterativeStreams<Point, Centroid> loop =
    points.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> newCentroids = loop
    .flatMap(new SelectNearestCenter())
    .map(new CountAppender())
    .keyBy(0)
    .reduce(new CentroidAccumulator())
    .map(new CentroidAverager());
DataStream<Centroid> finalCentroids =
    loop.closeWith(newCentroids.broadcast());


But I get a NullPointerException on the centroids collection when I try to
add a centroid in flatMap2. Below is my code. How can I get rid of this
NullPointerException? Any other ideas are welcome.


public static final class SelectNearestCenter
        implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {

    private Collection<Centroid> centroids;

    @Override
    public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
            throws Exception {
        if (centroids != null) {
            if (centroids.size() > 19) {
                // let's assume minimum size 20 for now
                for (Centroid centroid : centroids) {
                    .....
                }
            }
        }
        // emit a new record with the center id and the data point.
        out.collect(new Tuple2<String, Point>(closestCentroidId, p));
    }

    @Override
    public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out)
            throws Exception {
        centroids.add(value);
    }
}



The instantiation below is not allowed, so the field stays null and the code
always throws a NullPointerException.
private Collection<Centroid> centroids = new Collection<Centroid>();
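
For reference, java.util.Collection is an interface, which is why it cannot be
created with new. A minimal sketch of one possible fix is to initialize the
field with a concrete implementation such as ArrayList. The Centroid class and
the CentroidHolder wrapper below are hypothetical stand-ins so the snippet
compiles without Flink; in the real job the field would live in
SelectNearestCenter.

```java
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical stand-in for the Centroid type used in the post.
final class Centroid {
    final String id;

    Centroid(String id) {
        this.id = id;
    }
}

// Hypothetical holder that mirrors only the field and flatMap2 of SelectNearestCenter.
final class CentroidHolder {
    // Collection is an interface, so `new Collection<Centroid>()` does not compile.
    // A concrete implementation gives the field a non-null value from the start.
    private final Collection<Centroid> centroids = new ArrayList<>();

    // Same role as flatMap2 in the post: remember every centroid that arrives.
    void flatMap2(Centroid value) {
        centroids.add(value);
    }

    int size() {
        return centroids.size();
    }
}
```

With the field initialized this way, the null check in flatMap1 is no longer
needed, although the size check still is.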


Best Regards,
Subash Basnet
