Hi Folks,
Chen and I have been working for a while now on making FLIP-13 (side
outputs) [1] a reality. We think we have a pretty good internal
implementation and also a proposal for an API but now we need to discuss
how we want to go forward with this, especially how we should deal with
split/select which does some of the same things side outputs can do. I'll
first quickly describe what the split/select API looks like, so that we're
all on the same page. Then I'll present the new proposed side output API
and then I'll present new API for getting dropped late data from a windowed
operation, which was the original motivation for adding side outputs.

Split/select consists of two API calls: DataStream.split(OutputSelector)
and SplitStream.select(). You can use it like this:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

final String EVEN_SELECTOR = "even";
final String ODD_SELECTOR = "odd";

SplitStream<Integer> split = input.split(
        new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                if (value % 2 == 0) {
                    return Collections.singleton(EVEN_SELECTOR);
                } else {
                    return Collections.singleton(ODD_SELECTOR);
                }
            }
        });

DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
DataStream<Integer> oddStream = split.select(ODD_SELECTOR);

The stream is split according to an OutputSelector that returns an Iterable
of Strings. Then you can use select() to get a new stream that only
contains elements with the given selector. Notice how the element type for
all the split streams is the same.

The new side output API proposal adds a new type OutputTag<T> and relies on
extending ProcessFunction to allow emitting data to outputs besides the
main output. I think it's best explained with an example as well:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

final OutputTag<String> sideOutput1 = new OutputTag<>("side-output-1"){};
final OutputTag<Integer> sideOutput2 = new OutputTag<>("side-output-2"){};

SingleOutputStreamOperator<String> mainOutputStream = input
        .process(new ProcessFunction<Integer, String>() {

            @Override
            public void processElement(
                    Integer value,
                    Context ctx,
                    Collector<String> out) throws Exception {

                ctx.output(sideOutput1, "WE GOT: " + value);
                ctx.output(sideOutput2, value);
                out.collect("MAIN OUTPUT: " + value);
            }

        });

DataStream<String> sideOutputStream1 =
mainOutputStream.getSideOutput(sideOutput1);
DataStream<Integer> sideOutputStream2 =
mainOutputStream.getSideOutput(sideOutput2);

Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
We need this to be able to analyse the type of the side-output streams.
Also notice, how the types of the side-output streams can be independent of
the main-output stream, also notice how everything is correctly type
checked by the Java Compiler.

This change requires making ProcessFunction an abstract base class so that
not every user has to implement the onTimer() method. We would also need to
allow ProcessFunction on a non-keyed stream.

Chen also implemented an API based on FlatMapFunction that looks like the
one proposed in the FLIP. This relies on CollectorWrapper, which can be
used to "pimp" a Collector to also allow emitting to side outputs.

For WindowedStream we have two proposals: make OutputTag visible on the
WindowedStream API or make the result type of WindowedStream operations
more specific to allow a getDroppedDataSideOutput() method. For the first
proposal it would look like this:

final OutputTag<String> lateDataTag = new OutputTag<>("side-output-1"){};

DataStream<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .sideOutputLateData(lateDataTag)
  .apply(...)

DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);

For the second proposal it would look like this:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...)

DataStream<IN> lateData = windowedResult.getSideOutput();

Right now, the result of window operations is a
SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
Making the result type more specific, i.e. a WindowedOperator, would allow
us to add extra methods there. This would require wrapping a
SingleOutputStreamOperator and forwarding all the method calls to the
wrapped operator which can be a bit of a hassle for future changes. The
first proposal requires additional boilerplate code.

Sorry for the long mail but I think it's necessary to get everyone on the
same page. The question is now: how should we proceed with the proposed API
and the old split/select API? I propose to deprecate split/select and only
have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
If we decide to do this, we also need to decide on what the side output API
should look like.

Happy discussing! Feedback very welcome. :-)

Best,
Aljoscha

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink

Reply via email to