Hello Aljoscha Krettek,

Thank you. As you suggested, I changed my code as below:
*snippet 1:*
DataStream<Centroid> centroids = newCentroidDataStream.map(new
TupleCentroidConverter());
ConnectedIterativeStreams<Point, Centroid> loop =
points.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> newCentroids = loop.flatMap(new
SelectNearestCenter(10)).map(new CountAppender())
.keyBy(0).reduce(new CentroidAccumulator()).map(new CentroidAverager());
DataStream<Centroid> finalCentroids = loop.closeWith(
*newCentroids.union(centroids).broadcast()*);

*snippet 2:*
ConnectedIterativeStreams<Point, Centroid> loop1 = points.iterate()
.withFeedbackType(Centroid.class);
DataStream<ClusteredPoint> clusteredPoints = loop1.flatMap(new
SelectNearestCenterForPoints(10));
loop1.closeWith(finalCentroids.broadcast());
clusteredPoints.print();

public class SelectNearestCenterForPoints implements
CoFlatMapFunction<Point, Centroid, ClusteredPoint>{
....
out.collect(new ClusteredPoint(closestCentroidId, p));
...
}

My problem is that I get an *UnsupportedOperationException* when
executing snippet 2, as shown below:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)


In snippet 1, the final datastream required was of the Centroid data type.
But in snippet 2, I need a different data type, ClusteredPoint, and I have
to pass both finalCentroids and points in order to collect the
clusteredPoints. What could be the solution to achieve this?
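
For illustration, here is a minimal plain-Java sketch (no Flink involved) of the per-record logic that snippet 2 is aiming for: points are buffered until the expected number of centroids has arrived, so no point is ever emitted with a placeholder id. The Centroid and ClusteredPoint classes below are simplified, hypothetical stand-ins, and the method names onPoint/onCentroid are assumptions that only mirror the roles of flatMap1/flatMap2. It is a sketch under these assumptions, not the actual job code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; Centroid and ClusteredPoint are simplified,
// hypothetical stand-ins, not the classes used in the Flink job.
class NearestCentroidSketch {

    static final class Centroid {
        final String id;
        final double[] coords;
        Centroid(String id, double... coords) {
            this.id = id;
            this.coords = coords;
        }
    }

    static final class ClusteredPoint {
        final String centroidId;
        final double[] point;
        ClusteredPoint(String centroidId, double[] point) {
            this.centroidId = centroidId;
            this.point = point;
        }
    }

    private final int expected; // number of centroids to wait for
    private final List<Centroid> centroids = new ArrayList<>();
    private final List<double[]> pending = new ArrayList<>();

    NearestCentroidSketch(int expected) {
        if (expected < 1) {
            throw new IllegalArgumentException("expected must be >= 1");
        }
        this.expected = expected;
    }

    // Role of flatMap1: assign the point, or buffer it while centroids are missing.
    List<ClusteredPoint> onPoint(double... p) {
        List<ClusteredPoint> out = new ArrayList<>();
        if (centroids.size() < expected) {
            pending.add(p);
        } else {
            out.add(assign(p));
        }
        return out;
    }

    // Role of flatMap2: store the centroid; once all have arrived, flush the buffer.
    List<ClusteredPoint> onCentroid(Centroid c) {
        List<ClusteredPoint> out = new ArrayList<>();
        if (centroids.size() < expected) {
            centroids.add(c);
            if (centroids.size() == expected) {
                for (double[] p : pending) {
                    out.add(assign(p));
                }
                pending.clear();
            }
        }
        return out;
    }

    // Picks the centroid with the smallest squared Euclidean distance
    // (same ordering as the Euclidean distance itself).
    private ClusteredPoint assign(double[] p) {
        Centroid best = null;
        double bestDist = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double d = squaredDistance(p, c.coords);
            if (best == null || d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return new ClusteredPoint(best.id, p);
    }

    // Assumes both arrays have the same length.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    public static void main(String[] args) {
        NearestCentroidSketch s = new NearestCentroidSketch(2);
        System.out.println(s.onPoint(1.0, 1.0).size()); // point is buffered, nothing emitted yet
        s.onCentroid(new Centroid("A", 0.0, 0.0));
        for (ClusteredPoint cp : s.onCentroid(new Centroid("B", 10.0, 10.0))) {
            System.out.println(cp.centroidId); // buffered point is flushed here
        }
        for (ClusteredPoint cp : s.onPoint(9.0, 9.0)) {
            System.out.println(cp.centroidId);
        }
    }
}
```

This only covers the per-record logic. How to feed finalCentroids and points into such a function in Flink without the closeWith exception is the open question above.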


Regards,
Subash Basnet

On Tue, Jul 19, 2016 at 12:20 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you have to make sure to filter the data that you send back on the feedback
> edge, i.e. the loop.closeWith(newCentroids.broadcast()); statement needs
> to take a stream that only has the centroids that you want to send back.
> And you need to make sure to emit centroids with a good timestamp if you
> want to preserve timestamps.
>
> What you can also do is to union the stream of initial centroids with the
> new centroids on the feedback edge, i.e.:
> loop.closeWith(newCentroids.union(initialCentroids).broadcast())
>
> Cheers,
> Aljoscha
>
>
> On Mon, 18 Jul 2016 at 12:59 subash basnet <yasub...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am trying to cluster datastream points around a centroid. My input is
>> stock data where the centroid id I have taken as the timestamp of the
>> stock. The problem I am facing is getting the *id* of the *centroid* within
>> *flatMap2*. Below is my code, if you could take a look:
>>
>> ConnectedIterativeStreams<Point, Centroid> loop =
>> points.iterate().withFeedbackType(Centroid.class);
>> DataStream<Centroid> newCentroids = loop.flatMap(new
>> SelectNearestCenter(10)).map(new CountAppender()).keyBy(0)
>> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>> DataStream<Centroid> finalCentroids =
>> loop.closeWith(newCentroids.broadcast());
>>
>> public static final class SelectNearestCenter implements
>> CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>> private Centroid[] centroids;
>> private int size = 0;
>> private int count = 0;
>> private boolean flag = true;
>>
>> public SelectNearestCenter(int size) {
>> this.size = size;
>> }
>>
>> @Override
>> public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
>> throws Exception {
>> double minDistance = Double.MAX_VALUE;
>> *String closestCentroidId = "-1";*
>> if (centroids != null) {
>> // let's assume minimum size 20 for now
>> for (Centroid centroid : centroids) {
>> // compute distance
>> double distance = p.euclideanDistance(centroid);
>> // update nearest cluster if necessary
>> if (distance < minDistance) {
>> minDistance = distance;
>> closestCentroidId = centroid.id;
>> }
>> }
>> }
>> // emit a new record with the center id and the data point.
>> out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>> }
>>
>> @Override
>> public void flatMap2(Centroid value, Collector<Tuple2<String, Point>>
>> out) throws Exception {
>> if (flag) {
>> centroids = new Centroid[size];
>> flag = false;
>> }
>> if (count < size) {
>> *System.out.println(value);*
>> centroids[count] = value;
>> count++;
>> }
>> }
>> }
>>
>>
>> The centroid datastream looks as below, with the string timestamp as id.
>> Fri Jul 15 15:30:55 CEST 2016  117.8818 117.9 117.8 117.835 1383700.0
>> Fri Jul 15 15:31:58 CEST 2016  117.835 117.99 117.82 117.885 118900.0
>>
>> But now if I print the *centroid value* in *flatMap2*, it shows the
>> id as '-1':
>> -1  117.8818 117.9 117.8 117.835 1383700.0
>> -1  117.5309 117.575 117.48245 117.52 707100.0
>>
>> This '-1' comes from *flatMap1*, where it is assigned initially. To get rid
>> of this, if I put the out.collect statement inside the 'centroids is not
>> null' condition, it never goes inside the if condition, as initially the
>> centroids is null; hence the execution never comes out of *flatMap1*.
>> It would be great if you could suggest what the probable problem
>> or solution could be.
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>
>
