[ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556114#comment-17556114
 ] 

Weijie Guo edited comment on FLINK-28033 at 6/19/22 6:43 PM:
-------------------------------------------------------------

[~ye-able], there is no problem here. `newMinWatermark` is a local variable, so 
it is reset to `Long.MAX_VALUE` every time the method is called. Once the 
watermarks of all input channels have advanced, a new watermark is emitted.
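
A minimal sketch of this behavior follows. It is a simplified, hypothetical 
stand-in (`WatermarkValveSketch`), not the actual `StatusWatermarkValve`: it 
assumes every channel is watermark-aligned and leaves out idleness handling. 
The per-channel watermarks persist between calls, while `newMinWatermark` 
starts from `Long.MAX_VALUE` on each call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the valve; all channels are assumed aligned.
class WatermarkValveSketch {
    // Per-channel state that persists across calls, initialized to Long.MIN_VALUE.
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;
    // Records every watermark that would be emitted downstream.
    final List<Long> emitted = new ArrayList<>();

    WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(int channel, long watermark) {
        // Only react when the channel's watermark actually advances.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
            findAndOutputNewMinWatermark();
        }
    }

    private void findAndOutputNewMinWatermark() {
        // Local variable: starts from Long.MAX_VALUE on every call.
        long newMinWatermark = Long.MAX_VALUE;
        for (long channelWatermark : channelWatermarks) {
            newMinWatermark = Math.min(channelWatermark, newMinWatermark);
        }
        // Emit only if the minimum across channels has moved forward.
        if (newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            emitted.add(newMinWatermark);
        }
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        valve.inputWatermark(0, 10L); // channel 1 is still at Long.MIN_VALUE: nothing emitted
        valve.inputWatermark(1, 5L);  // min(10, 5) = 5 is emitted
        valve.inputWatermark(1, 20L); // min(10, 20) = 10 is emitted
        System.out.println(valve.emitted); // prints [5, 10]
    }
}
```

In this sketch nothing is emitted while any channel is still at its initial 
`Long.MIN_VALUE`. That is the situation described in the issue, and it ends as 
soon as every channel has reported a watermark.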


was (Author: weijie guo):
`newMinWatermark` is a local variable, so it is reset to `Long.MAX_VALUE` 
every time the method is called. Once the watermarks of all input channels 
have advanced, a new watermark is emitted.

> find and output new min watermark may be wrong when in multichannel
> -------------------------------------------------------------------
>
>                 Key: FLINK-28033
>                 URL: https://issues.apache.org/jira/browse/FLINK-28033
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: YeAble
>            Priority: Major
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger than
> // the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> } {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one 
> channelStatus's watermark changes but the others' do not, newMinWatermark 
> is always Long.MIN_VALUE and no watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
