Hi Jamie,

I think it does make consuming late-arriving events more explicit! It comes at
the cost of:
- fixing a predefined OutputTag<IN>, which the user can neither control nor
  define
- an extra UDF that essentially filters out all main outputs and only lets the
  side output pass (like a FilterFunction)

Thanks,
Chen

> On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> 
> I prefer the ProcessFunction and side outputs solution over split() and
> select() which I've never liked primarily due to the lack of type safety
> and it also doesn't really seem to fit with the rest of Flink's API.
> 
> On the late data question I strongly prefer the late data concept being
> explicit in the API.  Could we not also do something like:
> 
> WindowedStream<> windowedStream = input
>  .keyBy(...)
>  .window(...);
> 
> DataStream<> mainOutput = windowedStream
>  .apply(...);
> 
> DataStream<> lateOutput = windowedStream
>  .lateStream();
> 
> On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> 
>> Hi,
>> 
>> Thanks for the nice proposal, I like the idea of side outputs, and it would
>> make a lot of topologies much simpler.
>> 
>> Regarding the API, I think we should come up with a way of making side
>> outputs accessible from all sorts of operators in a similar way. For
>> instance through the RichFunction interface with a special collector that
>> we invalidate when the user should not be collecting to it. (just a quick
>> idea)
>> 
>> I personally wouldn't deprecate the "universal" Split/Select API that can
>> be used on any DataStream in favor of functionality that is only
>> accessible through the process function or a few select operators. I think
>> the Split/Select pattern is also very nice and I use it in many different
>> contexts to get efficient multiway filtering (after map/co operators, for
>> example).
>> 
>> Regards,
>> Gyula
>> 
>> On Thu, Feb 23, 2017 at 15:42, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>>> Hi Folks,
>>> Chen and I have been working for a while now on making FLIP-13 (side
>>> outputs) [1] a reality. We think we have a pretty good internal
>>> implementation and also a proposal for an API but now we need to discuss
>>> how we want to go forward with this, especially how we should deal with
>>> split/select which does some of the same things side outputs can do. I'll
>>> first quickly describe what the split/select API looks like, so that we're
>>> all on the same page. Then I'll present the new proposed side output API
>>> and then I'll present a new API for getting dropped late data from a
>>> windowed operation, which was the original motivation for adding side
>>> outputs.
>>> 
>>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>>> and SplitStream.select(). You can use it like this:
>>> 
>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>> 
>>> final String EVEN_SELECTOR = "even";
>>> final String ODD_SELECTOR = "odd";
>>> 
>>> SplitStream<Integer> split = input.split(
>>>        new OutputSelector<Integer>() {
>>>            @Override
>>>            public Iterable<String> select(Integer value) {
>>>                if (value % 2 == 0) {
>>>                    return Collections.singleton(EVEN_SELECTOR);
>>>                } else {
>>>                    return Collections.singleton(ODD_SELECTOR);
>>>                }
>>>            }
>>>        });
>>> 
>>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
>>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>>> 
>>> The stream is split according to an OutputSelector that returns an Iterable
>>> of Strings. Then you can use select() to get a new stream that only
>>> contains elements with the given selector. Notice how the element type for
>>> all the split streams is the same.
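
The split/select semantics described above can be modelled without Flink. The following is only a minimal sketch in plain Java, assuming lists in place of streams; SplitSelectSketch, split() and select() are hypothetical names for illustration and are not Flink's implementation. It shows that selection is by String name and that every selected result has the same element type as the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplitSelectSketch {

    // Hypothetical model of split(): route each element to every name its selector returns.
    public static <T> Map<String, List<T>> split(
            List<T> input, Function<T, Iterable<String>> selector) {
        Map<String, List<T>> byName = new HashMap<>();
        for (T value : input) {
            for (String name : selector.apply(value)) {
                byName.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            }
        }
        return byName;
    }

    // Hypothetical model of select(): the result always has the input's element type T.
    public static <T> List<T> select(Map<String, List<T>> split, String name) {
        return split.getOrDefault(name, Collections.emptyList());
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> split = split(
                Arrays.asList(1, 2, 3),
                v -> Collections.singleton(v % 2 == 0 ? "even" : "odd"));
        System.out.println("even: " + select(split, "even"));
        System.out.println("odd: " + select(split, "odd"));
    }
}
```

In this sketch a misspelled selector name compiles fine and simply yields an empty result, which illustrates the kind of type-safety gap that typed tags are meant to close.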
>>> 
>>> The new side output API proposal adds a new type OutputTag<T> and relies on
>>> extending ProcessFunction to allow emitting data to outputs besides the
>>> main output. I think it's best explained with an example as well:
>>> 
>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>> 
>>> final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1") {};
>>> final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2") {};
>>> 
>>> SingleOutputStreamOperator<String> mainOutputStream = input
>>>        .process(new ProcessFunction<Integer, String>() {
>>> 
>>>            @Override
>>>            public void processElement(
>>>                    Integer value,
>>>                    Context ctx,
>>>                    Collector<String> out) throws Exception {
>>> 
>>>                ctx.output(sideOutput1, "WE GOT: " + value);
>>>                ctx.output(sideOutput2, value);
>>>                out.collect("MAIN OUTPUT: " + value);
>>>            }
>>> 
>>>        });
>>> 
>>> DataStream<String> sideOutputStream1 =
>>> mainOutputStream.getSideOutput(sideOutput1);
>>> DataStream<Integer> sideOutputStream2 =
>>> mainOutputStream.getSideOutput(sideOutput2);
>>> 
>>> Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
>>> We need this to be able to analyse the type of the side-output streams.
>>> Also notice how the types of the side-output streams can be independent of
>>> the main-output stream, and how everything is correctly type checked by
>>> the Java compiler.
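
Why an anonymous inner class helps with type analysis can be shown with plain Java reflection. This is a minimal sketch under assumptions: Tag is a hypothetical stand-in for OutputTag and does not reproduce how Flink extracts type information. It relies only on the standard Class.getGenericSuperclass() call, which lets a subclass report the type argument it was declared with even after erasure.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureSketch {

    /** Hypothetical stand-in for OutputTag<T>; not Flink's actual class. */
    public abstract static class Tag<T> {
        public final String id;
        public final Type elementType;

        protected Tag(String id) {
            this.id = id;
            // An anonymous subclass records e.g. "Tag<String>" as its generic
            // superclass, so the type argument can be read back reflectively.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create the tag as an anonymous subclass with an explicit type argument");
            }
            this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that carries the type.
        Tag<String> stringTag = new Tag<String>("side-output-1") {};
        Tag<Integer> intTag = new Tag<Integer>("side-output-2") {};
        System.out.println(stringTag.id + " carries " + stringTag.elementType.getTypeName());
        System.out.println(intTag.id + " carries " + intTag.elementType.getTypeName());
    }
}
```

Without the trailing {} the class is abstract and cannot be instantiated at all, which is one way to force callers into the type-capturing form.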
>>> 
>>> This change requires making ProcessFunction an abstract base class so that
>>> not every user has to implement the onTimer() method. We would also need to
>>> allow ProcessFunction on a non-keyed stream.
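
The effect of turning an interface into an abstract base class can be sketched in plain Java. The names below (SimpleProcessFunction, run) and the simplified signatures are assumptions made for illustration; they are not Flink's ProcessFunction. The point is only that a no-op default for onTimer() lets users override processElement() alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AbstractBaseSketch {

    /** Hypothetical, simplified stand-in for ProcessFunction; not Flink's signature. */
    public abstract static class SimpleProcessFunction<I, O> {

        // Every user function must say what to do with an element.
        public abstract void processElement(I value, List<O> out);

        // No-op default: only functions that actually use timers override this.
        public void onTimer(long timestamp, List<O> out) {
        }
    }

    // Hypothetical driver: feeds all elements, then fires a single timer.
    public static <I, O> List<O> run(SimpleProcessFunction<I, O> fn, List<I> input) {
        List<O> out = new ArrayList<>();
        for (I value : input) {
            fn.processElement(value, out);
        }
        fn.onTimer(0L, out);
        return out;
    }

    public static void main(String[] args) {
        // Only processElement() is implemented; onTimer() falls back to the no-op.
        List<String> result = run(new SimpleProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, List<String> out) {
                out.add("MAIN OUTPUT: " + value);
            }
        }, Arrays.asList(1, 2, 3));
        System.out.println(result);
    }
}
```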
>>> 
>>> Chen also implemented an API based on FlatMapFunction that looks like the
>>> one proposed in the FLIP. This relies on CollectorWrapper, which can be
>>> used to "pimp" a Collector to also allow emitting to side outputs.
>>> 
>>> For WindowedStream we have two proposals: make OutputTag visible on the
>>> WindowedStream API or make the result type of WindowedStream operations
>>> more specific to allow a getDroppedDataSideOutput() method. For the first
>>> proposal it would look like this:
>>> 
>>> final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1") {};
>>> 
>>> SingleOutputStreamOperator<T> windowedResult = input
>>>  .keyBy(...)
>>>  .window(...)
>>>  .sideOutputLateData(lateDataTag)
>>>  .apply(...);
>>> 
>>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>>> 
>>> For the second proposal it would look like this:
>>> 
>>> WindowedOperator<T> windowedResult = input
>>>  .keyBy(...)
>>>  .window(...)
>>>  .apply(...);
>>> 
>>> DataStream<IN> lateData = windowedResult.getSideOutput();
>>> 
>>> Right now, the result of window operations is a
>>> SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
>>> Making the result type more specific, i.e. a WindowedOperator, would allow
>>> us to add extra methods there. This would require wrapping a
>>> SingleOutputStreamOperator and forwarding all the method calls to the
>>> wrapped operator, which can be a bit of a hassle for future changes. The
>>> first proposal requires additional boilerplate code.
>>> 
>>> Sorry for the long mail but I think it's necessary to get everyone on the
>>> same page. The question is now: how should we proceed with the proposed API
>>> and the old split/select API? I propose to deprecate split/select and only
>>> have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
>>> If we decide to do this, we also need to decide on what the side output API
>>> should look like.
>>> 
>>> Happy discussing! Feedback very welcome. :-)
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
>>> 
>> 
> 
> 
> 
> -- 
> 
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
