Just tried the following and it worked:

public static void main(String[] args) throws IOException {
   StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

   final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
   source.print();

   final Iterator<Integer> iter = DataStreamUtils.collect(source);
   while (iter.hasNext()) {
      System.out.println(iter.next());
   }
}

It prints:

1
2
3
4
2> 2
1> 1
4> 4
3> 3

However, the collect util needs some improvements. It assumes that the
machine running the code is reachable on a random port by the Flink
cluster. If you have any firewalls, then this might not work.

Cheers,
Max

On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello Till,
>
> Yup I can see the log output in my console, but there is no information
> there regarding if there is any error in conversion. Just normal warn and
> info as below:
> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
>
> The above message is always there when I run my project. It would be great
> if someone could check why the collection of datastream via DataStreamUtils
> is giving empty result.
>
> Best Regards,
> Subash Basnet
>
> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> It depends if you have a log4j.properties file specified in your
>> classpath. If you see log output on the console, then it should also print
>> errors there.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>>
>>> Hello Till,
>>>
>>> Shouldn't it write something in the eclipse console if there is any
>>> error or warning. But nothing about error is printed on the console. And I
>>> checked the flink project folder: flink-core, flink streaming as such but
>>> couldn't find where the log is written when run via eclipse.
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Have you checked your logs whether they contain some problems? In
>>>> general it is not recommended collecting the streaming result back to your
>>>> client. It might also be a problem with `DataStreamUtils.collect`.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <yasub...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I tried to check if it works for tuple but same problem, the
>>>>> collection still shows blank result. I took the id of centroid tuple and
>>>>> printed it, but the collection displays empty.
>>>>>
>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>>>> TupleCentroidConverter());
>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new
>>>>> TestMethod());
>>>>> centroidId.print();
>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>>>>> for (Tuple1<String> c : testCentroids) {
>>>>> System.out.println(c);
>>>>> }
>>>>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST
>>>>> 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon
>>>>> Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for
>>>>> centroidId.print(), but no output for System.out.println(c); Best Regards,
>>>>> Subash Basnet
>>>>>
>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasub...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I am trying to convert datastream to collection, but it's shows blank
>>>>>> result. There is a stream of data which can be viewed on the console on
>>>>>> print(), but the collection of the same stream shows empty after
>>>>>> conversion. Below is the code:
>>>>>>
>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>>>>> TupleCentroidConverter());
>>>>>> centroids.print();
>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>>>>>> for(Centroid c: testCentroids){
>>>>>> System.out.println(c);
>>>>>> }
>>>>>>
>>>>>> The above *centroids.print()* gives the following output in console:
>>>>>>
>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265
>>>>>> 50300.0
>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>>>>>
>>>>>> But the next *System.out.println(c) *within the for loop prints
>>>>>> nothing. What could be the problem.
>>>>>>
>>>>>> My maven has following configuration for dataStreamUtils:
>>>>>> <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-streaming-contrib_2.10</artifactId>
>>>>>> <version>${flink.version}</version>
>>>>>> </dependency>
>>>>>>
>>>>>>
>>>>>> Best Regards,
>>>>>> Subash Basnet
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to