Hi!

I am not sure I understand the problem exactly, but one problem I see in
your code is that you call "env.execute()" and then
"DataStreamUtils.collect(datastream);".

The first call to "env.execute()" will start the program (source and
filter) and the results will simply go nowhere.
Then you call "DataStreamUtils.collect(datastream);", which internally
calls "execute" again.

In short: remove the first call to "env.execute()", that should do the
trick.
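
On the second question (keeping a collection updated with the stream's
content so far): the Iterator returned by "DataStreamUtils.collect(datastream)"
can be read a few elements at a time. Below is a minimal sketch of that
pattern in plain Java. It assumes nothing about Flink itself: the class name
CollectSketch, the helper drain(), and the stand-in iterator built from
Stream.iterate are all hypothetical and only there so the example runs
without a cluster.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class CollectSketch {

    // Hypothetical helper: copies at most `limit` elements from `iter` into
    // `sink` and returns how many were copied. The limit is checked first so
    // that hasNext() is not called (and cannot block) once enough elements
    // have been taken.
    public static <T> int drain(Iterator<T> iter, List<T> sink, int limit) {
        int copied = 0;
        while (copied < limit && iter.hasNext()) {
            sink.add(iter.next());
            copied++;
        }
        return copied;
    }

    public static void main(String[] args) {
        // Stand-in for an unbounded, filtered source. With Flink, the
        // iterator would instead come from
        //     DataStreamUtils.collect(datastream)
        // with no env.execute() call before it.
        Iterator<Integer> iter =
                Stream.iterate(0, i -> i + 1).filter(i -> i % 2 == 0).iterator();

        List<Integer> soFar = new ArrayList<>();

        drain(iter, soFar, 3);
        System.out.println(soFar); // prints [0, 2, 4]

        drain(iter, soFar, 2);
        System.out.println(soFar); // prints [0, 2, 4, 6, 8]
    }
}
```

With an infinite source the iterator never runs dry, so reading it in
bounded batches like this (or from a separate thread) keeps the rest of the
program responsive.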

Stephan


On Thu, May 26, 2016 at 5:09 PM, Ahmed Nader <ahmednader...@gmail.com>
wrote:

> Hello,
> I have defined a custom source function for an infinite stream source,
> where in my overridden run method I have a while(true) loop to keep
> listening for the input. I want to apply some transformations on the
> resulting datastream from my source and collect the output so far of these
> transformations in a collection.
> However, when I leave my source running in an infinite loop, nothing is
> really executed.
> Here are some parts of my code to clarify more:
>
> My custom source class:
> public class FeedSource implements SourceFunction<Object>
>
> The run method in this class has a while(boolean variable == true)
>
> Then I call my source and apply filter on it:
> datastream = env.addSource(new FeedSource()).filter();
>
> then execute:
> env.execute();
>
> I want then to collect my datastream in a collection:
> Iterator iter = DataStreamUtils.collect(datastream);
>
> So is it possible to first of all apply filter on my stream that way? And
> then If I'm able to do so, is it possible to keep updating my collection
> with the content in my datastream so far?
>
> I hope I was able to make my question clear enough.
> Thanks,
> Ahmed
>
>
>
