Hi everyone,

I created a FLIP and started a discussion around that topic [1].

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E

On Thu, Jun 10, 2021 at 9:18 PM Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> For illustration purposes, I quickly updated the draft PR that would
> propagate idleness information to the Sink function, based on the recent
> improvement provided by FLINK-18934:
> https://github.com/streamnative/flink/pull/2
>
> On Thu, Jun 10, 2021 at 11:34 AM Eron Wright <ewri...@streamnative.io>
> wrote:
>
> > Regarding records vs. watermarks, I feel it is wrong to include records in
> > the considerations, because the clearest definition of idleness (IMO) is
> > in terms of 'active participation in advancing the event-time clock', and
> > records don't directly affect the clock. Of course, records indirectly
> > influence the clock by stimulating a generator.
> >
> > Let's focus on the problem that Arvid mentioned about the need to briefly
> > toggle idleness (as implemented by the AnnouncedStatus class). It seems to
> > me that the idleness of an operator's inputs need not strictly determine
> > whether its output is idle. The operator should be able to react to
> > status changes on a given input (implemented in FLINK-18934), and this MAY
> > cause a change to the output status at the operator's discretion. The
> > default behavior would be passthrough. Meanwhile, when a given operator
> > emits a watermark, it is re-asserting itself as a participant in advancing
> > the downstream event-time clock, and its output channel should transition
> > to active and remain active. To complete the framework, an operator should
> > also be able to mark its output channel(s) as idle.
> >
> > In concept, a watermark generator somewhere in the pipeline could 'take
> > control' of the event time clock when its input channel transitions to
> > idle.  The upstream source is relinquishing control of the clock in that
> > situation.
> >
> > BTW, I recommend looking at the PR of FLINK-18934 because it lays bare
> > the whole pipeline. Nice work there, Dawid! To better reflect the
> > decoupling of input from output idleness,
> > "AbstractStreamOperator::emitStreamStatus" should be named
> > "processStreamStatus" and call an overridable method to emit the status
> > change whenever the combined idleness flips. This would facilitate an
> > idleness-aware watermark generator and an idleness-aware sink.
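To make the decoupling of input and output idleness concrete, here is a minimal Java sketch under stated assumptions. Only the name `processStreamStatus` comes from the message above; `CombinedIdlenessOperator`, `onCombinedIdlenessChanged`, and `setOutputIdle` are hypothetical names, not Flink's actual `AbstractStreamOperator` API. The operator tracks each input's idleness, calls an overridable hook only when the combined idleness flips (passthrough by default), and treats emitting a watermark as re-activating its output.

```java
/**
 * Hypothetical sketch, not Flink's AbstractStreamOperator: input idleness is
 * tracked per input, and output idleness is decided separately.
 */
class CombinedIdlenessOperator {
    private final boolean[] inputIdle;
    private boolean outputIdle = false;
    private int statusChangesSent = 0;

    CombinedIdlenessOperator(int numInputs) {
        this.inputIdle = new boolean[numInputs];
    }

    /** Name proposed in the thread; records a status change on one input. */
    void processStreamStatus(int input, boolean idle) {
        boolean wasAllIdle = allInputsIdle();
        inputIdle[input] = idle;
        boolean isAllIdle = allInputsIdle();
        if (wasAllIdle != isAllIdle) {
            // Only react when the combined idleness actually flips.
            onCombinedIdlenessChanged(isAllIdle);
        }
    }

    /** Overridable hook; the default behavior is passthrough. */
    protected void onCombinedIdlenessChanged(boolean allInputsIdle) {
        setOutputIdle(allInputsIdle);
    }

    /** Emitting a watermark re-asserts participation in the event-time clock. */
    void emitWatermark(long timestamp) {
        setOutputIdle(false);
        // Forwarding the watermark downstream is omitted in this sketch.
    }

    /** Lets an operator explicitly mark its output idle (or active). */
    protected final void setOutputIdle(boolean idle) {
        if (outputIdle != idle) {
            outputIdle = idle;
            statusChangesSent++; // stands in for sending a status element downstream
        }
    }

    boolean isOutputIdle() {
        return outputIdle;
    }

    int statusChangesSent() {
        return statusChangesSent;
    }

    private boolean allInputsIdle() {
        for (boolean idle : inputIdle) {
            if (!idle) {
                return false;
            }
        }
        return true;
    }
}
```

A watermark generator that wants to "take control" of the event-time clock could override `onCombinedIdlenessChanged` to ignore upstream idleness and keep its output active.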
> >
> >
> > On Thu, Jun 10, 2021 at 3:31 AM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> >> Thanks for providing these details, Gordon. I have to admit that I do not
> >> fully follow the reasoning why periodic watermark generators forced us to
> >> define idleness for records. Is it because the idleness was generated
> >> based on the non-availability of more data in the sources and not in the
> >> watermark generators, which are executed after the records have been read
> >> from the external system? So, in the end, was the problem where the stream
> >> status was decided?
> >>
> >> If there is a periodic watermark generator somewhere in the pipeline that
> >> periodically generates watermarks, then we don't have to mark its output
> >> channels as watermark-idle because watermarks are being sent. Hence,
> >> given that the watermark generation logic makes sense, the overall job
> >> should be able to make progress. If the watermark generator is informed
> >> about its input channel status, it could even decide whether or not to
> >> propagate the watermark idleness and stop generating watermarks. Of
> >> course, this leaves room for people to shoot themselves in the foot.
> >>
> >> Cheers,
> >> Till
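Till's idea of a watermark generator that knows its input channel status could look roughly like the following Java sketch. All names (`StatusAwarePeriodicGenerator`, `TickResult`, `propagateInputIdleness`) are hypothetical and not part of Flink's `WatermarkGenerator` interface; the watermark formula assumes the common bounded-out-of-orderness convention of "max timestamp minus allowed out-of-orderness minus 1".

```java
/**
 * Hypothetical periodic watermark generator that is informed about the
 * status of its input channel. Not a Flink API.
 */
class StatusAwarePeriodicGenerator {
    enum TickResult { EMIT_WATERMARK, MARK_OUTPUT_IDLE }

    private final boolean propagateInputIdleness;
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean inputIdle = false;

    StatusAwarePeriodicGenerator(boolean propagateInputIdleness, long outOfOrdernessMillis) {
        if (outOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("out-of-orderness must be non-negative");
        }
        this.propagateInputIdleness = propagateInputIdleness;
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    void onRecord(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    void onInputStatus(boolean idle) {
        inputIdle = idle;
    }

    /** Called periodically, e.g. once per auto-watermark interval. */
    TickResult onPeriodicTick() {
        if (inputIdle && propagateInputIdleness) {
            // Follow the input: stop emitting so downstream can ignore this channel.
            return TickResult.MARK_OUTPUT_IDLE;
        }
        // Keep participating in the clock, even if the watermark does not advance.
        return TickResult.EMIT_WATERMARK;
    }

    long currentWatermark() {
        // Guard against underflow before any (or only very small) timestamps were seen.
        if (maxTimestamp < Long.MIN_VALUE + outOfOrdernessMillis + 1) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

With `propagateInputIdleness = false`, the generator keeps emitting watermarks that may not advance, which is the situation Gordon describes below: downstream operators keep waiting on this channel. That flag is the room for misconfiguration Till mentions.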
> >>
> >> On Thu, Jun 10, 2021 at 5:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> >> wrote:
> >>
> >> > Forgot to provide the link to the [1] reference:
> >> >
> >> > [1] https://issues.apache.org/jira/browse/FLINK-5017
> >> >
> >> > On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> >> > wrote:
> >> >
> >> > > Hi everyone,
> >> > >
> >> > > Sorry for chiming in late here.
> >> > >
> >> > > Regarding the topic of changing the definition of StreamStatus and
> >> > > changing the name as well:
> >> > > After digging into some of the roots of this implementation [1]:
> >> > > initially, StreamStatus was actually defined to mark "watermark
> >> > > idleness", not "record idleness" (in fact, the alternative name
> >> > > "WatermarkStatus" was considered at the time).
> >> > >
> >> > > The concern that caused us to change the definition to "record
> >> > > idleness" in the final implementation was the existence of periodic
> >> > > timestamp / watermark generators within the pipeline. Those would
> >> > > continue to generate non-increasing watermarks in the absence of any
> >> > > input records from upstream. In this scenario, downstream operators
> >> > > would not be able to consider that channel as idle, and therefore
> >> > > watermark progress is blocked. We could consider a timeout-based
> >> > > approach on those specific operators to toggle watermark idleness if
> >> > > the values remain constant for a period of time, but then again, this
> >> > > is very ill-defined and most likely wrong.
> >> > >
> >> > > I have not followed the newest changes to the watermark generator
> >> > > operators and am not sure if this issue is still relevant.
> >> > > Otherwise, I don't see other problems with changing the definition
> >> > > here.
> >> > >
> >> > > Thanks,
> >> > > Gordon
> >> > >
> >> > > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
> >> > >
> >> > >> Hi Eron,
> >> > >>
> >> > >> Again, to recap from the other thread:
> >> > >> - You are right that idleness is correct with static assignment and
> >> > >> fully active partitions. In this case, the source defines idleness.
> >> > >> (case A)
> >> > >> - For the more pressing use case of idle, assigned partitions, the
> >> > >> user defines an idleness threshold, and idleness becomes potentially
> >> > >> incorrect when the partition becomes active again. (case B)
> >> > >> - The same holds for dynamic assignment of splits. If a source without
> >> > >> a split gets a split assigned dynamically, there is a realistic chance
> >> > >> that the watermark has already advanced past the first record of the
> >> > >> newly assigned split. (case C)
> >> > >> You can certainly insist that only the first case is valid (as it's
> >> > >> correct), but we know that users use it in other ways and that this
> >> > >> was also the intent of the devs.
> >> > >>
> >> > >> Now the question could be whether it makes sense to distinguish these
> >> > >> cases. Would you treat the idleness information differently
> >> > >> (especially in the sink/source that motivated FLIP-167) if you knew
> >> > >> that the idleness is guaranteed to be correct?
> >> > >> We could have some WatermarkStatus with ACTIVE, IDLE (case A), and
> >> > >> TIMEOUT (case B).
> >> > >>
> >> > >> However, that would still leave case C, which would probably need to
> >> > >> be solved completely differently. I could imagine that a source with
> >> > >> dynamic assignments should never have IDLE subtasks and should rather
> >> > >> manage the idleness itself. For example, it could emit a watermark per
> >> > >> second/minute that is directly fetched from the source system. I'm
> >> > >> just not sure if the current WatermarkAssigner interface suffices in
> >> > >> that regard...
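The three-valued status Arvid floats (ACTIVE, IDLE for case A, TIMEOUT for case B) could be modeled roughly as below. This is a hypothetical Java sketch, not an existing Flink type; the helper methods and the `IdlenessForwardingSink` consumer are illustrative assumptions showing how a sink might treat guaranteed idleness differently from heuristic idleness. Case C is deliberately not modeled, since the message suggests it needs a different solution.

```java
/** Hypothetical three-valued status along the lines floated in the thread. */
enum WatermarkStatus {
    /** The channel is providing watermarks. */
    ACTIVE,
    /** Case A: idleness defined by the source itself and known to be correct. */
    IDLE,
    /** Case B: a user-configured idleness threshold elapsed; data may still arrive. */
    TIMEOUT;

    /** Both non-active states are excluded from the downstream minimum watermark. */
    boolean excludedFromMinWatermark() {
        return this != ACTIVE;
    }

    /** Only TIMEOUT is a heuristic that may later turn records into late data. */
    boolean isHeuristic() {
        return this == TIMEOUT;
    }
}

/** Hypothetical consumer: a sink that only forwards idleness it can trust. */
final class IdlenessForwardingSink {
    static boolean forwardIdleness(WatermarkStatus status) {
        return status.excludedFromMinWatermark() && !status.isHeuristic();
    }
}
```

For watermark aggregation both idle variants behave the same; the distinction only matters to consumers, such as the sinks that motivated FLIP-167, that want to act on idleness.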
> >> > >>
> >> > >>
> >> > >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> >> > >> wrote:
> >> > >>
> >> > >> > Hi Eron,
> >> > >> >
> >> > >> > Can you elaborate a bit more on what you mean? I don’t understand
> >> > >> > what you mean by a more general solution.
> >> > >> >
> >> > >> > As of now, a stream is marked idle by a source/watermark generator,
> >> > >> > which has the effect of temporarily excluding this stream/partition
> >> > >> > from the min watermark calculation in the downstream tasks. However,
> >> > >> > the stream switches back to active when any record is emitted. This
> >> > >> > is what's causing the problems described by Arvid.
> >> > >> >
> >> > >> > The core of our proposal is very simple. Keep everything as it is,
> >> > >> > except state that the stream is changed back to active only once a
> >> > >> > watermark is emitted again, not a record. In other words, disconnect
> >> > >> > idleness from the presence of records, connect it only to the
> >> > >> > presence or lack of watermarks, and allow records to be emitted
> >> > >> > while the “stream status” is “idle”.
> >> > >> >
> >> > >> > Piotrek
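The transition rule Piotr proposes can be contrasted with the current one in a small Java sketch. `OutputStatusTracker` is a hypothetical single-channel model, not Flink code: with `recordsReactivate = true` it models the current behavior (any record flips the channel back to ACTIVE); with `false` it models the proposal (only a watermark does).

```java
/** Hypothetical single-channel model of the two transition rules. */
class OutputStatusTracker {
    enum Status { ACTIVE, IDLE }

    private final boolean recordsReactivate;
    private Status status = Status.ACTIVE;
    private int transitions = 0;

    OutputStatusTracker(boolean recordsReactivate) {
        this.recordsReactivate = recordsReactivate;
    }

    void markIdle() {
        setStatus(Status.IDLE);
    }

    void emitRecord() {
        if (recordsReactivate) {
            setStatus(Status.ACTIVE);
        }
        // Under the proposal, records are forwarded without touching the status.
    }

    void emitWatermark() {
        setStatus(Status.ACTIVE);
    }

    Status status() {
        return status;
    }

    int transitions() {
        return transitions;
    }

    private void setStatus(Status next) {
        if (status != next) {
            status = next;
            transitions++; // each flip corresponds to one status element sent downstream
        }
    }
}
```

If a timer fires three records on an idle input and the operator re-marks itself idle after each one, the current rule produces six status flips, while the proposed rule produces one and the channel stays idle until the next watermark. This is the per-record toggling overhead Arvid lists in his original message.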
> >> > >> >
> >> > >> >
> >> > >> > > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid>
> >> > >> > > wrote:
> >> > >> > >
> >> > >> > > It seems to me that idleness was introduced to deal with a very
> >> > >> > > specific issue. In the pipeline, watermarks are aggregated not on a
> >> > >> > > per-split basis but on a per-subtask basis. This works well when
> >> > >> > > each subtask has exactly one split. When a subtask has multiple
> >> > >> > > splits, various complications occur involving the commingling of
> >> > >> > > watermarks. And when a subtask has no splits, the pipeline stalls
> >> > >> > > altogether. To deal with the latter problem, idleness was
> >> > >> > > introduced. The subtask simply declares itself to be idle to be
> >> > >> > > taken out of consideration for purposes of watermark aggregation.
> >> > >> > >
> >> > >> > > If we're looking for a more general solution, I would suggest we
> >> > >> > > discuss how to track watermarks on a per-split basis. Or, as Till
> >> > >> > > mentioned recently, an alternative solution may be to dynamically
> >> > >> > > adjust the parallelism of the task.
> >> > >> > >
> >> > >> > > I don't agree with the notion that idleness involves a correctness
> >> > >> > > tradeoff. The facility I described above has no impact on
> >> > >> > > correctness. Meanwhile, various watermark strategies rely on
> >> > >> > > heuristics involving the processing-time domain, and the term
> >> > >> > > idleness seems to have found purchase there too. The connection
> >> > >> > > between the concepts seems tenuous.
> >> > >> > >
> >> > >> > > -E
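The per-subtask watermark aggregation Eron describes can be sketched as follows. This is a simplified, hypothetical Java model (`WatermarkAggregator` is not Flink's actual `StatusWatermarkValve`): the combined watermark is the minimum over input channels that are not idle, it only ever moves forward, and an idle channel is simply left out of the minimum.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Simplified, hypothetical model of per-subtask watermark aggregation. */
class WatermarkAggregator {
    private final long[] channelWatermark;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkAggregator(int numChannels) {
        channelWatermark = new long[numChannels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    /** Returns the new combined watermark if it advanced, otherwise empty. */
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermark[channel] = Math.max(channelWatermark[channel], watermark);
        return tryAdvance();
    }

    /** Marks a channel idle or active; idle channels do not hold back the minimum. */
    OptionalLong onStatus(int channel, boolean idle) {
        channelIdle[channel] = idle;
        return tryAdvance();
    }

    private OptionalLong tryAdvance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermark.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermark[i]);
            }
        }
        // The combined watermark is monotonic: only emit when it moves forward.
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

An upstream subtask that never receives a split never sends a watermark, so its channel would pin the minimum at `Long.MIN_VALUE` forever. Marking that channel idle is what lets the other channels advance the clock.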
> >> > >> > >
> >> > >> > >
> >> > >> > >
> >> > >> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org>
> >> > >> > >> wrote:
> >> > >> > >>
> >> > >> > >> Hi Arvid,
> >> > >> > >>
> >> > >> > >> Thanks for writing down this summary and proposal. I think this was
> >> > >> > >> the root of the disagreement in the FLIP-167 discussion. Dawid was
> >> > >> > >> arguing that idleness is intermittent, strictly a task-local
> >> > >> > >> concept, and as such shouldn't be exposed in, for example, sinks,
> >> > >> > >> while Eron and I thought that it's a concept strictly connected to
> >> > >> > >> watermarks.
> >> > >> > >>
> >> > >> > >> 1. I'm a big +1 for changing the StreamStatus definition to stream
> >> > >> > >> "providing watermarks" and "not providing watermarks". With respect
> >> > >> > >> to that, I agree with Dawid that record-bound idleness *(if we
> >> > >> > >> would ever need to define/expose it)* should be an intermittent
> >> > >> > >> concept, like, for example, the existing Task/runtime input
> >> > >> > >> availability (StreamTaskInput#isAvailable).
> >> > >> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name,
> >> > >> > >> but I also don't have any good ideas.
> >> > >> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >> > >> > >>
> >> > >> > >> Best,
> >> > >> > >> Piotrek
> >> > >> > >>
> >> > >> > >> On Tue, Jun 8, 2021 at 4:35 PM Arvid Heise <ar...@apache.org>
> >> > >> > >> wrote:
> >> > >> > >>
> >> > >> > >>> Hi devs,
> >> > >> > >>>
> >> > >> > >>> While discussing "Watermark propagation with Sink API" and during
> >> > >> > >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> >> > >> > >>> stream", we noticed some drawbacks in how Flink currently defines
> >> > >> > >>> idle partitions.
> >> > >> > >>>
> >> > >> > >>> To recap, idleness was always considered a means to achieve
> >> > >> > >>> progress in window operators with idle partitions in the source, at
> >> > >> > >>> the risk of losing a bit of correctness. In particular, records
> >> > >> > >>> could be considered late simply because of the idleness timeout
> >> > >> > >>> and not because they arrived out of order. A potential reprocessing
> >> > >> > >>> would not cause these records to be considered late, and we may end
> >> > >> > >>> up with a different (correct) result.
> >> > >> > >>>
> >> > >> > >>> The drawbacks that we discovered are as follows:
> >> > >> > >>> - We currently only use idleness to exclude the respective upstream
> >> > >> > >>> tasks from participating in watermark generation.
> >> > >> > >>> - However, the definition is bound to records [1]. In particular,
> >> > >> > >>> while a partition is idle, no records should be produced.
> >> > >> > >>> - That leads us into quite a few edge cases where operators emit
> >> > >> > >>> records while they are actually idling: think of timers, asyncIO
> >> > >> > >>> operators, window operators based on timeouts, etc. that trigger on
> >> > >> > >>> an operator ingesting an idle partition.
> >> > >> > >>> - The proper solution would be to turn the operator active while
> >> > >> > >>> emitting and to return to being idle afterwards (but when?).
> >> > >> > >>> However, this has some unintended side effects depending on when
> >> > >> > >>> you switch back:
> >> > >> > >>>  - If you toggle the stream status for each record, you get a huge
> >> > >> > >>> overhead in stream status records and quite a bit of processing in
> >> > >> > >>> downstream operators (that code path is not very optimized since
> >> > >> > >>> switching is considered a rare thing).
> >> > >> > >>>  - If you toggle after a certain time, you may get delays longer
> >> > >> > >>> than the idleness timeout in the downstream window operators.
> >> > >> > >>>  - You could switch back once you have processed all pending mails,
> >> > >> > >>> but if you have a self-replicating mail, that would be never. A
> >> > >> > >>> self-enqueueing, low-interval timer would also produce a flood
> >> > >> > >>> similar to the first case.
> >> > >> > >>>
> >> > >> > >>> All in all, the situation is quite unsatisfying because idleness
> >> > >> > >>> implies no records. However, there is currently no need for that
> >> > >> > >>> implication: since we only use idleness for watermarks, we can
> >> > >> > >>> easily allow records to be emitted (in fact, that was the old
> >> > >> > >>> behavior before FLINK-18934 in many cases) and still get the
> >> > >> > >>> intended behavior with respect to watermarks:
> >> > >> > >>> - A channel that is active is providing watermarks.
> >> > >> > >>> - An idle channel is not providing any watermarks but can deliver
> >> > >> > >>> records.
> >> > >> > >>>
> >> > >> > >>> Ultimately, that would mean that we are actually not talking about
> >> > >> > >>> idle/active partitions anymore. We are talking more about whether a
> >> > >> > >>> particular subtask should influence the downstream watermark
> >> > >> > >>> calculation or not. This leads to the following questions:
> >> > >> > >>> 1. Do we want to change the definition as outlined?
> >> > >> > >>> 2. Do you see any problem with emitting records on a subtask
> >> > >> > >>> without explicit watermarks?
> >> > >> > >>> 3. If we want to go this way, we may need to refine the
> >> > >> > >>> names/definitions. Any ideas?
> >> > >> > >>>
> >> > >> > >>> I think an idle partition should translate into something like
> >> > >> > >>> automatic/implicit/passive watermarks, and an active partition into
> >> > >> > >>> explicit/active watermarks. Then StreamStatus is more about a
> >> > >> > >>> WatermarkMode (not really happy with this one).
> >> > >> > >>>
> >> > >> > >>> Best,
> >> > >> > >>>
> >> > >> > >>> Arvid
> >> > >> > >>>
> >> > >> > >>> [1]
> >> > >> > >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
> >
>
