Hi Jamie,

I think it does make consuming late-arriving events more explicit! It comes at the cost of a fixed, predefined OutputTag<IN> that the user can neither control nor define, plus an extra UDF that essentially filters out all main outputs and only lets the side output pass (like a FilterFunction).
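To make that trade-off concrete, here is a minimal plain-Java sketch under one reading of the point above. It has no Flink dependency: `Tagged`, `LateStreamSketch`, `MAIN`, `LATE` and `lateStream` are hypothetical names, not Flink API. It models a single channel that carries both main-output and late records, where a `lateStream()`-style method has to rely on a tag it fixed in advance and then filter out everything else. For simplicity, both outputs share one element type here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model, not Flink API: a single channel carries both main-output
// records and late records, each marked with the name of the output it belongs to.
final class Tagged<T> {
    final String output;
    final T value;

    Tagged(String output, T value) {
        this.output = output;
        this.value = value;
    }
}

final class LateStreamSketch {
    static final String MAIN = "main";
    // Hypothetical built-in tag that a lateStream() method would have to fix in
    // advance; the user neither chooses its name nor defines it.
    static final String LATE = "late-data";

    // The "extra UDF": drop every main-output record and let only late records
    // through, like a FilterFunction followed by unwrapping the value.
    static <IN> List<IN> lateStream(List<Tagged<IN>> channel) {
        return channel.stream()
                .filter(t -> LATE.equals(t.output))
                .map(t -> t.value)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tagged<Integer>> channel = Arrays.asList(
                new Tagged<Integer>(MAIN, 1),
                new Tagged<Integer>(LATE, 2),
                new Tagged<Integer>(MAIN, 3));
        System.out.println(lateStream(channel)); // prints [2]
    }
}
```

With a user-supplied tag instead of the fixed `LATE` constant, the user would control both the name and the element type of the late stream, which is what the explicit OutputTag proposal offers.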
Thanks,
Chen

> On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
>
> I prefer the ProcessFunction and side outputs solution over split() and
> select(), which I've never liked, primarily due to the lack of type safety;
> it also doesn't really seem to fit with the rest of Flink's API.
>
> On the late data question, I strongly prefer the late data concept being
> explicit in the API. Could we not also do something like:
>
>     WindowedStream<> windowedStream = input
>         .keyBy(...)
>         .window(...);
>
>     DataStream<> mainOutput = windowedStream
>         .apply(...);
>
>     DataStream<> lateOutput = windowedStream
>         .lateStream();
>
> On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hi,
>>
>> Thanks for the nice proposal. I like the idea of side outputs, and it would
>> make a lot of topologies much simpler.
>>
>> Regarding the API, I think we should come up with a way of making side
>> outputs accessible from all sorts of operators in a similar way, for
>> instance through the RichFunction interface with a special collector that
>> we invalidate when the user should not be collecting to it (just a quick
>> idea).
>>
>> I personally wouldn't deprecate the "universal" Split/Select API, which can
>> be used on any DataStream, in favor of functionality that is only
>> accessible through the process function or a few select operators. I think
>> the Split/Select pattern is also very nice, and I use it in many different
>> contexts to get efficient multiway filtering (after map/co operators, for
>> example).
>>
>> Regards,
>> Gyula
>>
>> Aljoscha Krettek <aljos...@apache.org> wrote (time: Thu, Feb 23, 2017, 15:42):
>>
>>> Hi Folks,
>>> Chen and I have been working for a while now on making FLIP-13 (side
>>> outputs) [1] a reality.
>>> We think we have a pretty good internal implementation and also a proposal
>>> for an API, but now we need to discuss how we want to go forward with this,
>>> especially how we should deal with split/select, which does some of the
>>> same things side outputs can do. I'll first quickly describe what the
>>> split/select API looks like, so that we're all on the same page. Then I'll
>>> present the new proposed side output API, and then I'll present a new API
>>> for getting dropped late data from a windowed operation, which was the
>>> original motivation for adding side outputs.
>>>
>>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>>> and SplitStream.select(). You can use it like this:
>>>
>>>     DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>>
>>>     final String EVEN_SELECTOR = "even";
>>>     final String ODD_SELECTOR = "odd";
>>>
>>>     SplitStream<Integer> split = input.split(
>>>         new OutputSelector<Integer>() {
>>>             @Override
>>>             public Iterable<String> select(Integer value) {
>>>                 if (value % 2 == 0) {
>>>                     return Collections.singleton(EVEN_SELECTOR);
>>>                 } else {
>>>                     return Collections.singleton(ODD_SELECTOR);
>>>                 }
>>>             }
>>>         });
>>>
>>>     DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
>>>     DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>>>
>>> The stream is split according to an OutputSelector that returns an
>>> Iterable of Strings. Then you can use select() to get a new stream that
>>> only contains elements with the given selector. Notice how the element
>>> type for all the split streams is the same.
>>>
>>> The new side output API proposal adds a new type OutputTag<T> and relies
>>> on extending ProcessFunction to allow emitting data to outputs besides the
>>> main output.
>>> I think it's best explained with an example as well:
>>>
>>>     DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>>
>>>     final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
>>>     final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};
>>>
>>>     SingleOutputStreamOperator<String> mainOutputStream = input
>>>         .process(new ProcessFunction<Integer, String>() {
>>>
>>>             @Override
>>>             public void processElement(
>>>                     Integer value,
>>>                     Context ctx,
>>>                     Collector<String> out) throws Exception {
>>>
>>>                 ctx.output(sideOutput1, "WE GOT: " + value);
>>>                 ctx.output(sideOutput2, value);
>>>                 out.collect("MAIN OUTPUT: " + value);
>>>             }
>>>
>>>         });
>>>
>>>     DataStream<String> sideOutputStream1 =
>>>         mainOutputStream.getSideOutput(sideOutput1);
>>>     DataStream<Integer> sideOutputStream2 =
>>>         mainOutputStream.getSideOutput(sideOutput2);
>>>
>>> Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
>>> We need this to be able to analyse the type of the side-output streams.
>>> Also notice how the types of the side-output streams can be independent of
>>> the main-output stream, and how everything is correctly type-checked by
>>> the Java compiler.
>>>
>>> This change requires making ProcessFunction an abstract base class so that
>>> not every user has to implement the onTimer() method. We would also need
>>> to allow ProcessFunction on a non-keyed stream.
>>>
>>> Chen also implemented an API based on FlatMapFunction that looks like the
>>> one proposed in the FLIP. This relies on CollectorWrapper, which can be
>>> used to "pimp" a Collector to also allow emitting to side outputs.
>>>
>>> For WindowedStream we have two proposals: make OutputTag visible on the
>>> WindowedStream API, or make the result type of WindowedStream operations
>>> more specific to allow a getDroppedDataSideOutput() method.
>>> For the first proposal it would look like this:
>>>
>>>     final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1"){};
>>>
>>>     SingleOutputStreamOperator<T> windowedResult = input
>>>         .keyBy(...)
>>>         .window(...)
>>>         .sideOutputLateData(lateDataTag)
>>>         .apply(...);
>>>
>>>     DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>>>
>>> For the second proposal it would look like this:
>>>
>>>     WindowedOperator<T> windowedResult = input
>>>         .keyBy(...)
>>>         .window(...)
>>>         .apply(...);
>>>
>>>     DataStream<IN> lateData = windowedResult.getDroppedDataSideOutput();
>>>
>>> Right now, the result of window operations is a
>>> SingleOutputStreamOperator<T>, the same as for all DataStream operations.
>>> Making the result type more specific, i.e. a WindowedOperator, would allow
>>> us to add extra methods there. This would require wrapping a
>>> SingleOutputStreamOperator and forwarding all the method calls to the
>>> wrapped operator, which can be a bit of a hassle for future changes. The
>>> first proposal requires additional boilerplate code.
>>>
>>> Sorry for the long mail, but I think it's necessary to get everyone on the
>>> same page. The question now is: how should we proceed with the proposed
>>> API and the old split/select API? I propose to deprecate split/select and
>>> only have side outputs going forward. Of course, I'm a bit biased on this.
>>> ;-) If we decide to do this, we also need to decide what the side output
>>> API should look like.
>>>
>>> Happy discussing! Feedback very welcome. :-)
>>>
>>> Best,
>>> Aljoscha
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
>>>
>>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
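The anonymous-subclass trick Aljoscha mentions for OutputTag ("similar to TypeHint") is a general Java technique, and it can be illustrated without Flink. The following is a minimal sketch, assuming hypothetical stand-ins `Tag<T>` and `SideOutputs` rather than Flink's actual `OutputTag` and `ProcessFunction.Context`; it does not claim to mirror Flink's internals. The tag recovers its element type from the anonymous subclass's generic superclass, and the typed `output`/`get` methods show why side outputs, unlike split/select streams, can each carry their own element type while still being checked by the compiler.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputTag<T>, not the Flink class.
abstract class Tag<T> {
    final String id;
    final Type elementType;

    Tag(String id) {
        this.id = id;
        // For an anonymous subclass such as new Tag<String>("x") {}, the subclass
        // records Tag<String> as its generic superclass, so T survives erasure.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("create tags as anonymous subclasses");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}

// Hypothetical stand-in for the Context passed to processElement(): it buffers
// side-output records per tag id (tag ids are assumed to be unique).
final class SideOutputs {
    private final Map<String, List<Object>> buffers = new HashMap<>();

    // The compiler rejects output(tagOfInteger, "text"): value must match T.
    <T> void output(Tag<T> tag, T value) {
        buffers.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Safe as long as each id is only ever written through a Tag<T> of one type.
    @SuppressWarnings("unchecked")
    <T> List<T> get(Tag<T> tag) {
        List<?> buffer = buffers.getOrDefault(tag.id, new ArrayList<>());
        return (List<T>) buffer;
    }
}

final class TypedSideOutputSketch {
    static final Tag<String> SIDE_1 = new Tag<String>("side-output-1") {};
    static final Tag<Integer> SIDE_2 = new Tag<Integer>("side-output-2") {};

    // Same shape as the processElement() example in the thread: every input goes
    // to the main output and to two side outputs of different element types.
    static List<String> process(List<Integer> input, SideOutputs ctx) {
        List<String> main = new ArrayList<>();
        for (Integer value : input) {
            ctx.output(SIDE_1, "WE GOT: " + value);
            ctx.output(SIDE_2, value);
            main.add("MAIN OUTPUT: " + value);
        }
        return main;
    }

    public static void main(String[] args) {
        SideOutputs ctx = new SideOutputs();
        List<String> main = process(Arrays.asList(1, 2, 3), ctx);
        List<String> side1 = ctx.get(SIDE_1); // typed results, no casts at the call site
        List<Integer> side2 = ctx.get(SIDE_2);
        System.out.println(main);
        System.out.println(side1);
        System.out.println(side2 + " " + SIDE_2.elementType.getTypeName());
    }
}
```

Running `main` prints the three "MAIN OUTPUT" strings, the three "WE GOT" strings, and then `[1, 2, 3] java.lang.Integer`. Creating a tag as `new Tag<String>("x")` without the trailing `{}` is not even possible here, because `Tag` is abstract. This is one way to force callers into the anonymous-subclass form that preserves the type argument.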