Forgot to provide the link for reference [1]:

[1] https://issues.apache.org/jira/browse/FLINK-5017
On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi everyone,
>
> Sorry for chiming in late here.
>
> Regarding the topic of changing the definition of StreamStatus, and changing the name as well:
> After digging into some of the roots of this implementation [1], I found that StreamStatus was initially defined to mark "watermark idleness", not "record idleness" (in fact, the alternative name "WatermarkStatus" was considered at the time).
>
> The concern that made us change the definition to "record idleness" in the final implementation was the existence of periodic timestamp / watermark generators within the pipeline. Those would continue to generate non-increasing watermarks in the absence of any input records from upstream. In this scenario, downstream operators would not be able to consider that channel idle, and watermark progress would therefore be blocked. We could consider a timeout-based approach on those specific operators that toggles watermark idleness if the values remain constant for a period of time, but then again, this is very ill-defined and most likely wrong.
>
> I have not followed the newest changes to the watermark generator operators and am not sure whether this issue is still relevant.
> Otherwise, I don't see other problems with changing the definition here.
>
> Thanks,
> Gordon
>
> On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Eron,
>>
>> again, to recap from the other thread:
>> - You are right that idleness is correct with static assignment and fully active partitions. In this case, the source defines idleness. (case A)
>> - For the more pressing use case of idle, assigned partitions, the user defines an idleness threshold, and idleness becomes potentially incorrect when the partition becomes active again. (case B)
>> - The same holds for dynamic assignment of splits.
>>   If a source without a split gets a split assigned dynamically, there is a realistic chance that the watermark has already advanced past the first record of the newly assigned split. (case C)
>>
>> You can certainly insist that only the first case is valid (as it's correct), but we know that users use idleness in other ways, and that was also the intent of the devs.
>>
>> Now the question is whether it makes sense to distinguish these cases. Would you treat the idleness information differently (especially in the sink/source that motivated FLIP-167) if you knew that the idleness is guaranteed to be correct?
>> We could have some WatermarkStatus with ACTIVE, IDLE (case A), and TIMEOUT (case B).
>>
>> However, that would still leave case C, which would probably need to be solved completely differently. I could imagine that a source with dynamic assignments should never have IDLE subtasks and should rather manage idleness itself. For example, it could emit a watermark every second/minute that is fetched directly from the source system. I'm just not sure whether the current WatermarkAssigner interface suffices in that regard...
>>
>>
>> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
>>
>> > Hi Eron,
>> >
>> > Can you elaborate a bit more on what you mean? I don’t understand what you mean by a more general solution.
>> >
>> > As of now, a stream is marked idle by a source/watermark generator, which has the effect of temporarily excluding this stream/partition from the min-watermark calculation in the downstream tasks. However, the stream switches back to active as soon as any record is emitted. This is what’s causing the problems described by Arvid.
>> >
>> > The core of our proposal is very simple: keep everything as it is, except that a stream is changed back to active only once a watermark is emitted again, not a record.
>> > In other words: disconnect idleness from the presence of records, connect it only to the presence or absence of watermarks, and allow records to be emitted while the “stream status” is “idle”.
>> >
>> > Piotrek
>> >
>> >
>> > > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid> wrote:
>> > >
>> > > It seems to me that idleness was introduced to deal with a very specific issue. In the pipeline, watermarks are aggregated not on a per-split basis but on a per-subtask basis. This works well when each subtask has exactly one split. When a subtask has multiple splits, various complications occur involving the commingling of watermarks. And when a subtask has no splits, the pipeline stalls altogether. To deal with the latter problem, idleness was introduced. The subtask simply declares itself idle so that it is taken out of consideration for purposes of watermark aggregation.
>> > >
>> > > If we're looking for a more general solution, I would suggest we discuss how to track watermarks on a per-split basis. Or, as Till mentioned recently, an alternative solution may be to dynamically adjust the parallelism of the task.
>> > >
>> > > I don't agree with the notion that idleness involves a correctness tradeoff. The facility I described above has no impact on correctness. Meanwhile, various watermark strategies rely on heuristics involving the processing-time domain, and the term idleness seems to have found purchase there too. The connection among the concepts seems tenuous.
>> > >
>> > > -E
>> > >
>> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > >>
>> > >> Hi Arvid,
>> > >>
>> > >> Thanks for writing down this summary and proposal. I think this was the foundation of the disagreement in the FLIP-167 discussion.
>> > >> Dawid was arguing that idleness is intermittent, strictly a task-local concept, and as such shouldn't be exposed in, for example, sinks, while Eron and I thought that it's a concept strictly connected to watermarks.
>> > >>
>> > >> 1. I'm a big +1 for changing the StreamStatus definition to stream "providing watermarks" and "not providing watermarks". In that respect, I agree with Dawid that record-bound idleness *(if we ever need to define/expose it)* should be an intermittent concept, like the existing input availability in the task runtime (StreamTaskInput#isAvailable).
>> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But I also don't have any good ideas. `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>> > >>
>> > >> Best,
>> > >> Piotrek
>> > >>
>> > >> On Tue, Jun 8, 2021 at 16:35 Arvid Heise <ar...@apache.org> wrote:
>> > >>
>> > >>> Hi devs,
>> > >>>
>> > >>> While discussing "Watermark propagation with Sink API" and during "[FLINK-18934] Idle stream does not advance watermark in connected stream", we noticed some drawbacks in how Flink currently defines idle partitions.
>> > >>>
>> > >>> To recap, idleness was always considered a means to achieve progress in window operators with idle partitions in the source, at the risk of losing a bit of correctness. In particular, records could be considered late simply because of the idleness timeout, not because they arrived out of order. A potential reprocessing would not cause these records to be considered late, and we may end up with a different (correct) result.
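A minimal sketch of the lateness effect described in the recap, assuming a deliberately simplified model. A downstream task keeps one watermark per input channel and skips idle channels when it takes the minimum. The combined watermark never moves backwards. A record counts as late when its timestamp is at or below the combined watermark (zero allowed lateness). `CombinedWatermarkTracker` and its methods are hypothetical names for this illustration. They are not Flink classes or APIs.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of a downstream task that combines the
 * watermarks of several input channels. Not Flink's runtime code: it only
 * illustrates the rule that idle channels are skipped when taking the minimum.
 */
class CombinedWatermarkTracker {
    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    CombinedWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        idle = new boolean[channels];
    }

    /** Registers a watermark from one channel; returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return recompute();
    }

    /** Marks a channel idle (e.g. after an idleness timeout) or active again. */
    long setIdle(int channel, boolean isIdle) {
        idle[channel] = isIdle;
        return recompute();
    }

    /** Simplified lateness check, assuming zero allowed lateness. */
    boolean isLate(long timestamp) {
        return timestamp <= combined;
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < idle.length; i++) {
            if (!idle[i]) { // idle channels do not hold back the minimum
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Only advance: the combined watermark never moves backwards and
        // stays where it is while every channel is idle.
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }
}
```

Consider a case where channel 0 reaches watermark 100 while channel 1 stalls at 40. Marking channel 1 idle lets the combined watermark jump from 40 to 100. A record with timestamp 50 that later arrives on channel 1 therefore counts as late. Replaying the same input without the idleness gap keeps the combined watermark at 40, so that record is on time.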
>> > >>>
>> > >>> The drawbacks that we discovered are as follows:
>> > >>> - We currently use idleness only to exclude the respective upstream tasks from participating in watermark generation.
>> > >>> - However, the definition is bound to records [1]. In particular, while a partition is idle, no records should be produced.
>> > >>> - That leads to quite a few edge cases where operators emit records while they are actually idling: think of timers, async I/O operators, window operators based on timeouts, etc. that trigger on an operator ingesting an idle partition.
>> > >>> - The proper solution would be to switch the operator to active while emitting and to return to idle afterwards (but when?). However, this has some unintended side effects, depending on when you switch back:
>> > >>>   - If you toggle the stream status for each record, you get a huge overhead of stream status records and quite a bit of processing in downstream operators (that code path is not very optimized, since switching is considered a rare thing).
>> > >>>   - If you toggle after a certain time, you may get delays longer than the idleness timeout in the downstream window operators.
>> > >>>   - You could switch back once all pending mails have been processed, but with a self-replicating mail that would never happen. A self-enqueueing timer with a short interval would also produce a flood similar to the first case.
>> > >>>
>> > >>> All in all, the situation is quite unsatisfying because idleness implies no records.
>> > >>> However, there is currently no need for that implication: since we only use idleness for watermarks, we can easily allow records to be emitted (in fact, that was the old behavior before FLINK-18934 in many cases) and still get the intended behavior with respect to watermarks:
>> > >>> - A channel that is active is providing watermarks.
>> > >>> - An idle channel is not providing any watermarks but can deliver records.
>> > >>>
>> > >>> Ultimately, that would mean that we are not really talking about idle/active partitions anymore. We are talking more about whether a particular subtask should influence the downstream watermark calculation or not. That leads to the following questions:
>> > >>> 1. Do we want to change the definition as outlined?
>> > >>> 2. Do you see any problem with emitting records on a subtask without explicit watermarks?
>> > >>> 3. If we want to go this way, we may need to refine the names/definitions. Any ideas?
>> > >>>
>> > >>> I think an idle partition should translate into something like automatic/implicit/passive watermarks, and an active partition into explicit/active watermarks. Then StreamStatus becomes more of a WatermarkMode (not really happy with this one).
>> > >>>
>> > >>> Best,
>> > >>>
>> > >>> Arvid
>> > >>>
>> > >>> [1] https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
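A minimal sketch contrasting the current record-bound definition of idleness with the proposed watermark-bound one. It models an operator that keeps emitting records (e.g. from timers) while its input is idle. For the current definition, it assumes the per-record toggling option from the drawbacks list. It also assumes that emitting a watermark makes the channel active in both modes. `ChannelStatus` and `IdleOperatorOutput` are hypothetical names; the thread has not settled on real ones.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical status values; the thread has not agreed on final names. */
enum ChannelStatus { ACTIVE, IDLE }

/**
 * Hypothetical model of an operator whose input is idle but which still emits
 * records, e.g. from timers. It logs every element it would send downstream.
 */
class IdleOperatorOutput {
    private final boolean recordBoundIdleness; // true: current definition, false: proposal
    private ChannelStatus status = ChannelStatus.IDLE;
    final List<String> sent = new ArrayList<>();

    IdleOperatorOutput(boolean recordBoundIdleness) {
        this.recordBoundIdleness = recordBoundIdleness;
    }

    void emitRecord(long timestamp) {
        if (recordBoundIdleness) {
            // Current definition: an idle stream must not carry records, so the
            // operator switches to ACTIVE for the record and back to IDLE after it.
            setStatus(ChannelStatus.ACTIVE);
            sent.add("record@" + timestamp);
            setStatus(ChannelStatus.IDLE);
        } else {
            // Proposal: records may flow while IDLE; downstream tasks keep
            // ignoring this channel when they combine watermarks.
            sent.add("record@" + timestamp);
        }
    }

    /** In this sketch, a watermark makes the channel ACTIVE in both modes. */
    void emitWatermark(long watermark) {
        setStatus(ChannelStatus.ACTIVE);
        sent.add("watermark@" + watermark);
    }

    /** Counts the stream status elements sent downstream so far. */
    long statusElementsSent() {
        return sent.stream().filter(s -> s.startsWith("status:")).count();
    }

    private void setStatus(ChannelStatus next) {
        if (status != next) { // only actual changes are sent downstream
            status = next;
            sent.add("status:" + next);
        }
    }
}
```

For example, three timer-fired records cost six status elements under the current definition (ACTIVE before and IDLE after each record). They cost none under the proposal, where the channel becomes ACTIVE again only when the next watermark is emitted.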