Hello all,

I am trying to cluster the points of a DataStream around centroids. My input
is stock data, and I use the timestamp of each stock record as the centroid
id. The problem I am facing is getting the *id* of the *centroid* within
*flatMap2*. My code is below, if you could take a look:

ConnectedIterativeStreams<Point, Centroid> loop =
        points.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> newCentroids = loop
        .flatMap(new SelectNearestCenter(10))
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
DataStream<Centroid> finalCentroids =
        loop.closeWith(newCentroids.broadcast());

public static final class SelectNearestCenter implements
        CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
    private Centroid[] centroids;
    private int size = 0;
    private int count = 0;
    private boolean flag = true;

    public SelectNearestCenter(int size) {
        this.size = size;
    }

    @Override
    public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws Exception {
        double minDistance = Double.MAX_VALUE;
        // default id, used as long as no centroid has been received
        String closestCentroidId = "-1";
        if (centroids != null) {
            // let's assume minimum size 20 for now
            for (Centroid centroid : centroids) {
                // skip array slots that flatMap2 has not filled yet
                if (centroid == null) {
                    continue;
                }
                // compute distance
                double distance = p.euclideanDistance(centroid);
                // update nearest cluster if necessary
                if (distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.id;
                }
            }
        }
        // emit a new record with the center id and the data point.
        out.collect(new Tuple2<String, Point>(closestCentroidId, p));
    }

    @Override
    public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out) throws Exception {
        // allocate the centroid array when the first centroid arrives
        if (flag) {
            centroids = new Centroid[size];
            flag = false;
        }
        if (count < size) {
            // this is the print statement that shows the id as '-1'
            System.out.println(value);
            centroids[count] = value;
            count++;
        }
    }
}


The centroid data stream looks like this, with the timestamp string as the id:
Fri Jul 15 15:30:55 CEST 2016  117.8818 117.9 117.8 117.835 1383700.0
Fri Jul 15 15:31:58 CEST 2016  117.835 117.99 117.82 117.885 118900.0

But if I print the *centroid value* in *flatMap2*, the id shows up as '-1':
-1  117.8818 117.9 117.8 117.835 1383700.0
-1  117.5309 117.575 117.48245 117.52 707100.0

This '-1' comes from *flatMap1*, where it is assigned as the initial value.
To get rid of it I tried moving the out.collect statement inside the
"centroids is not null" condition, but that condition is never true because
centroids is initially null, so nothing ever comes out of *flatMap1*.
It would be great if you could suggest what the probable cause or a
solution might be.
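
To make the behaviour easier to reproduce, here is a minimal plain-Java sketch
of the lookup done in flatMap1, without any Flink dependency. SketchPoint,
SketchCentroid and NearestCentroidSketch are hypothetical stand-ins for
illustration only, not my real Point and Centroid classes; the sketch assumes
only that a point carries numeric fields and a centroid additionally carries a
String id.

```java
// Hypothetical stand-in for Point: a plain vector of numeric fields.
class SketchPoint {
    final double[] fields;

    SketchPoint(double... fields) {
        this.fields = fields;
    }

    // Euclidean distance over all fields (assumes equal field counts).
    double euclideanDistance(SketchPoint other) {
        double sum = 0.0;
        for (int i = 0; i < fields.length; i++) {
            double d = fields[i] - other.fields[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }
}

// Hypothetical stand-in for Centroid: a point with a String id.
class SketchCentroid extends SketchPoint {
    final String id;

    SketchCentroid(String id, double... fields) {
        super(fields);
        this.id = id;
    }
}

class NearestCentroidSketch {
    // Same lookup as flatMap1: falls back to "-1" when no centroid is known.
    static String nearestId(SketchPoint p, SketchCentroid[] centroids) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        if (centroids != null) {
            for (SketchCentroid c : centroids) {
                if (c == null) {
                    continue; // slot not filled yet
                }
                double distance = p.euclideanDistance(c);
                if (distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = c.id;
                }
            }
        }
        return closestCentroidId;
    }

    public static void main(String[] args) {
        SketchPoint p = new SketchPoint(117.8818, 117.9, 117.8, 117.835, 1383700.0);

        // No centroid received yet: the point is tagged with the default id.
        System.out.println(nearestId(p, null)); // prints -1

        // Once centroids with real ids are available, the nearest id is returned.
        SketchCentroid[] centroids = new SketchCentroid[10];
        centroids[0] = new SketchCentroid("Fri Jul 15 15:30:55 CEST 2016",
                117.8818, 117.9, 117.8, 117.835, 1383700.0);
        centroids[1] = new SketchCentroid("Fri Jul 15 15:31:58 CEST 2016",
                117.835, 117.99, 117.82, 117.885, 118900.0);
        System.out.println(nearestId(p, centroids)); // prints Fri Jul 15 15:30:55 CEST 2016
    }
}
```

In this sketch every point is keyed by '-1' for as long as no centroid with a
real id has been stored, which matches what I see in the job.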


Best Regards,
Subash Basnet
