Hi everyone, I created a FLIP and started a discussion around that topic [1].
Best,
Arvid

[1] https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E

On Thu, Jun 10, 2021 at 9:18 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

> I quickly updated the draft PR that would propagate idleness information to the Sink function, based on the recent improvement provided by FLINK-18934. For illustration purposes.
> https://github.com/streamnative/flink/pull/2
>
> On Thu, Jun 10, 2021 at 11:34 AM Eron Wright <ewri...@streamnative.io> wrote:
>
> > Regarding records vs watermarks, I feel it is wrong to include records in the considerations, because the clearest definition of idleness (IMO) is 'active participation in advancing the event-time clock', and records don't directly affect the clock. Of course, records indirectly influence the clock by stimulating a generator.
> >
> > Let's focus on the problem that Arvid mentioned about the need to briefly toggle idleness (as implemented by the AnnouncedStatus class). Seems to me that the idleness of an operator's inputs need not strictly determine whether its output is idle. The operator should be able to react to status changes on a given input (implemented in FLINK-18934), and this MAY cause a change to the output status at the operator's discretion. The default behavior would be passthrough. Meanwhile, when a given operator emits a watermark, it is re-asserting itself as a participant in advancing the downstream event time clock, and its output channel should transition to active and remain active. An operator should also be able to mark its output channel(s) as idle, to complete the framework.
> >
> > In concept, a watermark generator somewhere in the pipeline could 'take control' of the event time clock when its input channel transitions to idle. The upstream source is relinquishing control of the clock in that situation.
> >
> > BTW, I recommend looking at the PR of FLINK-18934 because it lays bare the whole pipeline. Nice work there Dawid! To better reflect the decoupling of input from output idleness, "AbstractStreamOperator::emitStreamStatus" should be named "processStreamStatus" and call an overridable method to emit the status change whenever the combined idleness flips. This would facilitate an idleness-aware watermark generator and an idleness-aware sink.
> >
> > On Thu, Jun 10, 2021 at 3:31 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> >> Thanks for providing these details Gordon. I have to admit that I do not fully follow the reasoning why periodic watermark generators forced us to define idleness for records. Is it because the idleness was generated based on the non-availability of more data in the sources and not in the watermark generators, which are executed after the records have been read from the external system? So was the problem where the stream status was decided in the end?
> >>
> >> If there is a periodic watermark generator somewhere in the pipeline that periodically generates watermarks, then we don't have to mark its output channels as watermark idle because watermarks are being sent. Hence, given that the watermark generation logic makes sense, the overall job should be able to make progress. If the watermark generator is informed about its input channel status, it could even decide whether to propagate the watermark idleness and stop generating watermarks or not. Of course, this leaves room for people shooting themselves in the foot.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Jun 10, 2021 at 5:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >>
> >> > Forgot to provide the link to the [1] reference:
> >> >
> >> > [1] https://issues.apache.org/jira/browse/FLINK-5017
> >> >
> >> > On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >> >
> >> > > Hi everyone,
> >> > >
> >> > > Sorry for chiming in late here.
> >> > >
> >> > > Regarding the topic of changing the definition of StreamStatus and changing the name as well:
> >> > > After digging into some of the roots of this implementation [1], initially the StreamStatus was actually defined to mark "watermark idleness", and not "record idleness" (in fact, the alternative name "WatermarkStatus" was considered at the time).
> >> > >
> >> > > The concern at the time causing us to alter the definition to be "record idleness" in the final implementation was due to the existence of periodic timestamp / watermark generators within the pipeline. Those would continue to generate non-increasing watermarks in the absence of any input records from upstream. In this scenario, downstream operators would not be able to consider that channel as idle and therefore watermark progress is locked.
> >> > > We could consider a timeout-based approach on those specific operators to toggle watermark idleness if the values remain constant for a period of time, but then again, this is very ill-defined and most likely wrong.
> >> > >
> >> > > I have not followed the newest changes to the watermark generator operators and am not sure if this issue is still relevant.
> >> > > Otherwise, I don't see other problems with changing the definition here.
> >> > >
> >> > > Thanks,
> >> > > Gordon
> >> > >
> >> > > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
> >> > >
> >> > >> Hi Eron,
> >> > >>
> >> > >> again to recap from the other thread:
> >> > >> - You are right that idleness is correct with static assignment and fully active partitions. In this case, the source defines idleness. (case A)
> >> > >> - For the more pressing use cases of idle, assigned partitions, the user defines an idleness threshold, and it becomes potentially incorrect, when the partition becomes active again. (case B)
> >> > >> - Same holds for dynamic assignment of splits. If a source without a split gets a split assigned dynamically, there is a realistic chance that the watermark advanced past the first record of the newly assigned split. (case C)
> >> > >> You can certainly insist that only the first case is valid (as it's correct) but we know that users use it in other ways and that was also the intent of the devs.
> >> > >>
> >> > >> Now the question could be if it makes sense to distinguish these cases. Would you treat the idleness information differently (especially in the sink/source that motivated FLIP-167) if you knew that the idleness is guaranteed correct?
> >> > >> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT (case B).
> >> > >>
> >> > >> However, that would still leave case C, which probably would need to be solved completely differently. I could imagine that a source with dynamic assignments should never have IDLE subtasks and rather manage the idleness itself. For example, it could emit a watermark per second/minute that is directly fetched from the source system.
> >> > >> I'm just not sure if the current WatermarkAssigner interface suffices in that regard...
> >> > >>
> >> > >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> >> > >>
> >> > >> > Hi Eron,
> >> > >> >
> >> > >> > Can you elaborate a bit more on what you mean? I don’t understand what you mean by a more general solution.
> >> > >> >
> >> > >> > As of now, a stream is marked idle by a source/watermark generator, which has the effect of temporarily excluding this stream/partition from the min watermark calculation in the downstream tasks. However, the stream switches back to active when any record is emitted. This is what’s causing the problems described by Arvid.
> >> > >> >
> >> > >> > The core of our proposal is very simple. Keep everything as it is, except state that a stream will be changed back to active only once a watermark is emitted again, not a record. In other words, disconnect idleness from the presence of records, connect it only to the presence or lack of watermarks, and allow records to be emitted while the “stream status” is “idle”.
> >> > >> >
> >> > >> > Piotrek
> >> > >> >
> >> > >> > > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid> wrote:
> >> > >> > >
> >> > >> > > It seems to me that idleness was introduced to deal with a very specific issue. In the pipeline, watermarks are aggregated not on a per-split basis but on a per-subtask basis. This works well when each subtask has exactly one split. When a sub-task has multiple splits, various complications occur involving the commingling of watermarks.
> >> > >> > > And when a sub-task has no splits, the pipeline stalls altogether. To deal with the latter problem, idleness was introduced. The sub-task simply declares itself to be idle to be taken out of consideration for purposes of watermark aggregation.
> >> > >> > >
> >> > >> > > If we're looking for a more general solution, I would suggest we discuss how to track watermarks on a per-split basis. Or, as Till mentioned recently, an alternate solution may be to dynamically adjust the parallelism of the task.
> >> > >> > >
> >> > >> > > I don't agree with the notion that idleness involves a correctness tradeoff. The facility I described above has no impact on correctness. Meanwhile, various watermark strategies rely on heuristics involving the processing-time domain, and the term idleness seems to have found purchase there too. The connection among the concepts seems tenuous.
> >> > >> > >
> >> > >> > > -E
> >> > >> > >
> >> > >> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >> > >> > >>
> >> > >> > >> Hi Arvid,
> >> > >> > >>
> >> > >> > >> Thanks for writing down this summary and proposal. I think this was the foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing that idleness is an intermittent, strictly task-local concept and as such shouldn't be exposed in, for example, sinks, while Eron and I thought that it's a concept strictly connected to watermarks.
> >> > >> > >>
> >> > >> > >> 1. I'm a big +1 for changing the StreamStatus definition to a stream "providing watermark" and "not providing watermark". With respect to that, I agree with Dawid that record-bound idleness *(if we would ever need to define/expose it)* should be an intermittent concept, like, for example, the input availability that already exists in the Task/runtime (StreamTaskInput#isAvailable).
> >> > >> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But I also don't have any good ideas.
> >> > >> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >> > >> > >>
> >> > >> > >> Best,
> >> > >> > >> Piotrek
> >> > >> > >>
> >> > >> > >> On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
> >> > >> > >>
> >> > >> > >>> Hi devs,
> >> > >> > >>>
> >> > >> > >>> While discussing "Watermark propagation with Sink API" and during "[FLINK-18934] Idle stream does not advance watermark in connected stream", we noticed some drawbacks in how Flink currently defines idle partitions.
> >> > >> > >>>
> >> > >> > >>> To recap, idleness was always considered as a means to achieve progress in window operators with idle partitions in the source at the risk of losing a bit of correctness. In particular, records could be considered late simply because of that idleness timeout and not because they arrived out of order. A potential reprocessing would not cause these records to be considered late and we may end up with a different (correct) result.
> >> > >> > >>>
> >> > >> > >>> The drawbacks that we discovered are as follows:
> >> > >> > >>> - We currently only use idleness to exclude respective upstream tasks from participating in watermark generation.
> >> > >> > >>> - However, the definition is bound to records. [1] In particular, while a partition is idle, no records should be produced.
> >> > >> > >>> - That brings us into quite a few edge cases, where operators emit records while they are actually idling: Think of timers, asyncIO operators, window operators based on timeouts, etc. that trigger on an operator ingesting an idle partition.
> >> > >> > >>> - The proper solution would be to turn the operator active while emitting and to return to being idle afterwards (but when?). However, this has some unintended side-effects depending on when you switch back:
> >> > >> > >>>   - If you toggle stream status for each record, you get a huge overhead on stream status records and quite a bit of processing in downstream operators (that code path is not much optimized since switching is considered a rare thing).
> >> > >> > >>>   - If you toggle after a certain time, you may get delays longer than the idleness timeout in the downstream window operators.
> >> > >> > >>>   - You could turn back when you processed all pending mails, but if you have a self-replicating mail that would be never. A self-enqueueing, low timer would also produce a flood similar to the first case.
> >> > >> > >>>
> >> > >> > >>> All in all, the situation is quite unsatisfying because idleness implies no records. However, currently there is no need to have that implication: since we only use it for watermarks, we can easily allow records to be emitted (in fact that was the old behavior before FLINK-18934 in many cases) and still get the intended behavior with respect to watermarks:
> >> > >> > >>> - A channel that is active is providing watermarks.
> >> > >> > >>> - An idle channel is not providing any watermarks but can deliver records.
> >> > >> > >>>
> >> > >> > >>> Ultimately, that would mean that we are actually not talking about idle/active partitions anymore. We are talking more about whether a particular subtask should influence downstream watermark calculation or not. This leads to the following questions:
> >> > >> > >>> 1. Do we want to change the definition as outlined?
> >> > >> > >>> 2. Do you see any problem with emitting records on subtasks without explicit watermarks?
> >> > >> > >>> 3. If we want to go this way, we may need to refine the names/definitions. Any ideas?
> >> > >> > >>>
> >> > >> > >>> I think idle partition should translate into something like automatic/implicit/passive watermarks; active partition into explicit/active watermarks. Then StreamStatus is more about WatermarkMode (not really happy with this one).
> >> > >> > >>>
> >> > >> > >>> Best,
> >> > >> > >>>
> >> > >> > >>> Arvid
> >> > >> > >>>
> >> > >> > >>> [1] https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
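
For readers who want the proposed semantics in concrete form, here is a minimal sketch of the rule discussed in this thread: an idle channel is left out of the min-watermark calculation, records never change a channel's status, and only a new watermark turns a channel active again. The class `IdleAwareWatermarkValve` and all of its method names are hypothetical and written just for this illustration; it is not Flink's actual implementation. The sketch also assumes the combined watermark never moves backwards, which is one possible design choice rather than something the thread settles.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration only (not Flink code): combines per-channel
 * watermarks, skipping channels that have declared themselves idle.
 */
final class IdleAwareWatermarkValve {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long outputWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels]; // every channel starts active
    }

    /** Records pass through without touching the channel status. */
    void onRecord(int channel) {
        // Intentionally empty: under the proposal, a record does not end idleness.
    }

    /** The channel stops providing watermarks and is excluded from the minimum. */
    void onIdle(int channel) {
        channelIdle[channel] = true;
        advance();
    }

    /** A watermark re-asserts the channel as a participant in the event-time clock. */
    void onWatermark(int channel, long watermark) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        advance();
    }

    long currentWatermark() {
        return outputWatermark;
    }

    boolean isIdle(int channel) {
        return channelIdle[channel];
    }

    /** Recomputes the minimum over active channels; the output never regresses. */
    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > outputWatermark) {
            outputWatermark = min;
        }
    }

    public static void main(String[] args) {
        IdleAwareWatermarkValve valve = new IdleAwareWatermarkValve(2);
        valve.onWatermark(0, 10L); // channel 1 has no watermark yet, so no progress
        valve.onIdle(1);           // channel 1 excluded, output can follow channel 0
        valve.onRecord(1);         // a record arrives, channel 1 stays idle
        System.out.println(valve.currentWatermark() + " idle=" + valve.isIdle(1));
        valve.onWatermark(1, 15L); // channel 1 becomes active again
        valve.onWatermark(0, 20L); // output is now bounded by channel 1
        System.out.println(valve.currentWatermark() + " idle=" + valve.isIdle(1));
    }
}
```

In this sketch, a channel that returns from idleness with a watermark below the current output does not pull the output back, so its older records could still be treated as late downstream. That matches the correctness concern raised for cases B and C above.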