[
https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek reopened FLINK-7721:
-------------------------------------
Reopen to fix git commit hash
> StatusWatermarkValve should output a new min watermark only if it was
> aggregated from aligned chhanels
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7721
> URL: https://issues.apache.org/jira/browse/FLINK-7721
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.1, 1.4.0, 1.3.2
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
> if (channelStatus.isWatermarkAligned) {
> newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
> }
> }
> {code}
> In the calculation of the new min watermark in
> {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}},
> there is not verification that the calculated new min watermark
> {{newMinWatermark}} really is aggregated from some aligned channel.
> In the corner case where all input channels are currently not aligned but
> actually some are active, we would then incorrectly determine that the final
> aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.
> The fix would simply be to only emit the aggregated watermark IFF it was
> really calculated from some aligned input channel (as well as the already
> existing constraint that it needs to be larger than the last emitted
> watermark). This change should also safely cover the case that a
> {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)