Hello all,

I tried to check if it works for tuple but same problem, the collection
still shows blank result. I took the id of centroid tuple and printed it,
but the collection displays empty.

DataStream<Centroid> centroids = newCentroidDataStream.map(new
TupleCentroidConverter());
DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
centroidId.print();
Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
for (Tuple1<String> c : testCentroids) {
System.out.println(c);
}
Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
(Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
but no output for System.out.println(c); Best Regards, Subash Basnet

On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> I am trying to convert datastream to collection, but it's shows blank
> result. There is a stream of data which can be viewed on the console on
> print(), but the collection of the same stream shows empty after
> conversion. Below is the code:
>
> DataStream<Centroid> centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> centroids.print();
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
> for(Centroid c: testCentroids){
> System.out.println(c);
> }
>
> The above *centroids.print()* gives the following output in console:
>
> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>
> But the next *System.out.println(c) *within the for loop prints nothing.
> What could be the problem.
>
> My maven has following configuration for dataStreamUtils:
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-contrib_2.10</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
> Best Regards,
> Subash Basnet
>
>

Reply via email to