Nice design proposal, and +1 to the general idea.

A few thoughts / suggestions:

*binary vs. n-ary*

I would plan ahead for N-ary operators. Not because we necessarily need
n-ary inputs (one can probably build that purely in the API) but because of
future side inputs. The proposal should be able to handle that as well.

*enum vs. integer*

The above might be easier to realize when going directly with integers
and having ANY, FIRST, SECOND, etc. as pre-defined constants.
Performance-wise, it probably makes no difference whether we use int or enum.
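
To make the integer variant concrete, here is a minimal sketch in Java. The
class name InputSelection, the concrete constant values, and the accepts()
helper are assumptions for illustration only; they are not taken from the
proposal.

```java
/** Hypothetical integer-based input selection (illustrative names and values). */
final class InputSelection {
    /** Read from whichever input currently has data. */
    static final int ANY = -1;
    /** Zero-based input indexes; further inputs (side inputs) just use 2, 3, ... */
    static final int FIRST = 0;
    static final int SECOND = 1;

    private InputSelection() {}

    /** Returns true if the given selection allows reading from the input with this index. */
    static boolean accepts(int selection, int inputIndex) {
        if (inputIndex < 0) {
            throw new IllegalArgumentException("input index must be >= 0: " + inputIndex);
        }
        return selection == ANY || selection == inputIndex;
    }
}
```

With plain integers, a third or fourth input needs no new enum value; the
index itself is the selection.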

*generic selectable interface*

From the proposal, I don't quite understand what that interface is for. My
understanding is that the input processor or task that calls the
operator's functions would work on the TwoInputStreamOperator interface
anyway, for efficiency.

*end-input*

I think we should not make storing the end-input the operator's
responsibility.
There is a simple way to handle this, which is also consistent with other
aspects of handling finished tasks:

 - If a task is finished, that should be stored in the checkpoint.
 - Upon restoring a finished task, if it still has running successors, we
deploy a "finished input channel", which immediately sends the "end of
input" when the task is started.
 - the operator will hence set the end of input immediately again upon

*early-out*

Letting nextSelection() return "NONE" or "FINISHED" may be relevant for
early-out cases, but I would remove this from the scope of this proposal.
There are most likely other big changes involved, like communicating this
to the upstream operators.

*distributed stream deadlocks*

We had this issue in the DataSet API. Earlier versions of the DataSet API
analyzed the flow to detect dams and to determine whether the
pipeline-breaking behavior in the flow would cause deadlocks, and
introduced artificial pipeline breakers in response.

The logic was really complicated and it took a while to become stable. We
had several issues where certain user functions (like mapPartition) could
either be pipelined or have a full dam (not possible for the system to
know), so we had to insert artificial pipeline breakers in all paths.

In the end we simply decided that in the case of a diamond-style flow, we
make the point where the flow first forks a blocking shuffle. That was
super simple, solved all issues, and has the additional nice property that
it is a great point to materialize data for recovery, because it helps both
paths of the diamond upon failure.
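
As a rough illustration of that rule, the sketch below finds fork points
whose branches rejoin downstream, which are the candidates for a blocking
shuffle. It is not the DataSet optimizer's code: the class DiamondBreaker,
the string node names, and the adjacency-map representation are assumptions.
It conservatively marks every fork whose branches rejoin, not only the
outermost one.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: finds forks of diamond-style flows in a DAG. */
final class DiamondBreaker {
    private DiamondBreaker() {}

    /** Returns all nodes with two or more outgoing branches that meet again downstream. */
    static Set<String> blockingForks(Map<String, List<String>> edges) {
        Set<String> forks = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            List<String> successors = entry.getValue();
            if (successors.size() < 2) {
                continue; // not a fork
            }
            Set<String> seenOnEarlierBranch = new HashSet<>();
            boolean rejoins = false;
            for (String successor : successors) {
                Set<String> branch = reachable(successor, edges);
                if (!Collections.disjoint(branch, seenOnEarlierBranch)) {
                    rejoins = true; // two branches share a downstream node
                    break;
                }
                seenOnEarlierBranch.addAll(branch);
            }
            if (rejoins) {
                forks.add(entry.getKey());
            }
        }
        return forks;
    }

    /** Iterative depth-first search; the result includes the start node itself. */
    private static Set<String> reachable(String start, Map<String, List<String>> edges) {
        Set<String> visited = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (visited.add(node)) {
                for (String next : edges.getOrDefault(node, Collections.emptyList())) {
                    stack.push(next);
                }
            }
        }
        return visited;
    }
}
```

For a flow src -> {a, b} -> join, this returns only "src": its output would
become the blocking shuffle, and both paths of the diamond read from the
materialized data.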

My suggestion:
==> For streaming, no problem so far, nothing to do
==> For batch, would suggest to go with the simple solution described above
first, and improve when we see cases where this impacts performance
significantly

*empty input / selection timeout*

I can see that being relevant in future streaming cases, for example with
side inputs. You want to wait for the side input data, but with a timeout,
so the program can still proceed with non-perfect context data in case that
context data is very late.
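
Just to illustrate the semantics (not how a stream task would implement it),
here is a minimal Java sketch of "wait for the side input, but only up to a
timeout". The class SideInputWait, the method awaitOrDefault, and the use of
a CompletableFuture to stand in for the side input are all assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: wait for context data, fall back if it is too late. */
final class SideInputWait {
    private SideInputWait() {}

    /** Returns the side input value, or the fallback if it does not arrive in time. */
    static <T> T awaitOrDefault(CompletableFuture<T> sideInput, long timeoutMillis, T fallback) {
        try {
            return sideInput.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback; // proceed with non-perfect context data
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("side input failed", e);
        }
    }
}
```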

Because we do not support side inputs at the moment, we may want to defer
this for now. Let's not over-design for problems that are not well
understood at this point.

*timers*

I don't understand the problem with timers. Timers are bound to the
operator, not the input, so they should still work if an input ends.
There are cases where some state in the operator is only relevant as
long as an input still has data (like in symmetric joins) and the timers
are relevant to that state.
When the state is dropped, the timers should also be dropped, but that is
the operator's logic on "endInput()". So there is no inherent issue between
input and timers.
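
A small Java sketch of what I mean, with hypothetical names throughout
(TimerOwningOperator, registerInputBoundTimer, advanceTo are not from the
proposal or from Flink, and timer names are assumed to be unique). Timers
belong to the operator; the operator's own endInput() logic drops the ones
tied to state it no longer needs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical operator sketch: timers are owned by the operator, not by an input. */
final class TimerOwningOperator {
    /** Firing time -> names of the timers due at that time. */
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    /** Input index -> timers guarding state that only matters while that input is live. */
    private final Map<Integer, List<String>> timersBoundToInput = new HashMap<>();

    void registerTimer(String name, long time) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(name);
    }

    void registerInputBoundTimer(int input, String name, long time) {
        registerTimer(name, time);
        timersBoundToInput.computeIfAbsent(input, i -> new ArrayList<>()).add(name);
    }

    /** Operator logic: drop the state tied to the ended input, and its timers with it. */
    void endInput(int input) {
        List<String> dropped = timersBoundToInput.remove(input);
        if (dropped == null) {
            return;
        }
        for (List<String> names : timers.values()) {
            names.removeAll(dropped);
        }
        timers.values().removeIf(List::isEmpty);
    }

    /** Fires and removes all timers due at or before the given time. */
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        Map<Long, List<String>> due = timers.headMap(time, true);
        for (List<String> names : due.values()) {
            fired.addAll(names);
        }
        due.clear();
        return fired;
    }
}
```

Timers that are not bound to the ended input keep firing as before.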

Best,
Stephan


On Sat, Feb 2, 2019 at 3:55 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, guys:
> I propose a design to enhance Stream Operator API for Batch’s requirements.
> This is also the Flink’s goal that Batch is a special case of Streaming.
> This
> proposal mainly contains two changes to operator api:
>
> 1. Allow "StreamOperator" can choose which input to read;
> 2. Notify "StreamOperator" that an input has ended.
>
>
> This proposal was discussed with Piotr Nowojski, Kostas Kloudas, Haibo Sun
> offline.
> It will be great to hear the feedback and suggestions from the community.
> Please kindly share your comments and suggestions.
>
> Best
> GuoWei Ma.
>
>  Enhance Operator API to Support Dynamically Sel...
> <
> https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web
> >
>
