I forgot to include the link for reference [1]:

[1] https://issues.apache.org/jira/browse/FLINK-5017

On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi everyone,
>
> Sorry for chiming in late here.
>
> Regarding the topic of changing the definition of StreamStatus and
> changing the name as well:
> After digging into some of the roots of this implementation [1], initially
> the StreamStatus was actually defined to mark "watermark idleness", and not
> "record idleness" (in fact, the alternative name "WatermarkStatus" was
> considered at the time).
>
> The concern that led us to change the definition to "record idleness" in
> the final implementation was the existence of periodic timestamp /
> watermark generators within the pipeline. Those would continue
> to generate non-increasing watermarks in the absence of any input records
> from upstream. In this scenario, downstream operators would not be able to
> consider that channel as idle, and watermark progress would therefore be blocked.
> We could consider a timeout-based approach on those specific operators to
> toggle watermark idleness if the values remain constant for a period of
> time, but then again, this is very ill-defined and most likely wrong.
>
> I have not followed the newest changes to the watermark generator
> operators and am not sure if this issue is still relevant.
> Otherwise, I don't see other problems with changing the definition here.
>
> Thanks,
> Gordon
>
> On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Eron,
>>
>> again to recap from the other thread:
>> - You are right that idleness is correct with static assignment and fully
>> active partitions. In this case, the source defines idleness. (case A)
>> - For the more pressing use cases of idle, assigned partitions, the user
>> defines an idleness threshold, and it becomes potentially incorrect, when
>> the partition becomes active again. (case B)
>> - The same holds for dynamic assignment of splits. If a source without a split
>> gets a split assigned dynamically, there is a realistic chance that the
>> watermark has already advanced past the first record of the newly assigned
>> split. (case C)
>> You can certainly insist that only the first case is valid (as it's
>> correct) but we know that users use it in other ways and that was also the
>> intent of the devs.
>>
>> Now the question could be if it makes sense to distinguish these cases.
>> Would you treat the idleness information differently (especially in the
>> sink/source that motivated FLIP-167) if you knew that the idleness is
>> guaranteed correct?
>> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
>> (case B).
>>
>> However, that would still leave case C, which probably would need to be
>> solved completely differently. I could imagine that a source with dynamic
>> assignments should never have IDLE subtasks and rather manage the idleness
>> itself. For example, it could emit a watermark per second/minute that is
>> directly fetched from the source system. I'm just not sure if the current
>> WatermarkAssigner interface suffices in that regard...
>>
>>
>> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
>> wrote:
>>
>> > Hi Eron,
>> >
>> > Can you elaborate a bit more on what you mean? I don’t understand what you
>> > mean by a more general solution.
>> >
>> > As of now, a stream is marked idle by a source/watermark generator, which
>> > has the effect of temporarily excluding this stream/partition from the
>> > min watermark calculation in the downstream tasks. However, the stream
>> > switches back to active when any record is emitted. This is what’s causing
>> > the problems described by Arvid.
>> >
>> > The core of our proposal is very simple. Keep everything as it is, except
>> > state that a stream will be changed back to active only once a watermark is
>> > emitted again, not a record. In other words, disconnect idleness from the
>> > presence of records, connect it only to the presence or lack of
>> > watermarks, and allow records to be emitted while “stream status” is “idle”.
>> >
>> > Piotrek
>> >
>> >
>> > > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid>
>> > > wrote:
>> > >
>> > > It seems to me that idleness was introduced to deal with a very specific
>> > > issue.  In the pipeline, watermarks are aggregated not on a per-split basis
>> > > but on a per-subtask basis.  This works well when each subtask has exactly
>> > > one split.  When a sub-task has multiple splits, various complications
>> > > occur involving the commingling of watermarks.  And when a sub-task has no
>> > > splits, the pipeline stalls altogether.  To deal with the latter problem,
>> > > idleness was introduced.  The sub-task simply declares itself to be idle to
>> > > be taken out of consideration for purposes of watermark aggregation.
>> > >
>> > > If we're looking for a more general solution, I would suggest we discuss
>> > > how to track watermarks on a per-split basis.  Or, as Till mentioned
>> > > recently, an alternate solution may be to dynamically adjust the
>> > > parallelism of the task.
>> > >
>> > > I don't agree with the notion that idleness involves a correctness
>> > > tradeoff.  The facility I described above has no impact on correctness.
>> > > Meanwhile, various watermark strategies rely on heuristics involving the
>> > > processing-time domain, and the term idleness seems to have found purchase
>> > > there too.  The connection among the concepts seems tenuous.
>> > >
>> > > -E
>> > >
>> > >
>> > >
>> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > >>
>> > >> Hi Arvid,
>> > >>
>> > >> Thanks for writing down this summary and proposal. I think this was the
>> > >> foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing
>> > >> that idleness is an intermittent, strictly task-local concept and as such
>> > >> shouldn't be exposed in, for example, sinks, while Eron and I thought that
>> > >> it's a concept strictly connected to watermarks.
>> > >>
>> > >> 1. I'm a big +1 for changing the StreamStatus definition to a stream "providing
>> > >> watermark" and "not providing watermark". With respect to that, I agree with
>> > >> Dawid that record-bound idleness *(if we would ever need to define/expose
>> > >> it)* should be an intermittent concept, like, for example, the existing
>> > >> input availability in the Task/runtime (StreamTaskInput#isAvailable).
>> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
>> > >> I also don't have any good ideas.
>> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>> > >>
>> > >> Best,
>> > >> Piotrek
>> > >>
>> > >> On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
>> > >>
>> > >>> Hi devs,
>> > >>>
>> > >>> While discussing "Watermark propagation with Sink API" and during
>> > >>> "[FLINK-18934] Idle stream does not advance watermark in connected stream",
>> > >>> we noticed some drawbacks in how Flink currently defines idle partitions.
>> > >>>
>> > >>> To recap, idleness was always considered a means to achieve progress in
>> > >>> window operators with idle partitions in the source, at the risk of losing a
>> > >>> bit of correctness. In particular, records could be considered late simply
>> > >>> because of that idleness timeout and not because they arrived out of order.
>> > >>> A potential reprocessing would not cause these records to be
>> > >>> considered late and we may end up with a different (correct) result.
>> > >>>
>> > >>> The drawbacks that we discovered are as follows:
>> > >>> - We currently only use idleness to exclude respective upstream tasks from
>> > >>> participating in watermark generation.
>> > >>> - However, the definition is bound to records. [1] In particular, while a
>> > >>> partition is idle, no records should be produced.
>> > >>> - That brings us into quite a few edge cases where operators emit records
>> > >>> while they are actually idling: think of timers, asyncIO operators, window
>> > >>> operators based on timeouts, etc. that trigger on an operator ingesting an
>> > >>> idle partition.
>> > >>> - The proper solution would be to turn the operator active while emitting
>> > >>> and to return to being idle afterwards (but when?). However, this has some
>> > >>> unintended side-effects depending on when you switch back:
>> > >>>  - If you toggle stream status for each record, you get a huge overhead on
>> > >>> stream status records and quite a bit of processing in downstream operators
>> > >>> (that code path is not much optimized since switching is considered a rare
>> > >>> thing).
>> > >>>  - If you toggle after a certain time, you may get delays > idleness in the
>> > >>> downstream window operators.
>> > >>>  - You could turn back when you processed all pending mails, but if you
>> > >>> have a self-replicating mail that would be never. A self-enqueueing, low
>> > >>> timer would also produce a flood similar to the first case.
>> > >>>
>> > >>> All in all, the situation is quite unsatisfying because idleness implies no
>> > >>> records. However, currently there is no need to have that implication:
>> > >>> since we only use it for watermarks, we can easily allow records to be
>> > >>> emitted (in fact that was the old behavior before FLINK-18934 in many
>> > >>> cases) and still get the intended behavior with respect to watermarks:
>> > >>> - A channel that is active is providing watermarks.
>> > >>> - An idle channel is not providing any watermarks but can deliver records.
>> > >>>
>> > >>> Ultimately, that would mean that we are actually not talking about idle/active
>> > >>> partitions anymore. We are talking more about whether a particular subtask
>> > >>> should influence downstream watermark calculation or not. This leads to the
>> > >>> following questions:
>> > >>> 1. Do we want to change the definition as outlined?
>> > >>> 2. Do you see any problem with emitting records on subtasks without explicit
>> > >>> watermarks?
>> > >>> 3. If we want to go this way, we may need to refine the names/definitions.
>> > >>> Any ideas?
>> > >>>
>> > >>> I think idle partition should translate into something like
>> > >>> automatic/implicit/passive watermarks; active partition into
>> > >>> explicit/active watermarks. Then StreamStatus is more about WatermarkMode
>> > >>> (not really happy with this one).
>> > >>>
>> > >>> Best,
>> > >>>
>> > >>> Arvid
>> > >>>
>> > >>> [1]
>> > >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
>
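
For readers following the quoted proposal (idle channels are excluded from the downstream minimum, and a channel becomes active again only when it emits a watermark, not when it emits a record), here is a minimal sketch of that rule. It is not Flink's actual implementation: the class `MinWatermarkTracker` and all of its method names are hypothetical, and re-activating a channel implicitly on the next watermark is an assumption made for illustration.

```java
import java.util.Arrays;

/** Hypothetical sketch of the proposed semantics; not a Flink class. */
final class MinWatermarkTracker {
    private final long[] watermarks;  // last watermark seen per input channel
    private final boolean[] idle;     // true = channel is excluded from the minimum
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[channels];
    }

    /** Under the proposal, a record never changes the channel's status. */
    void onRecord(int channel) {
        // intentionally empty: records may flow while the channel is idle
    }

    /** The upstream declares that it currently provides no watermarks. */
    void markIdle(int channel) {
        idle[channel] = true;
    }

    /** Assumption: a watermark is what switches the channel back to active. */
    void onWatermark(int channel, long timestamp) {
        idle[channel] = false;
        watermarks[channel] = Math.max(watermarks[channel], timestamp);
    }

    /** Minimum over active channels; the returned value never moves backwards. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

With two channels, marking channel 1 idle lets channel 0 alone drive the watermark. Calling `onRecord(1)` afterwards changes nothing, which is the behavior the proposal asks for. A later watermark on channel 1 brings it back into the minimum without letting the output watermark regress.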
