Hi, something like .withBroadcastSet() is not yet available in the DataStream API. I'm working on it, however. Using a (global) static variable will not work for this case since the computation is distributed.
The iteration does not work because the head of the iteration (the "loop" variable) is not used anywhere, therefore the loop is not closed. Cheers, Aljoscha On Mon, 16 May 2016 at 21:37 subash basnet <yasub...@gmail.com> wrote: > Hello all, > > How could I broadcast the variable in Datastream or perform similar > operation so that I could read the value as in DataSet: > IterativeDataSet<Centroid> *loop* = centroids.iterate(numIterations); > DataSet<Centroid> *newCentroids* = points.map(new SelectNearestCenter()). > *withBroadcastSet*(*loop*, "*centroids*") ... > INSIDE map function: > @Override public void open(...){ . > this.*centroids* = getRuntimeContext().getBroadcastVariable("*centroids* > "); > } > > Is defining '*loop*' as a global variable is only the option to use it in > the map functions. Any other possible methods. > When I use *loop *as global variable and read it inside map function as > below via *DataStreamUtils*: > > private static IterativeStream<Centroid> *loop*; > ... > *loop* = centroids.iterate(numIterations); > ... > INSIDE map function > @Override public void open(...){ > Iterator<Centroid> iter = DataStreamUtils.collect(*loop*); > this.*centroids* = Lists.newArrayList(iter); > } > > It throws below exception upon execution: > Exception in thread "Thread-13" java.lang.RuntimeException: Exception in > execute() > at > org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82) > *Caused by: java.lang.IllegalStateException: Iteration > FeedbackTransformation{id=15, name='Feedback', > outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, > pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback > edges.* > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformFeedback(StreamGraphGenerator.java:295) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSink(StreamGraphGenerator.java:441) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:158) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:127) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:119) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1197) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170) > at > org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:80) > > > Could you please suggest me possible cause and solution to this exception, > as I am not able to see any other option beside to use global variable in > absence of broadcast of variable in datastream. > > Best Regards, > Subash Basnet >