[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haibo Sun updated FLINK-14228:
------------------------------
    Description:
Currently, the runtime support for {{Bounded[One|Multi]Input#endInput}} has the following problems:
* The runtime propagates {{endInput}} along the operator chain immediately when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, downstream operators can still receive records after {{endInput}} has been executed. This requires operators to flush their buffered data in {{endInput}} instead of {{close}}, as in the PRs fixing issue#13491 and issue#13376.
* Timers are not taken into account.

Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush its output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, the input of a downstream operator reaches its end only when its upstream operator is closed. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:
# {{(Source/Network)Input}} of {{OP1}} is finished.
# call {{OP1#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers.
# wait for the pending timers (in processing) of {{OP1}} to finish.
# call {{OP1#close}}
# call {{OP2#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers.
# ...

  was:
Currently, the runtime support for {{Bounded[One|Multi]Input#endInput}} has the following problems:
* The runtime propagates {{endInput}} along the operator chain immediately when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, downstream operators can still receive records after {{endInput}} has been executed.
This requires operators to flush their buffered data in {{endInput}} instead of {{close}}, as in the PRs fixing [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376].
* Timers are not taken into account.

{{StreamOperator#close}} tells the operator to finish all its processing and flush its output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, the input of a downstream operator reaches its end only when its upstream operator is closed. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:
# {{(Source/Network)Input}} of {{OP1}} is finished.
# call {{OP1#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers.
# wait for the pending timers (in processing) of {{OP1}} to finish.
# call {{OP1#close}}
# call {{OP2#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers.
# ...

> The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14228
>                 URL: https://issues.apache.org/jira/browse/FLINK-14228
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>            Reporter: Haibo Sun
>            Assignee: Haibo Sun
>            Priority: Major
>             Fix For: 1.10.0
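
The close order proposed in the description (finish one operator completely before the next one sees {{endInput}}) can be sketched in plain Java. This is only an illustration under stated assumptions: {{ChainCloseSketch}}, {{SketchOperator}}, {{closeChain}} and the log strings are hypothetical names invented for this sketch, not Flink classes or APIs, and timers are represented by log entries rather than a real {{ProcessingTimeService}}.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; none of these types are Flink classes. */
public class ChainCloseSketch {

    /** Hypothetical operator that records every lifecycle call in a shared log. */
    static final class SketchOperator {
        private final String name;
        private final List<String> log;
        private SketchOperator downstream; // null for the tail operator

        SketchOperator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void processElement(String record) {
            log.add(name + " received " + record);
        }

        void endInput() {
            log.add(name + "#endInput");
        }

        // Stand-in for quiescing the timer service: no new timers after this point.
        void quiesceTimers() {
            log.add(name + " timers quiesced");
        }

        // Stand-in for waiting until timers that are already firing have finished.
        void awaitPendingTimers() {
            log.add(name + " pending timers finished");
        }

        // close() flushes buffered data, so the downstream operator may still
        // receive records here. That is why its endInput must come afterwards.
        void close() {
            if (downstream != null) {
                downstream.processElement("flushed record from " + name);
            }
            log.add(name + "#close");
        }
    }

    /** Finishes each operator completely before its downstream neighbour sees endInput. */
    static List<String> closeChain(String... names) {
        List<String> log = new ArrayList<>();
        List<SketchOperator> chain = new ArrayList<>();
        for (String n : names) {
            SketchOperator op = new SketchOperator(n, log);
            if (!chain.isEmpty()) {
                chain.get(chain.size() - 1).downstream = op;
            }
            chain.add(op);
        }
        // Head to tail: endInput, quiesce timers, drain timers, close.
        for (SketchOperator op : chain) {
            op.endInput();
            op.quiesceTimers();
            op.awaitPendingTimers();
            op.close();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : closeChain("OP1", "OP2")) {
            System.out.println(line);
        }
    }
}
```

In this sketch the record flushed by {{OP1}} in {{close}} reaches {{OP2}} before {{OP2#endInput}} is logged, which is the guarantee the issue says the current runtime does not provide.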