Everything works as expected. The while loop blocks until the iterator has no more data (i.e. the program has ended). All data will end up in the ArrayList.
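[Editor's note: a minimal sketch of the drain-then-print pattern discussed in this thread, with the Flink-specific piece (DataStreamUtils.collect on a stream) replaced by a plain bounded iterator; the class and method names are illustrative only. It shows the semantics described above: hasNext() keeps returning true until the source is exhausted, so the list is only complete once the stream has ended.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DrainExample {
    // Drain an iterator into a list. With DataStreamUtils.collect this
    // loop blocks on hasNext() until more data arrives or the job ends.
    static List<String> drain(Iterator<String> iter) {
        List<String> collected = new ArrayList<>();
        while (iter.hasNext()) {
            collected.add(iter.next());
        }
        return collected; // only returns once the source is exhausted
    }

    public static void main(String[] args) {
        // A bounded stand-in for the centroid stream from the thread.
        Iterator<String> iter = Arrays.asList("c1", "c2", "c3").iterator();
        for (String c : drain(iter)) {
            System.out.println(c);
        }
    }
}
```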
The latter exception comes from a duplicate call to execute(). collect() internally calls execute(), because the job has to run in order to transfer data back to the client.

On Wed, Jul 20, 2016 at 3:38 PM, subash basnet <yasub...@gmail.com> wrote:
> Hello Maximilian,
>
> Thanks! I learned a new thing today :). But my problem still exists. Your
> example has little data and it works fine. But in my datastream I have set
> timeWindow as Time.seconds(5). What I found out is, if I print as in your
> example:
>
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> List<Centroid> testCentroids = new ArrayList<Centroid>();
> while (iter.hasNext()) {
>     System.out.println(iter.next());
> }
>
> it prints the result in a streaming manner. But if I instead collect into
> an ArrayList and print as below:
>
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> List<Centroid> testCentroids = new ArrayList<Centroid>();
> while (iter.hasNext()) {
>     testCentroids.add(iter.next());
> }
> for (Centroid centroid : testCentroids) {
>     System.out.println(centroid);
> }
>
> it waits, I guess until the whole stream gets collected into the
> ArrayList, and finally prints all the values at once. I waited roughly two
> minutes and found that the ArrayList got printed and the program ended on
> its own after the print, along with some exception messages. Why does this
> ArrayList collection wait until a huge collection of centroids from the
> input stream gets printed at once? What could be the issue?
> It also printed the following exceptions along with all the items in the
> ArrayList:
>
> Exception in thread "Thread-1" java.lang.RuntimeException: Exception in execute()
>     at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1195)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
>
> Best Regards,
> Subash Basnet
>
> On Wed, Jul 20, 2016 at 2:05 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Ah, now I see where the problem lies. You're reusing the Iterator
>> which you have already used in the for loop. You can only iterate over
>> the elements once! This is the nature of the Java Iterator, and
>> DataStreamUtils.collect(..) returns an iterator.
>>
>> On Wed, Jul 20, 2016 at 1:11 PM, subash basnet <yasub...@gmail.com> wrote:
>> >
>> > Hello Maximilian,
>> >
>> > Thanks for the update. Yup, it works in the example you gave. I checked
>> > with the collection also and it works. But not in my datastream case
>> > after the collection.
>> > DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> > Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>> > while (iter.hasNext()) {
>> >     System.out.println(iter.next());
>> > }
>> > Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>> > for (Centroid c : testCentroids) {
>> >     System.out.println(c);
>> > }
>> >
>> > In the above code the while loop prints the result as below, but the
>> > for loop after the collection gives blank output.
>> >
>> > Tue Jul 19 15:49:00 CEST 2016 118.7 118.81 118.7 118.77 76300.0
>> > Tue Jul 19 15:47:02 CEST 2016 118.85 118.885 118.8 118.84 75600.0
>> > Tue Jul 19 15:46:00 CEST 2016 118.8627 118.93 118.79 118.8 76300.0
>> > Tue Jul 19 15:45:59 CEST 2016 118.8 118.94 118.77 118.9 106800.0
>> >
>> > Not sure what the problem is: after the collection it gives a blank
>> > result in my case but works in the example you gave. Below is my
>> > newCentroidDataStream:
>> >
>> > @SuppressWarnings("serial")
>> > DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
>> >     .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
>> >         @Override
>> >         public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
>> >             Double[] columns = new Double[5]; // close, high, low, open, volume
>> >             columns[0] = value.getClose();
>> >             columns[1] = value.getHigh();
>> >             columns[2] = value.getLow();
>> >             columns[3] = value.getOpen();
>> >             columns[4] = (double) value.getVolume();
>> >             return new Tuple2<String, Double[]>(value.getId(), columns);
>> >         }
>> >     });
>> >
>> > Regards,
>> > Subash Basnet
>> >
>> > On Wed, Jul 20, 2016 at 12:20 PM, Maximilian Michels <m...@apache.org> wrote:
>> >>
>> >> Just tried the following and it worked:
>> >>
>> >> public static void main(String[] args) throws IOException {
>> >>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>
>> >>     final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
>> >>     source.print();
>> >>
>> >>     final Iterator<Integer> iter = DataStreamUtils.collect(source);
>> >>     while (iter.hasNext()) {
>> >>         System.out.println(iter.next());
>> >>     }
>> >> }
>> >>
>> >> It prints:
>> >>
>> >> 1
>> >> 2
>> >> 3
>> >> 4
>> >> 2> 2
>> >> 1> 1
>> >> 4> 4
>> >> 3> 3
>> >>
>> >> However, the collect util needs some improvements. It assumes that the
>> >> machine running the code is reachable on a random port by the Flink
>> >> cluster. If you have any firewalls, this might not work.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <yasub...@gmail.com> wrote:
>> >>>
>> >>> Hello Till,
>> >>>
>> >>> Yup, I can see the log output in my console, but there is no
>> >>> information there about whether there was an error in the conversion.
>> >>> Just the usual WARN and INFO messages, as below:
>> >>>
>> >>> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
>> >>> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
>> >>>
>> >>> The above messages are always there when I run my project. It would be
>> >>> great if someone could check why the collection of the datastream via
>> >>> DataStreamUtils is giving an empty result.
>> >>>
>> >>> Best Regards,
>> >>> Subash Basnet
>> >>>
>> >>> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>> >>>>
>> >>>> It depends on whether you have a log4j.properties file specified in your classpath.
>> >>>> If you see log output on the console, then it should also print
>> >>>> errors there.
>> >>>>
>> >>>> Cheers,
>> >>>> Till
>> >>>>
>> >>>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <yasub...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hello Till,
>> >>>>>
>> >>>>> Shouldn't it write something to the Eclipse console if there is any
>> >>>>> error or warning? But nothing about an error is printed on the console.
>> >>>>> And I checked the Flink project folders (flink-core, flink-streaming,
>> >>>>> and so on) but couldn't find where the log is written when run via Eclipse.
>> >>>>>
>> >>>>> Best Regards,
>> >>>>> Subash Basnet
>> >>>>>
>> >>>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>> >>>>>>
>> >>>>>> Have you checked whether your logs contain some problems? In
>> >>>>>> general it is not recommended to collect the streaming result back to
>> >>>>>> your client. It might also be a problem with `DataStreamUtils.collect`.
>> >>>>>>
>> >>>>>> Cheers,
>> >>>>>> Till
>> >>>>>>
>> >>>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasub...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>> Hello all,
>> >>>>>>>
>> >>>>>>> I tried to check if it works for a tuple, but it's the same problem:
>> >>>>>>> the collection still shows a blank result. I took the id of the
>> >>>>>>> centroid tuple and printed it, but the collection displays empty.
>> >>>>>>>
>> >>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> >>>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
>> >>>>>>> centroidId.print();
>> >>>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
>> >>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>> >>>>>>> for (Tuple1<String> c : testCentroids) {
>> >>>>>>>     System.out.println(c);
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> Output for centroidId.print():
>> >>>>>>>
>> >>>>>>> (Mon Jul 18 17:36:03 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:43:58 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:42:59 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:34:01 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:52:00 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:40:58 CEST 2016)
>> >>>>>>>
>> >>>>>>> but no output for System.out.println(c).
>> >>>>>>>
>> >>>>>>> Best Regards,
>> >>>>>>> Subash Basnet
>> >>>>>>>
>> >>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasub...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>> Hello all,
>> >>>>>>>>
>> >>>>>>>> I am trying to convert a datastream to a collection, but it shows a
>> >>>>>>>> blank result. There is a stream of data which can be viewed on the
>> >>>>>>>> console via print(), but the collection of the same stream shows
>> >>>>>>>> empty after the conversion.
>> >>>>>>>> Below is the code:
>> >>>>>>>>
>> >>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> >>>>>>>> centroids.print();
>> >>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>> >>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>> >>>>>>>> for (Centroid c : testCentroids) {
>> >>>>>>>>     System.out.println(c);
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> The above centroids.print() gives the following output in the console:
>> >>>>>>>>
>> >>>>>>>> Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0
>> >>>>>>>> Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0
>> >>>>>>>> Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0
>> >>>>>>>> Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0
>> >>>>>>>> Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0
>> >>>>>>>>
>> >>>>>>>> But the System.out.println(c) inside the for loop prints nothing.
>> >>>>>>>> What could be the problem?
>> >>>>>>>>
>> >>>>>>>> My Maven configuration for DataStreamUtils is:
>> >>>>>>>>
>> >>>>>>>> <dependency>
>> >>>>>>>>     <groupId>org.apache.flink</groupId>
>> >>>>>>>>     <artifactId>flink-streaming-contrib_2.10</artifactId>
>> >>>>>>>>     <version>${flink.version}</version>
>> >>>>>>>> </dependency>
>> >>>>>>>>
>> >>>>>>>> Best Regards,
>> >>>>>>>> Subash Basnet
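[Editor's note: the root cause diagnosed in this thread — a java.util.Iterator is single-pass — can be reproduced with plain collections, no Flink involved. The class and method names below are illustrative only.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SingleUseIterator {
    // Consume an iterator once, then attempt a second traversal.
    // Mirrors the thread: the while loop plays the role of the first
    // pass, and the copy plays the role of Lists.newArrayList(iter).
    static <T> List<T> secondPass(Iterator<T> iter) {
        while (iter.hasNext()) {
            iter.next();                      // first pass consumes every element
        }
        List<T> copy = new ArrayList<>();
        iter.forEachRemaining(copy::add);     // second pass: nothing is left
        return copy;
    }

    public static void main(String[] args) {
        Iterator<Integer> iter = Arrays.asList(1, 2, 3).iterator();
        System.out.println(secondPass(iter)); // empty list — the "blank result"
    }
}
```

Since DataStreamUtils.collect(..) returns such an iterator, anything built from it after a first traversal (a for loop, Lists.newArrayList) will be empty; traverse it exactly once, or materialize it into a list first and iterate over the list.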