Maybe I could express it slightly differently: if adding .filter() after .process() causes the side output to be completely "lost", then I believe .getSideOutput() could know that there is no such side output to listen to from upstream, and throw an exception. I mean, this should be possible to detect when building the DAG; it shouldn't require starting the stream. Thanks.
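To make the suggestion concrete, here is a toy sketch of a check done at graph-build time. It is not Flink code: `Node`, `process`, `filter` and `getSideOutput` below are hypothetical stand-ins. It also assumes each operator declares up front which tags it may emit, which, going by Gordon's description of how side outputs work, is not something Flink operators do today.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Toy model only: these classes and methods are hypothetical stand-ins,
// not Flink's real API or behavior.
public class SideOutputCheckSketch {

    /** A node in a toy stream graph that records which side-output tags it declares. */
    static final class Node {
        final String name;
        final Set<String> declaredTags;

        Node(String name, Set<String> declaredTags) {
            this.name = name;
            this.declaredTags = declaredTags;
        }

        /** A process-like operator that declares up front which tags it may emit (assumption). */
        Node process(String... tags) {
            return new Node(name + " -> process", new HashSet<>(Arrays.asList(tags)));
        }

        /** A filter only forwards main-output records, so it declares no tags. */
        Node filter() {
            return new Node(name + " -> filter", Collections.<String>emptySet());
        }

        /** Fails while the graph is being built if this node does not declare the tag. */
        Node getSideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalStateException(
                        "'" + name + "' does not provide side output '" + tag + "'");
            }
            return new Node(name + " -> sideOutput(" + tag + ")",
                    Collections.<String>emptySet());
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", Collections.<String>emptySet());
        Node processed = source.process("side-output");

        // Asking the process node directly is accepted.
        System.out.println(processed.getSideOutput("side-output").name);

        // Asking the node after filter() is rejected before anything runs.
        try {
            processed.filter().getSideOutput("side-output");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the failure surfaces while the graph is assembled, with nothing executed. It also mirrors the ordering that would avoid the problem in the quoted program: take the side output from the result of .process(), and apply .filter() separately.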
On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Juho,
>
> Now that I think of it this seems like a bug to me: why does the call to
> getSideOutput succeed if it doesn't provide _any_ input?
>
>
> With the way side outputs work, I don’t think this is possible (or would
> make sense). An operator does not know whether or not it would ever emit
> some element with a given tag.
> As far as I understand it, calling `getSideOutput` essentially adds a
> virtual node to the result stream graph that listens to the specified tag
> from the upstream input.
>
> While I’m not aware whether or not your observation is expected behavior,
> from an API perspective, I can see why it is a bit confusing for you.
> Aljoscha would be the expert here, maybe he’ll have more insights. I’ve
> looped him in cc’ed.
>
> Cheers,
> Gordon
>
>
> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com) wrote:
>
> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If
> I switch the position of .process() & .filter() (ie. filter first, then
> process), both "a" & "b" are printed, as expected.
>
> I guess it's a bit hard to say what the side output should include in this
> case: the stream before filtering or after it?
>
> What I would suggest is Flink to protect against this kind of a user error
> that is hard to debug. Would it be possible that Flink throws an exception
> if one tries to call .getSideOutput() on anything that doesn't actually
> provide that side output? Now that I think of it this seems like a bug to
> me: why does the call to getSideOutput succeed if it doesn't provide _any_
> input? I would expect it to get the side output data stream _before_
> applying .filter().
>
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> public class SideOutputProblem {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStreamSource<String> stream = env.fromElements("a", "b");
>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>
>         SingleOutputStreamOperator<String> processed = stream
>
>                 .process(new ProcessFunction<String, String>() {
>                     @Override
>                     public void processElement(String s, Context context,
>                             Collector<String> collector) throws Exception {
>                         if ("a".equals(s)) {
>                             collector.collect(s);
>                         } else {
>                             context.output(sideOutputTag, s);
>                         }
>                     }
>                 })
>
>                 .filter(new FilterFunction<String>() {
>                     @Override
>                     public boolean filter(String s) throws Exception {
>                         return true;
>                     }
>                 });
>
>         processed.getSideOutput(sideOutputTag).printToErr();
>
>         processed.print();
>
>         env.execute();
>     }
>
> }
>
> Cheers,
> Juho