Everything works as expected. The while loop blocks until the iterator
has no more data (i.e. the program has ended). All the data ends up in
the ArrayList.
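
The drain-and-collect pattern itself is plain Java, independent of Flink. As a minimal sketch (using an in-memory list as a stand-in for the stream-backed iterator, which is an assumption for illustration only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DrainIterator {
    public static void main(String[] args) {
        // Stand-in for the iterator returned by DataStreamUtils.collect(..);
        // with a real stream, hasNext() blocks until data arrives or the job ends.
        Iterator<Integer> iter = Arrays.asList(1, 2, 3).iterator();

        // Drain the iterator into an ArrayList. The loop only terminates
        // once the iterator reports no more data, i.e. the stream has ended.
        List<Integer> collected = new ArrayList<>();
        while (iter.hasNext()) {
            collected.add(iter.next());
        }
        System.out.println(collected); // prints [1, 2, 3]
    }
}
```

With an unbounded stream this loop never terminates on its own, which is why nothing is printed until the job ends.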

The latter exception comes from a duplicate call to execute():
collect() internally calls execute() because the job has to run in order
to transfer the data back to the client.

On Wed, Jul 20, 2016 at 3:38 PM, subash basnet <yasub...@gmail.com> wrote:

> hello maximilian,
>
> Thanks! I learned a new thing today :). But my problem still exists. Your
> example has little data and it works fine. But in my datastream I have set
> the timeWindow to Time.seconds(5). What I found out is, if I print as below,
> as in your example:
>
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> List<Centroid> testCentroids = new ArrayList<Centroid>();
> while (iter.hasNext()) {
>     System.out.println(iter.next());
> }
>
> It prints the result in a streaming manner. But now, if I collect into the
> ArrayList and print as below:
>
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> List<Centroid> testCentroids = new ArrayList<Centroid>();
> while (iter.hasNext()) {
>     testCentroids.add(iter.next());
> }
> for (Centroid centroid : testCentroids) {
>     System.out.println(centroid);
> }
>
> It waits, I guess, until the whole stream gets collected in the ArrayList,
> and finally prints all the values in it. I waited roughly two minutes and
> found that the ArrayList got printed and the program ended automatically
> after the print, along with some exception messages. Why does this
> ArrayList collection wait until a huge collection from the input stream of
> centroids gets printed all at once? What could be the issue?
> It also printed the following exceptions along with all the items in the
> ArrayList:
>
> Exception in thread "Thread-1" java.lang.RuntimeException: Exception in
> execute()
> at
> org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> Exception in thread "main" java.lang.IllegalStateException: No operators
> defined in streaming topology. Cannot execute.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1195)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
>
> Regards,
> Subash Basnet
>
>
> On Wed, Jul 20, 2016 at 2:05 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
>>
>> Ah, now I see where the problem lies. You're reusing the Iterator
>> which you have already consumed in the while loop. You can only iterate
>> over the elements once! That is the nature of a Java Iterator, and
>> DataStreamUtils.collect(..) returns an iterator.
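
The single-use behavior described above is plain java.util.Iterator semantics; a minimal sketch, with no Flink involved:

```java
import java.util.Arrays;
import java.util.Iterator;

public class IteratorOnce {
    public static void main(String[] args) {
        Iterator<String> iter = Arrays.asList("a", "b", "c").iterator();

        // First pass consumes the iterator completely.
        int firstPass = 0;
        while (iter.hasNext()) {
            iter.next();
            firstPass++;
        }

        // Second pass sees nothing: the same iterator is already exhausted,
        // so hasNext() is false and the loop body never runs.
        int secondPass = 0;
        while (iter.hasNext()) {
            iter.next();
            secondPass++;
        }
        System.out.println(firstPass + " " + secondPass); // prints "3 0"
    }
}
```

This is exactly why a list built from an already-drained iterator, e.g. via Lists.newArrayList(iter), comes out empty.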
>>
>> On Wed, Jul 20, 2016 at 1:11 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>> >
>> > Hello Maximilian,
>> >
>> > Thanks for the update. Yup, it works in the example you gave. I checked
>> with the collection as well and it works, but not in my datastream case
>> after the collection.
>> > DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> > Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>> > while (iter.hasNext()) {
>> >     System.out.println(iter.next());
>> > }
>> > Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>> > for (Centroid c : testCentroids) {
>> >     System.out.println(c);
>> > }
>> >
>> > In the above code the while loop prints the result as below, but the
>> for loop after the collection prints nothing.
>> >
>> > Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
>> > Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
>> > Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
>> > Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0
>> >
>> > Not sure what the problem is; after the collection it gives a blank
>> result in my case but works in the example you gave. Below is my
>> newCentroidDataStream:
>> >
>> > @SuppressWarnings("serial")
>> > DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
>> >     .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
>> >         @Override
>> >         public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
>> >             Double[] columns = new Double[5]; // close, high, low, open, volume
>> >             columns[0] = value.getClose();
>> >             columns[1] = value.getHigh();
>> >             columns[2] = value.getLow();
>> >             columns[3] = value.getOpen();
>> >             columns[4] = (double) value.getVolume();
>> >             return new Tuple2<String, Double[]>(value.getId(), columns);
>> >         }
>> >     });
>> >
>> > Regards,
>> > Subash Basnet
>> >
>> > On Wed, Jul 20, 2016 at 12:20 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>> >>
>> >>
>> >> Just tried the following and it worked:
>> >>
>> >> public static void main(String[] args) throws IOException {
>> >>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>
>> >>     final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
>> >>     source.print();
>> >>
>> >>     final Iterator<Integer> iter = DataStreamUtils.collect(source);
>> >>     while (iter.hasNext()) {
>> >>         System.out.println(iter.next());
>> >>     }
>> >> }
>> >>
>> >> It prints:
>> >>
>> >> 1
>> >> 2
>> >> 3
>> >> 4
>> >> 2> 2
>> >> 1> 1
>> >> 4> 4
>> >> 3> 3
>> >>
>> >> However, the collect util needs some improvements. It assumes that the
>> machine running the code is reachable on a random port by the Flink
>> cluster. If you have any firewalls, then this might not work.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>> >>>
>> >>> Hello Till,
>> >>>
>> >>> Yup, I can see the log output in my console, but there is no
>> information there about whether there was an error in the conversion, just
>> the normal WARN and INFO messages below:
>> >>> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
>> >>> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
>> >>>
>> >>> The above message is always there when I run my project. It would be
>> great if someone could check why the collection of the datastream via
>> DataStreamUtils gives an empty result.
>> >>>
>> >>> Best Regards,
>> >>> Subash Basnet
>> >>>
>> >>> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>> >>>>
>> >>>> It depends on whether you have a log4j.properties file in your
>> classpath. If you see log output on the console, then it should also print
>> errors there.
>> >>>>
>> >>>> Cheers,
>> >>>> Till
>> >>>>
>> >>>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> Hello Till,
>> >>>>>
>> >>>>> Shouldn't it write something to the Eclipse console if there is an
>> error or warning? But nothing about an error is printed on the console.
>> And I checked the Flink project folders (flink-core, flink-streaming) but
>> couldn't find where the log is written when run via Eclipse.
>> >>>>>
>> >>>>> Best Regards,
>> >>>>> Subash Basnet
>> >>>>>
>> >>>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <
>> trohrm...@apache.org> wrote:
>> >>>>>>
>> >>>>>> Have you checked whether your logs contain any problems? In
>> general it is not recommended to collect the streaming result back to the
>> client. It might also be a problem with `DataStreamUtils.collect`.
>> >>>>>>
>> >>>>>> Cheers,
>> >>>>>> Till
>> >>>>>>
>> >>>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>> >>>>>>>
>> >>>>>>> Hello all,
>> >>>>>>>
>> >>>>>>> I tried to check whether it works for a tuple, but it's the same
>> problem: the collection still shows a blank result. I took the id of the
>> centroid tuple and printed it, but the collection displays empty.
>> >>>>>>>
>> >>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> >>>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
>> >>>>>>> centroidId.print();
>> >>>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
>> >>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>> >>>>>>> for (Tuple1<String> c : testCentroids) {
>> >>>>>>>     System.out.println(c);
>> >>>>>>> }
>> >>>>>>> Output of centroidId.print():
>> >>>>>>> (Mon Jul 18 17:36:03 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:43:58 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:42:59 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:34:01 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:52:00 CEST 2016)
>> >>>>>>> (Mon Jul 18 17:40:58 CEST 2016)
>> >>>>>>>
>> >>>>>>> But there is no output for System.out.println(c);
>> >>>>>>>
>> >>>>>>> Best Regards,
>> >>>>>>> Subash Basnet
>> >>>>>>>
>> >>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <
>> yasub...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>> Hello all,
>> >>>>>>>>
>> >>>>>>>> I am trying to convert a datastream to a collection, but it
>> shows a blank result. There is a stream of data which can be viewed on the
>> console via print(), but the collection of the same stream shows empty
>> after the conversion. Below is the code:
>> >>>>>>>>
>> >>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>> >>>>>>>> centroids.print();
>> >>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>> >>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>> >>>>>>>> for (Centroid c : testCentroids) {
>> >>>>>>>>     System.out.println(c);
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> The above centroids.print() gives the following output in
>> console:
>> >>>>>>>>
>> >>>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>> >>>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>> >>>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>> >>>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>> >>>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>> >>>>>>>>
>> >>>>>>>> But the next System.out.println(c) within the for loop prints
>> nothing. What could be the problem?
>> >>>>>>>>
>> >>>>>>>> My Maven build has the following dependency for DataStreamUtils:
>> >>>>>>>> <dependency>
>> >>>>>>>>     <groupId>org.apache.flink</groupId>
>> >>>>>>>>     <artifactId>flink-streaming-contrib_2.10</artifactId>
>> >>>>>>>>     <version>${flink.version}</version>
>> >>>>>>>> </dependency>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Best Regards,
>> >>>>>>>> Subash Basnet
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>
>
