[ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301571#comment-17301571
 ] 

Julius Michaelis edited comment on FLINK-5601 at 3/16/21, 10:11 AM:
--------------------------------------------------------------------

Hey, we're also affected by this, and I'd love to know whether there's any 
progress on this (or why [~wind_ljy]'s PR wasn't merged - maybe I can help?).

By the way, it seems to me that what [~kkl0u] suggested and [~wind_ljy] 
implemented isn't sufficient: say we have two inputs (assume p=1 for 
simplicity), each with its own independent 
{{TimestampsAnd\{Periodic|Punctuated}WatermarksOperator}}. They start up 
independently, so one may already be sending its saved watermark and some 
records downstream while the other is still starting up. If we take the union 
of those two streams further down, the union's watermark will still be unset 
(because it hasn't yet received a watermark from every upstream input), yet it 
will already be receiving records from some of its inputs. So the "upon 
recovery, some late records may not be correctly dropped" problem would 
persist. Is this understanding of the problem correct? Is there a neat way to 
avoid it, e.g. by (additionally) checkpointing the watermark at the 
{{WindowOperator}}?
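To make the argument concrete, here is a minimal sketch (not Flink code, and the class name {{UnionWatermark}} is my own invention) of the min-over-inputs combination that, as I understand it, a multi-input operator performs. It shows why the union's watermark stays at its initial Long.MIN_VALUE until *every* input has emitted one, even if one input has already replayed its restored watermark:

```python
LONG_MIN = -(2 ** 63)  # Flink's initial watermark value (Long.MIN_VALUE)

class UnionWatermark:
    """Toy model of a two-input operator's combined watermark:
    the minimum over the last watermark seen on each input channel."""

    def __init__(self, num_inputs=2):
        # Every channel starts at LONG_MIN until it emits its first watermark.
        self.per_input = [LONG_MIN] * num_inputs

    def on_watermark(self, input_idx, wm):
        # Watermarks per channel only move forward.
        self.per_input[input_idx] = max(self.per_input[input_idx], wm)
        # The operator's combined watermark is the minimum over all channels.
        return min(self.per_input)

u = UnionWatermark()
# Input 0 restores quickly and replays its checkpointed watermark...
wm_after_first = u.on_watermark(0, 1_000_000)
# ...but the combined watermark is still LONG_MIN, so a record with
# timestamp 10 arriving now would not be classified as late, even though
# it was late before the restart.
assert wm_after_first == LONG_MIN
# Only once input 1 (still starting up above) also emits a watermark
# does the combined watermark advance:
assert u.on_watermark(1, 1_000_000) == 1_000_000
```

So checkpointing only at the timestamp-assigner operators closes the gap per input, but the window between "first input ready" and "last input ready" remains.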



> Window operator does not checkpoint watermarks
> ----------------------------------------------
>
>                 Key: FLINK-5601
>                 URL: https://issues.apache.org/jira/browse/FLINK-5601
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.9.0, 1.10.0, 1.11.0
>            Reporter: Ufuk Celebi
>            Assignee: Jiayi Liao
>            Priority: Critical
>              Labels: pull-request-available
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that on restore it could happen that late elements were not 
> dropped, because the watermarks needed to be re-established after restore 
> first.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
