Hi! I am not sure I understand the problem exactly, but one problem I see in your code is that you call "env.execute()" and then "DataStreamUtils.collect(datastream);".
The first call to "env.execute()" will start the program (source and filter) and the results will simply go nowhere. Then you call "DataStreamUtils.collect(datastream);", which internally calls "execute" again.

In short: remove the first call to "env.execute()", that should do the trick.

Stephan

On Thu, May 26, 2016 at 5:09 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:

> Hello,
>
> I have defined a custom source function for an infinite stream source,
> where in my overridden run method I have a while true loop to keep
> listening for the input. I want to apply some transformations on the
> resulting datastream from my source and collect the output so far of these
> transformations in a collection.
>
> However, when I leave my source running in an infinite loop, nothing is
> really executed.
>
> Here are some parts of my code to clarify more:
>
> my custom source class:
> public class FeedSource implements SourceFunction<Object>
>
> The run method in this class has a while(boolean variable == true)
>
> Then I call my source and apply filter on it:
> datastream = env.addSource(new FeedSource()).filter();
>
> then execute:
> env.execute();
>
> I want then to collect my datastream in a collection:
> Iterator iter = DataStreamUtils.collect(datastream);
>
> So is it possible to first of all apply filter on my stream that way? And
> then, if I'm able to do so, is it possible to keep updating my collection
> with the content in my datastream so far?
>
> I hope I was able to make my question clear enough.
>
> Thanks,
> Ahmed
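On the second question in the quoted message (keeping a collection updated with the content of the stream so far): since "DataStreamUtils.collect(datastream)" hands back a plain Iterator, one option is to pull elements from it in bounded batches and append them to your own list. Below is a minimal sketch in plain Java. The class name "CollectSoFar" and the helper "drain" are hypothetical names made up for illustration, and an endless Java Stream stands in for the iterator that Flink would return, so the sketch runs without Flink. I am also assuming that "hasNext()" on the real iterator waits for the next element, which is why the helper checks the batch size first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class CollectSoFar {

    /**
     * Pulls at most maxItems elements from an iterator that may never end.
     * The size check comes first so that hasNext() is not called (and cannot
     * wait for more input) once the batch is already full.
     */
    public static <T> List<T> drain(Iterator<T> iter, int maxItems) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxItems && iter.hasNext()) {
            batch.add(iter.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for the real call (assumption, not executed here):
        //   Iterator<Object> iter = DataStreamUtils.collect(datastream);
        // with no env.execute() before it.
        Iterator<Integer> iter = Stream.iterate(0, i -> i + 1).iterator();

        List<Integer> soFar = new ArrayList<>();
        soFar.addAll(drain(iter, 3)); // first batch
        System.out.println(soFar);    // prints [0, 1, 2]
        soFar.addAll(drain(iter, 2)); // later batch continues where the first stopped
        System.out.println(soFar);    // prints [0, 1, 2, 3, 4]
    }
}
```

With the real job you would call the helper repeatedly on the same iterator, for example from a separate thread, and the list would grow as the source emits elements that pass the filter.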