Hi Subash,

The problem you are facing is not related to Flink. The "centroids" field
is declared but never initialized, so the call to centroids.add(value) in
flatMap2 dereferences null. This is a general Java issue. Please keep in
mind that this list is not the best forum for Java questions.
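
As a minimal sketch of the usual fix, in plain Java without any Flink types:
Collection is an interface, so it cannot be instantiated with "new", but the
field can be initialized with a concrete implementation such as ArrayList.
The class name CentroidBuffer and the use of String in place of Centroid are
assumptions made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical stand-in for SelectNearestCenter; no Flink types are used.
class CentroidBuffer {
    // "new Collection<>()" does not compile because Collection is an
    // interface. Initialize the field with a concrete class instead.
    private final Collection<String> centroids = new ArrayList<>();

    // Plays the role of flatMap2: safe because the field is never null.
    void addCentroid(String centroid) {
        centroids.add(centroid);
    }

    // Number of centroids collected so far.
    int size() {
        return centroids.size();
    }
}
```

With the field initialized at declaration, the "centroids != null" check in
flatMap1 is no longer needed; the size check alone is enough.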

Regards,
Robert



On Wed, Jul 13, 2016 at 6:45 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> I need to collect the centroids to find out the nearest center for each
> point.
> DataStream<Point> points = newDataStream.map(new getPoints());
> DataStream<Centroid> centroids =
>         newCentroidDataStream.map(new TupleCentroidConverter());
> ConnectedIterativeStreams<Point, Centroid> loop =
>         points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter())
>         .map(new CountAppender())
>         .keyBy(0)
>         .reduce(new CentroidAccumulator())
>         .map(new CentroidAverager());
> DataStream<Centroid> finalCentroids =
>         loop.closeWith(newCentroids.broadcast());
>
>
> But I am getting a null pointer exception on the centroids collection when
> trying to add a centroid in flatMap2. Below is my code. How can I get rid
> of this null pointer exception? Any other ideas are welcome.
>
>
> public static final class SelectNearestCenter
>         implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>     private Collection<Centroid> centroids;
>
>     @Override
>     public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         if (centroids != null) {
>             if (centroids.size() > 19) {
>                 // let's assume minimum size 20 for now
>                 for (Centroid centroid : centroids) {
>                     .....
>                 }
>             }
>         }
>         // emit a new record with the center id and the data point.
>         out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>     }
>
>     @Override
>     public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         centroids.add(value);
>     }
> }
>
>
>
> The instantiation below is not allowed, so it always throws a null pointer
> exception.
> private Collection<Centroid> centroids = new Collection<Centroid>();
>
>
> Best Regards,
> Subash Basnet
>
