[ 
https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-14228:
------------------------------
    Description: 
Currently, the runtime support implementation of 
{{Bounded[One|Multi]Input#endInput}} has the following problems:

* The runtime propagates {{endInput}} along the operator chain immediately 
when the input of the head operator is finished. Because some operators flush 
their buffered data in {{close}}, downstream operators can still receive 
records after {{endInput}} has been executed. Working around this requires 
operators to flush buffered data in {{endInput}} instead of {{close}}, as in 
the PRs fixing 
[issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and 
[issue#13376|https://issues.apache.org/jira/browse/FLINK-13376].

* Timers are not taken into account.
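
The first problem can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: {{RecordingOperator}}, {{BufferingOperator}} and {{EagerPropagationDemo}} are hypothetical names, not Flink classes, and the "runtime" is reduced to a few direct method calls that mimic the eager propagation of {{endInput}} described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical downstream operator (OP2) that records the order of calls it sees. */
class RecordingOperator {
    final List<String> events = new ArrayList<>();
    boolean inputEnded = false;
    boolean sawRecordAfterEndInput = false;

    void processElement(String record) {
        // Receiving a record after endInput violates the endInput contract.
        if (inputEnded) {
            sawRecordAfterEndInput = true;
        }
        events.add("record:" + record);
    }

    void endInput() {
        inputEnded = true;
        events.add("endInput");
    }
}

/** Hypothetical upstream operator (OP1) that buffers records and flushes them in close(). */
class BufferingOperator {
    private final List<String> buffer = new ArrayList<>();
    private final RecordingOperator downstream;

    BufferingOperator(RecordingOperator downstream) {
        this.downstream = downstream;
    }

    void processElement(String record) {
        buffer.add(record);
    }

    void endInput() {
        // Nothing is flushed here; this operator flushes in close().
    }

    void close() {
        for (String record : buffer) {
            downstream.processElement(record);
        }
        buffer.clear();
    }
}

class EagerPropagationDemo {
    /** Mimics eager propagation: endInput reaches the whole chain before any close. */
    static RecordingOperator run() {
        RecordingOperator op2 = new RecordingOperator();
        BufferingOperator op1 = new BufferingOperator(op2);
        op1.processElement("a");
        op1.endInput();
        op2.endInput(); // propagated immediately along the chain
        op1.close();    // flushes "a" downstream after OP2 has already seen endInput
        return op2;
    }
}
```

In this model, {{EagerPropagationDemo.run()}} returns an {{OP2}} stand-in whose {{sawRecordAfterEndInput}} flag is set, which is the situation the issue describes.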

{{StreamOperator#close}} tells the operator to finish all its processing and 
flush its output (all remaining buffered data), while {{endInput}} indicates 
that no more data will arrive on one of the operator's inputs. In other words, 
for the non-tail operators on the operator chain, the input of a downstream 
operator reaches its end only when its upstream operator has been closed. So 
for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:

# {{(Source/Network)Input}} of {{OP1}} is finished.
# call {{OP1#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new 
timers.
# wait for the pending timers (in processing) of {{OP1}} to finish.
# call {{OP1#close}}
# call {{OP2#endInput}}
# quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new 
timers.
#  ...
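
The ordering listed above can be sketched in plain Java as follows. This is a minimal, single-threaded model and not the actual runtime change: {{SketchOperator}}, {{SketchTimerService}} and {{ChainCloser}} are hypothetical names, and in-flight timers are simplified to already-registered callbacks that are run to completion before {{close}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, single-threaded stand-in for a processing-time timer service. */
class SketchTimerService {
    private final List<Runnable> pending = new ArrayList<>();
    private boolean quiesced = false;

    /** Returns false once quiesced, i.e. new timers are rejected. */
    boolean register(Runnable timer) {
        if (quiesced) {
            return false;
        }
        pending.add(timer);
        return true;
    }

    void quiesce() {
        quiesced = true;
    }

    /** Runs the timers registered before quiesce; a copy avoids concurrent modification. */
    void awaitPendingTimers() {
        List<Runnable> toRun = new ArrayList<>(pending);
        pending.clear();
        for (Runnable timer : toRun) {
            timer.run();
        }
    }
}

/** Hypothetical stand-in for a chained operator; it only logs its lifecycle calls. */
class SketchOperator {
    final String name;
    final List<String> log;
    final SketchTimerService timers = new SketchTimerService();

    SketchOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void endInput() {
        log.add(name + "#endInput");
    }

    void close() {
        log.add(name + "#close");
    }
}

class ChainCloser {
    /** Finishes the operators head to tail, one operator at a time. */
    static void finishChain(List<SketchOperator> chain, List<String> log) {
        for (SketchOperator op : chain) {
            op.endInput();                  // no more input for this operator
            op.timers.quiesce();            // reject newly registered timers
            log.add(op.name + ":quiesce");
            op.timers.awaitPendingTimers(); // let already registered timers finish
            log.add(op.name + ":timersDone");
            op.close();                     // flush; only now does the next input end
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SketchOperator op1 = new SketchOperator("OP1", log);
        SketchOperator op2 = new SketchOperator("OP2", log);
        op1.timers.register(() -> log.add("OP1:timerFired"));
        finishChain(Arrays.asList(op1, op2), log);
        return log;
    }
}
```

In the log returned by {{ChainCloser.demo()}}, {{OP1#close}} appears before {{OP2#endInput}}, and the timer of {{OP1}} fires between its quiesce step and its {{close}} call.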


  was:
Currently, the runtime support implementation of 
{{Bounded[One|Multi]Input#endInput}} has the following problems:

1. The runtime are propagating {{endInput}} immediately on the operator chain 
when input of the head operator is finished. Because some operators flush the 
buffered data in {{close}}, the downstream operators still receive records 
after executing {{endInput}}. This need the operators to flush the buffered 
data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 
and issue#13376.2.Timers are not taken into account.
 close tells the operator to finish all its processing and flush output (all 
remaining buffered data), while endInput indicates that no more data will 
arrive on some input of the operator. That is to say, for the non-tail 
operators on the operator chain, when the upstream operator is closed, the 
input of its downstream operator arrives at the end. So for an operator chain 
OP1 -> OP2 -> ... ,  the logic should be:


> The runtime support for Bounded[One|Multi]Input#endInput does not properly 
> implement their semantics
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14228
>                 URL: https://issues.apache.org/jira/browse/FLINK-14228
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>            Reporter: Haibo Sun
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently, the runtime support implementation of 
> {{Bounded[One|Multi]Input#endInput}} has the following problems:
> * The runtime propagates {{endInput}} along the operator chain immediately 
> when the input of the head operator is finished. Because some operators flush 
> their buffered data in {{close}}, downstream operators can still receive 
> records after {{endInput}} has been executed. Working around this requires 
> operators to flush buffered data in {{endInput}} instead of {{close}}, as in 
> the PRs fixing 
> [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and 
> [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376].
> * Timers are not taken into account.
> {{StreamOperator#close}} tells the operator to finish all its processing and 
> flush its output (all remaining buffered data), while {{endInput}} indicates 
> that no more data will arrive on one of the operator's inputs. In other words, 
> for the non-tail operators on the operator chain, the input of a downstream 
> operator reaches its end only when its upstream operator has been closed. So 
> for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:
> # {{(Source/Network)Input}} of {{OP1}} is finished.
> # call {{OP1#endInput}}
> # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new 
> timers.
> # wait for the pending timers (in processing) of {{OP1}} to finish.
> # call {{OP1#close}}
> # call {{OP2#endInput}}
> # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new 
> timers.
> #  ...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
