Hello all, I am trying to cluster data-stream points around centroids. My input is stock data, and I have taken the stock's timestamp as the centroid id. The problem I am facing is getting the id of the centroid within flatMap2. Below is my code, if you could take a look:
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Iteration: points arrive on input 1, fed-back centroids arrive on input 2.
    ConnectedIterativeStreams<Point, Centroid> loop =
            points.iterate().withFeedbackType(Centroid.class);

    DataStream<Centroid> newCentroids = loop
            .flatMap(new SelectNearestCenter(10))
            .map(new CountAppender())
            .keyBy(0)
            .reduce(new CentroidAccumulator())
            .map(new CentroidAverager());

    // Broadcast the updated centroids back into the loop (input 2 of flatMap).
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids.broadcast());

    public static final class SelectNearestCenter
            implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {

        private Centroid[] centroids;
        private int size = 0;
        private int count = 0;
        private boolean flag = true;

        public SelectNearestCenter(int size) {
            this.size = size;
        }

        @Override
        public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws Exception {
            double minDistance = Double.MAX_VALUE;
            String closestCentroidId = "-1";
            if (centroids != null) {
                // only the first `count` slots are filled, so skip the empty (null) ones
                for (int i = 0; i < count; i++) {
                    Centroid centroid = centroids[i];
                    // compute distance
                    double distance = p.euclideanDistance(centroid);
                    // update nearest cluster if necessary
                    if (distance < minDistance) {
                        minDistance = distance;
                        closestCentroidId = centroid.id;
                    }
                }
            }
            // emit a new record with the center id and the data point.
            out.collect(new Tuple2<String, Point>(closestCentroidId, p));
        }

        @Override
        public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out) throws Exception {
            if (flag) {
                centroids = new Centroid[size];
                flag = false;
            }
            if (count < size) {
                System.out.println(value);
                centroids[count] = value;
                count++;
            }
        }
    }

The centroid data stream looks like this, with the string timestamp as the id:

    Fri Jul 15 15:30:55 CEST 2016 117.8818 117.9 117.8 117.835 1383700.0
    Fri Jul 15 15:31:58 CEST 2016 117.835 117.99 117.82 117.885 118900.0

But when I print the centroid value in flatMap2 (the System.out.println(value) line), the id shows as '-1':

    -1 117.8818 117.9 117.8 117.835 1383700.0
    -1 117.5309 117.575 117.48245 117.52 707100.0

This '-1' comes from flatMap1, where it is the initial value of closestCentroidId (String closestCentroidId = "-1";).
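To illustrate why every fed-back centroid carries '-1', here is a hypothetical plain-Java simulation of one pass through the loop; it does not use Flink. It assumes CentroidAverager copies the tuple key (f0) into Centroid.id, as Flink's batch KMeans example does, and it collapses the rolling keyBy(0).reduce(...) into a single batch. The class, record, and method names are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of flatMap1 -> keyBy(0) -> reduce -> CentroidAverager.
public class MinusOneFeedbackDemo {

    // Simplified stand-in for the post's Centroid type (assumption).
    public record Centroid(String id, double[] fields) {}

    // Like flatMap1: with an empty centroid list the id stays "-1".
    static String closestId(double[] point, List<Centroid> centroids) {
        double minDistance = Double.MAX_VALUE;
        String bestId = "-1";
        for (Centroid c : centroids) {
            double sum = 0.0;
            for (int i = 0; i < point.length; i++) {
                double diff = point[i] - c.fields()[i];
                sum += diff * diff;
            }
            double distance = Math.sqrt(sum);
            if (distance < minDistance) {
                minDistance = distance;
                bestId = c.id();
            }
        }
        return bestId;
    }

    // Like keyBy(0).reduce(...).map(new CentroidAverager()), assuming the
    // averager uses the grouping key as the new centroid's id.
    static List<Centroid> feedback(List<double[]> points, List<Centroid> centroids) {
        Map<String, double[]> sums = new LinkedHashMap<>();
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (double[] p : points) {
            String key = closestId(p, centroids);
            double[] sum = sums.computeIfAbsent(key, k -> new double[p.length]);
            for (int i = 0; i < p.length; i++) {
                sum[i] += p[i];
            }
            counts.merge(key, 1, Integer::sum);
        }
        List<Centroid> result = new ArrayList<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            double[] avg = e.getValue().clone();
            int n = counts.get(e.getKey());
            for (int i = 0; i < avg.length; i++) {
                avg[i] /= n;
            }
            result.add(new Centroid(e.getKey(), avg));
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> points = List.of(
                new double[] {117.8818, 117.9, 117.8, 117.835},
                new double[] {117.835, 117.99, 117.82, 117.885});
        // No centroid has been fed back yet, so both points share the key "-1".
        List<Centroid> fed = feedback(points, List.of());
        System.out.println(fed.size() + " centroid(s), id = " + fed.get(0).id());
        // prints "1 centroid(s), id = -1"
    }
}
```

Under those assumptions, the only id that can ever reach flatMap2 is one that flatMap1 already emitted, so the timestamps never enter the loop unless some centroid with a timestamp id is introduced first.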
To get rid of this, I tried moving the out.collect statement inside the if (centroids != null) block. But then execution never enters that block, because centroids is initially null: flatMap1 never emits anything, so nothing reaches the feedback stream and flatMap2 never receives a centroid to fill the array. It would be great if you could suggest what the probable problem or solution could be.

Best regards,
Subash Basnet
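A minimal sketch of one possible workaround, assuming the first `size` points may serve as initial centroids: while fewer than `size` centroids are known, flatMap1 turns each point into a centroid whose id is the point's timestamp and emits the point tagged with that id, so something always flows into the loop and it starts with real ids instead of '-1'. The class below is hypothetical plain Java, not Flink code: onPoint stands in for flatMap1, onCentroid for flatMap2, the Point record with a timestamp field is an assumption, and it assumes the operator runs with parallelism 1 (otherwise each parallel instance would seed its own centroids).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the state inside SelectNearestCenter, with seeding.
public class SeededNearestCenter {

    // Simplified stand-ins for the post's types (assumptions).
    public record Point(String timestamp, double[] fields) {}
    public record Centroid(String id, double[] fields) {}
    public record Assignment(String centroidId, Point point) {}

    private final int size;
    private final List<Centroid> centroids = new ArrayList<>();

    public SeededNearestCenter(int size) {
        this.size = size;
    }

    // flatMap1: the first `size` points seed the centroids (timestamp as id);
    // later points are assigned to the nearest known centroid.
    public Assignment onPoint(Point p) {
        if (centroids.size() < size) {
            centroids.add(new Centroid(p.timestamp(), p.fields().clone()));
            return new Assignment(p.timestamp(), p);
        }
        String closestId = "-1";
        double minDistance = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double d = euclidean(p.fields(), c.fields());
            if (d < minDistance) {
                minDistance = d;
                closestId = c.id();
            }
        }
        return new Assignment(closestId, p);
    }

    // flatMap2: replace the centroid with the same id, or append a new one.
    public void onCentroid(Centroid updated) {
        for (int i = 0; i < centroids.size(); i++) {
            if (centroids.get(i).id().equals(updated.id())) {
                centroids.set(i, updated);
                return;
            }
        }
        centroids.add(updated);
    }

    public List<Centroid> centroids() {
        return List.copyOf(centroids);
    }

    static double euclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }
}
```

onCentroid replaces centroids by id rather than filling a fixed array once, so feedback that arrives after the first `size` centroids keeps updating their positions.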