[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556114#comment-17556114 ]
Weijie Guo edited comment on FLINK-28033 at 6/19/22 6:43 PM: ------------------------------------------------------------- [~ye-able] , No problem here, `newMinWatermark` is a local variable, it will be reset to the maximum value every time the method is called. When all input watermark advance, it will emit new watermark. was (Author: weijie guo): `newMinWatermark` is a local variable, it will be reset to the maximum value every time the method is called. When all input watermark advance, it will emit new watermark. > find and output new min watermark mybe wrong when in multichannel > ----------------------------------------------------------------- > > Key: FLINK-28033 > URL: https://issues.apache.org/jira/browse/FLINK-28033 > Project: Flink > Issue Type: Bug > Components: Runtime / Task > Reporter: YeAble > Priority: Major > > File: StatusWatermarkValve.java > Method: findAndOutputNewMinWatermarkAcrossAlignedChannels > {code:java} > //代码占位符 > long newMinWatermark = Long.MAX_VALUE; > boolean hasAlignedChannels = false; > // determine new overall watermark by considering only watermark-aligned > channels across all > // channels > for (InputChannelStatus channelStatus : channelStatuses) { > if (channelStatus.isWatermarkAligned) { > hasAlignedChannels = true; > newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); > } > } > // we acknowledge and output the new overall watermark if it really is > aggregated > // from some remaining aligned channel, and is also larger than the last > output watermark > if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) { > lastOutputWatermark = newMinWatermark; > output.emitWatermark(new Watermark(lastOutputWatermark)); > } {code} > channelStatus's initalized watermark is Long.MIN_VALUE. when one > channelStatus's watermark is changed,but other channelStatus's is not > changed, the newMinWatermark is always Long.MIN_VALUE and output not > emitwatermark。 -- This message was sent by Atlassian Jira (v8.20.7#820007)