[jira] [Commented] (FLINK-17706) Clarify licensing situation for flink-benchmarks
[ https://issues.apache.org/jira/browse/FLINK-17706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126319#comment-17126319 ] Haibo Sun commented on FLINK-17706: --- I'm ok with licensing my contributions to the Apache License 2.0. Thanks for driving it [~NicoK] > Clarify licensing situation for flink-benchmarks > - > > Key: FLINK-17706 > URL: https://issues.apache.org/jira/browse/FLINK-17706 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Affects Versions: 1.11.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.11.0 > > > After enabling the rat plugin, it finds the following files with missing or > invalid license headers, broken down by all contributors I could find in the > git history. If I see this correctly, every contributor should acknowledge > the change of their files to the Apache License and then we could add the > license headers and continue the project move: > * [~rgrebennikov] + [~NicoK] > {code:java} > > src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java > > src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java > {code} > * [~sunhaibotb] > {code:java} > src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java > > src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java{code} > * [~pnowojski] > {code:java} > src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java > src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java > src/main/java/org/apache/flink/benchmark/functions/LongSource.java > src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java > src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java > src/main/java/org/apache/flink/benchmark/functions/SuccessException.java > src/main/java/org/apache/flink/benchmark/functions/SumReduce.java > src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java > 
src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java > src/main/java/org/apache/flink/benchmark/CollectSink.java{code} > * [~pnowojski] + [~sunhaibotb] + [~xintongsong] > {code:java} >src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java{code} > * [~NicoK] > {code:java} > src/main/resources/avro/mypojo.avsc > src/main/resources/protobuf/MyPojo.proto > src/main/resources/thrift/mypojo.thrift{code} > * [~pnowojski] + [~liyu] > {code:java} > save_jmh_result.py{code} > The license should be clarified with the author and all contributors of that > file. > Please, every tagged contributor, express your decision (whether you are fine > with the change) below, so we can continue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-14228. - Resolution: Fixed > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.11.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime is propagating {{endInput}} immediately on the operator chain > when the input of the head operator is finished. Because some operators flush the > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This requires the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and > [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. > * Timers are not taken into account. > Actually, {{StreamOperator#close}} tells the operator to finish all its > processing and flush output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator reaches > its end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending timers (in processing) of {{OP1}} to finish. 
> # call {{OP1#close}} > # call {{OP2#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new > timers. > # ... -- This message was sent by Atlassian Jira (v8.3.4#803005)
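The shutdown order listed in the description above can be modelled with a small sketch. This is illustrative only: the class and method names (ChainShutdownSketch, Op, shutDownChain) are hypothetical and not part of the Flink runtime API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed chain shutdown order; Op and
// shutDownChain are illustrative names, not Flink classes.
public class ChainShutdownSketch {
    static final List<String> trace = new ArrayList<>();

    static class Op {
        final String name;
        Op(String name) { this.name = name; }
        void endInput() { trace.add(name + "#endInput"); }           // no more data on this input
        void quiesceTimers() { trace.add(name + "#quiesceTimers"); } // no new timers; pending ones drained
        void close() { trace.add(name + "#close"); }                 // flush remaining buffered data
    }

    // Head-to-tail: the downstream operator's endInput is only called after
    // the upstream operator has been closed, so records flushed by the
    // upstream close() are seen before the downstream end of input.
    static void shutDownChain(List<Op> chain) {
        for (Op op : chain) {
            op.endInput();
            op.quiesceTimers();
            op.close();
        }
    }

    public static void main(String[] args) {
        shutDownChain(List.of(new Op("OP1"), new Op("OP2")));
        System.out.println(trace);
    }
}
```

Running it prints the trace [OP1#endInput, OP1#quiesceTimers, OP1#close, OP2#endInput, OP2#quiesceTimers, OP2#close]: OP2's endInput fires only after OP1 is closed, matching the steps in the ticket.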
[jira] [Comment Edited] (FLINK-16174) Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails
[ https://issues.apache.org/jira/browse/FLINK-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039990#comment-17039990 ] Haibo Sun edited comment on FLINK-16174 at 2/19/20 12:57 PM: - CC [~pnowojski], [~roman_khachatryan] was (Author: sunhaibotb): CC [~pnowojski][~roman_khachatryan] > Add a better tryYield() method to MailboxExecutor to return the lowest > priority of the remaining mails > -- > > Key: FLINK-16174 > URL: https://issues.apache.org/jira/browse/FLINK-16174 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.10.0 >Reporter: Haibo Sun >Priority: Major > > Currently, we use chainIndex as the priority to create MailboxExecutor to > process its mails. When MailboxExecutor#tryYield is called to process mails, > it will take the mails of this operator and all downstream operators in the > chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know > whether there is any mail of the current operator in the mailbox, which can > simplify some operations. > For example, when we close an operator in the runtime, after quiescing the > processing time service and waiting for its running timers to finish, if > there is no mail of the current operator in the mailbox, we call > StreamOperator#close to close the operator. Then the runtime code of closing > an operator can be simplified as follows. > {code:java} > quiesceProcessingTimeService().get(); > while (mailboxExecutor.betterTryYield() <= self.priority) {} > closeOperator(actionExecutor); > {code} > With the existing #tryYield method, if the following simplified code is used > to close an operator, then when a downstream operator is implemented like > MailboxOperatorTest.ReplicatingMail, the tryYield() loop will > be prevented from exiting, which results in a deadlock. 
> {code:java} > quiesceProcessingTimeService().get(); > while (mailboxExecutor.tryYield()) {} > closeOperator(actionExecutor); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16174) Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails
[ https://issues.apache.org/jira/browse/FLINK-16174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039990#comment-17039990 ] Haibo Sun commented on FLINK-16174: --- CC [~pnowojski][~roman_khachatryan] > Add a better tryYield() method to MailboxExecutor to return the lowest > priority of the remaining mails > -- > > Key: FLINK-16174 > URL: https://issues.apache.org/jira/browse/FLINK-16174 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.10.0 >Reporter: Haibo Sun >Priority: Major > > Currently, we use chainIndex as the priority to create MailboxExecutor to > process its mails. When MailboxExecutor#tryYield is called to process mails, > it will take the mails of this operator and all downstream operators in the > chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know > whether there is any mail of the current operator in the mailbox, which can > simplify some operations. > For example, when we close an operator in the runtime, after quiescing the > processing time service and waiting for its running timers to finish, if > there is no mail of the current operator in the mailbox, we call > StreamOperator#close to close the operator. Then the runtime code of closing > an operator can be simplified as follows. > {code:java} > quiesceProcessingTimeService().get(); > while (mailboxExecutor.betterTryYield() <= self.priority) {} > closeOperator(actionExecutor); > {code} > With the existing #tryYield method, if the following simplified code is used > to close an operator, then when a downstream operator is implemented like > MailboxOperatorTest.ReplicatingMail, the tryYield() loop will > be prevented from exiting, which results in a deadlock. > {code:java} > quiesceProcessingTimeService().get(); > while (mailboxExecutor.tryYield()) {} > closeOperator(actionExecutor); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16174) Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails
Haibo Sun created FLINK-16174: - Summary: Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails Key: FLINK-16174 URL: https://issues.apache.org/jira/browse/FLINK-16174 Project: Flink Issue Type: Improvement Components: Runtime / Task Affects Versions: 1.10.0 Reporter: Haibo Sun Currently, we use chainIndex as the priority to create MailboxExecutor to process its mails. When MailboxExecutor#tryYield is called to process mails, it will take the mails of this operator and all downstream operators in the chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know whether there is any mail of the current operator in the mailbox, which can simplify some operations. For example, when we close an operator in the runtime, after quiescing the processing time service and waiting for its running timers to finish, if there is no mail of the current operator in the mailbox, we call StreamOperator#close to close the operator. Then the runtime code of closing an operator can be simplified as follows. {code:java} quiesceProcessingTimeService().get(); while (mailboxExecutor.betterTryYield() <= self.priority) {} closeOperator(actionExecutor); {code} With the existing #tryYield method, if the following simplified code is used to close an operator, then when a downstream operator is implemented like MailboxOperatorTest.ReplicatingMail, the tryYield() loop will be prevented from exiting, which results in a deadlock. {code:java} quiesceProcessingTimeService().get(); while (mailboxExecutor.tryYield()) {} closeOperator(actionExecutor); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
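To illustrate the proposal, here is a minimal, hypothetical model of a mailbox whose yield reports the lowest priority of the remaining mails. MailboxSketch, Mail, and betterTryYield are illustrative names; the real MailboxExecutor API differs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a mailbox whose yield reports the lowest priority
// (chain index) still present, so a closing operator can stop draining once
// only downstream mails remain. Not the real Flink MailboxExecutor API.
public class MailboxSketch {
    static final int NO_MAIL = Integer.MAX_VALUE;

    static class Mail {
        final Runnable action;
        final int priority; // chain index of the operator that enqueued it
        Mail(Runnable action, int priority) { this.action = action; this.priority = priority; }
    }

    final Deque<Mail> mailbox = new ArrayDeque<>();

    void submit(Runnable action, int priority) { mailbox.add(new Mail(action, priority)); }

    // Run one mail (if any), then return the smallest priority left in the
    // mailbox, or NO_MAIL when it is empty.
    int betterTryYield() {
        Mail mail = mailbox.poll();
        if (mail != null) {
            mail.action.run();
        }
        return mailbox.stream().mapToInt(m -> m.priority).min().orElse(NO_MAIL);
    }

    public static void main(String[] args) {
        MailboxSketch mb = new MailboxSketch();
        int selfPriority = 1;
        mb.submit(() -> System.out.println("own mail processed"), 1);
        // a downstream "replicating" mail that keeps re-enqueuing itself
        Runnable[] replicating = new Runnable[1];
        replicating[0] = () -> mb.submit(replicating[0], 2);
        mb.submit(replicating[0], 2);
        // exits once only downstream mails remain; a boolean tryYield()
        // could not distinguish this case and would loop forever here
        while (mb.betterTryYield() <= selfPriority) { }
        System.out.println("drained; downstream mails left: " + mb.mailbox.size());
    }
}
```

The closing loop `while (mb.betterTryYield() <= selfPriority) {}` keeps draining only while mails of the closing operator or its upstream remain; a downstream mail that re-enqueues itself (as in the ReplicatingMail scenario above) carries a higher priority value and no longer prevents the loop from exiting.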
[jira] [Closed] (FLINK-15070) Supplement cases of blocking partition with compression for benchmark
[ https://issues.apache.org/jira/browse/FLINK-15070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-15070. - > Supplement cases of blocking partition with compression for benchmark > - > > Key: FLINK-15070 > URL: https://issues.apache.org/jira/browse/FLINK-15070 > Project: Flink > Issue Type: Task > Components: Benchmarks >Reporter: zhijiang >Assignee: Haibo Sun >Priority: Minor > > ATM the benchmark only covers the case of pipelined partitions used in > streaming jobs, so it is better to also cover the case of blocking partitions > for batch jobs. Then we can easily trace the performance concerns for any > future changes. > This ticket would introduce the blocking partition cases for uncompressed > file, uncompressed mmap and compressed file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (FLINK-15070) Supplement cases of blocking partition with compression for benchmark
[ https://issues.apache.org/jira/browse/FLINK-15070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun resolved FLINK-15070. --- Resolution: Fixed > Supplement cases of blocking partition with compression for benchmark > - > > Key: FLINK-15070 > URL: https://issues.apache.org/jira/browse/FLINK-15070 > Project: Flink > Issue Type: Task > Components: Benchmarks >Reporter: zhijiang >Assignee: Haibo Sun >Priority: Minor > > ATM the benchmark only covers the case of pipelined partitions used in > streaming jobs, so it is better to also cover the case of blocking partitions > for batch jobs. Then we can easily trace the performance concerns for any > future changes. > This ticket would introduce the blocking partition cases for uncompressed > file, uncompressed mmap and compressed file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15070) Supplement cases of blocking partition with compression for benchmark
[ https://issues.apache.org/jira/browse/FLINK-15070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17009245#comment-17009245 ] Haibo Sun commented on FLINK-15070: --- Hotfix PR: https://github.com/dataArtisans/flink-benchmarks/pull/42 > Supplement cases of blocking partition with compression for benchmark > - > > Key: FLINK-15070 > URL: https://issues.apache.org/jira/browse/FLINK-15070 > Project: Flink > Issue Type: Task > Components: Benchmarks >Reporter: zhijiang >Assignee: Haibo Sun >Priority: Minor > > ATM the benchmark only covers the case of pipelined partitions used in > streaming jobs, so it is better to also cover the case of blocking partitions > for batch jobs. Then we can easily trace the performance concerns for any > future changes. > This ticket would introduce the blocking partition cases for uncompressed > file, uncompressed mmap and compressed file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15070) Supplement cases of blocking partition with compression for benchmark
[ https://issues.apache.org/jira/browse/FLINK-15070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16994334#comment-16994334 ] Haibo Sun commented on FLINK-15070: --- The PR: https://github.com/dataArtisans/flink-benchmarks/pull/39 > Supplement cases of blocking partition with compression for benchmark > - > > Key: FLINK-15070 > URL: https://issues.apache.org/jira/browse/FLINK-15070 > Project: Flink > Issue Type: Task > Components: Benchmarks >Reporter: zhijiang >Assignee: Haibo Sun >Priority: Minor > > ATM the benchmark only covers the case of pipelined partitions used in > streaming jobs, so it is better to also cover the case of blocking partitions > for batch jobs. Then we can easily trace the performance concerns for any > future changes. > This ticket would introduce the blocking partition cases for uncompressed > file, uncompressed mmap and compressed file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Fix Version/s: (was: 1.10.0) 1.11.0 > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.11.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime are propagating {{endInput}} immediately on the operator chain > when input of the head operator is finished. Because some operators flush the > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This need the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and > [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. > * Timers are not taken into account. > Actually, {{StreamOperator#close}} tells the operator to finish all its > processing and flush output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator arrives at > the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending timers (in processing) of {{OP1}} to finish. 
> # call {{OP1#close}} > # call {{OP2#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new > timers. > # ... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-14230) Change the endInput call of the downstream operator to after the upstream operator closes
[ https://issues.apache.org/jira/browse/FLINK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-14230. - > Change the endInput call of the downstream operator to after the upstream > operator closes > -- > > Key: FLINK-14230 > URL: https://issues.apache.org/jira/browse/FLINK-14230 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This ticket is for fixing the error of propagating "endInput" on the chain > immediately after the input of the head operator is finished. Correctly, > "endInput" of the downstream operator should be invoked only after closing > the upstream operator. > After "endInput" of the downstream operator on the chain is invoked > correctly, we revert the changes of PR#9298 and PR#9221. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14239) Emitting the max watermark in StreamSource#run may cause it to arrive the downstream early
Haibo Sun created FLINK-14239: - Summary: Emitting the max watermark in StreamSource#run may cause it to arrive the downstream early Key: FLINK-14239 URL: https://issues.apache.org/jira/browse/FLINK-14239 Project: Flink Issue Type: Bug Reporter: Haibo Sun Fix For: 1.10.0 For {{Source}}, the max watermark is emitted in {{StreamSource#run}} currently. If some records are also output in {{close}} of {{RichSourceFunction}}, then the max watermark will reach the downstream operator before these records. -- This message was sent by Atlassian Jira (v8.3.4#803005)
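A toy sketch of the ordering described above (class and method names here are illustrative, not the Flink API): because the max watermark is emitted at the end of run, any record the user function emits from close trails the max watermark on its way downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ordering bug: run() ends with the max watermark, so a
// record emitted from close() reaches the downstream operator after the max
// watermark. Names are illustrative only, not Flink classes.
public class MaxWatermarkSketch {
    static final String MAX_WATERMARK = "watermark:MAX";
    static final List<String> downstream = new ArrayList<>();

    static void run() {
        downstream.add("record-1");          // normal output from the user loop
        downstream.add(MAX_WATERMARK);       // emitted at the end of run today
    }

    static void close() {
        downstream.add("record-from-close"); // flushed by the user function's close
    }

    public static void main(String[] args) {
        run();
        close();
        System.out.println(downstream);      // the max watermark precedes record-from-close
    }
}
```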
[jira] [Updated] (FLINK-14230) Change the endInput call of the downstream operator to after the upstream operator closes
[ https://issues.apache.org/jira/browse/FLINK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14230: -- Summary: Change the endInput call of the downstream operator to after the upstream operator closes (was: Change the endinput call of the downstream operator to after the upstream operator closes) > Change the endInput call of the downstream operator to after the upstream > operator closes > -- > > Key: FLINK-14230 > URL: https://issues.apache.org/jira/browse/FLINK-14230 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ... was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. 
Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ... > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime are propagating {{endInput}} immediately on the operator chain > when input of the head operator is finished. Because some operators flush the > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. 
This need the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and > [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. > * Timers are not taken into account. > Actually, {{StreamOperator#close}} tells the operator to finish all its > processing and flush output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator arrives at > the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ... was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. 
This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain \{{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2} from registering new timers. # ... > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime are propagating {{endInput}} immediately on the operator chain > when input of the head operator is finished. Because some operators flush the > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This need the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > issue#13491 and issue#13376. > * Timers are not taken into account. 
> Actually, {{StreamOperator#close}} tells the operator to finish all its > processing and flush output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator arrives at > the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending timers (in processing) of {{OP1}} to finish. > # call {{OP1#close}} > # call {{OP2#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime propagates {{endInput}} immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This requires the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush all output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain {{OP1 -> OP2 -> ...}}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending (in-progress) timers of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ... was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime propagates {{endInput}} immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This requires the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. Actually, {{StreamOperator#close}} tells the operator to finish all its processing and flush all output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain {{OP1 -> OP2 -> ...}}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending (in-progress) timers of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ...
> The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime propagates {{endInput}} immediately along the operator chain > when the input of the head operator is finished. Because some operators flush their > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This requires the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > issue#13491 and issue#13376. > * Timers are not taken into account.
> Actually, {{StreamOperator#close}} tells the operator to finish all its > processing and flush all output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator reaches its > end. So for an operator chain {{OP1 -> OP2 -> ...}}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending (in-progress) timers of {{OP1}} to finish. > # call {{OP1#close}} > # call {{OP2#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new > timers. > # ...
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. * Timers are not taken into account. {{Actually, StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain \{{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow \{{OP2} from registering new timers. # ... was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime are propagating {{endInput}} immediately on the operator chain when input of the head operator is finished. Because some operators flush the buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. 
This need the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. * Timers are not taken into account. {{StreamOperator#close}} tells the operator to finish all its processing and flush output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator arrives at the end. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending timers (in processing) of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2} from registering new timers. # ... > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime are propagating {{endInput}} immediately on the operator chain > when input of the head operator is finished. Because some operators flush the > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This need the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > issue#13491 and issue#13376. 
> * Timers are not taken into account. > {{Actually, StreamOperator#close}} tells the operator to finish all its > processing and flush output (all remaining buffered data), while {{endInput}} > indicates that no more data will arrive on some input of the operator. That > is to say, for the non-tail operators on the operator chain, when the > upstream operator is closed, the input of its downstream operator arrives at > the end. So for an operator chain \{{OP1 -> OP2 -> ... }}, the logic should > be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending timers (in processing) of {{OP1}} to finish. > # call {{OP1#close}} > #
[jira] [Updated] (FLINK-14230) Change the endinput call of the downstream operator to after the upstream operator closes
[ https://issues.apache.org/jira/browse/FLINK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14230: -- Summary: Change the endinput call of the downstream operator to after the upstream operator closes (was: Change the endinput call of the downstream operator to after closing the upstream operator) > Change the endinput call of the downstream operator to after the upstream > operator closes > -- > > Key: FLINK-14230 > URL: https://issues.apache.org/jira/browse/FLINK-14230 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14231) Support the timers of the upstream operator with endInput properly
Haibo Sun created FLINK-14231: - Summary: Support the timers of the upstream operator with endInput properly Key: FLINK-14231 URL: https://issues.apache.org/jira/browse/FLINK-14231 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Haibo Sun -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14230) Change the endinput call of the downstream operator to after closing the upstream operator
[ https://issues.apache.org/jira/browse/FLINK-14230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14230: -- Summary: Change the endinput call of the downstream operator to after closing the upstream operator (was: Change the endinput call of the downstream operator to after the upstream operator closes) > Change the endinput call of the downstream operator to after closing the > upstream operator > --- > > Key: FLINK-14230 > URL: https://issues.apache.org/jira/browse/FLINK-14230 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14230) Change the endinput call of the downstream operator to after the upstream operator closes
Haibo Sun created FLINK-14230: - Summary: Change the endinput call of the downstream operator to after the upstream operator closes Key: FLINK-14230 URL: https://issues.apache.org/jira/browse/FLINK-14230 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Haibo Sun -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-14229) Change the endinput call of the downstream operator to after the upstream operator closes
[ https://issues.apache.org/jira/browse/FLINK-14229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-14229. - Resolution: Invalid > Change the endinput call of the downstream operator to after the upstream > operator closes > -- > > Key: FLINK-14229 > URL: https://issues.apache.org/jira/browse/FLINK-14229 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14229) Change the endinput call of the downstream operator to after the upstream operator closes
Haibo Sun created FLINK-14229: - Summary: Change the endinput call of the downstream operator to after the upstream operator closes Key: FLINK-14229 URL: https://issues.apache.org/jira/browse/FLINK-14229 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Haibo Sun -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: * The runtime propagates {{endInput}} immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This requires the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. * Timers are not taken into account. {{StreamOperator#close}} tells the operator to finish all its processing and flush all output (all remaining buffered data), while {{endInput}} indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain {{OP1 -> OP2 -> ...}}, the logic should be: # {{(Source/Network)Input}} of {{OP1}} is finished. # call {{OP1#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new timers. # wait for the pending (in-progress) timers of {{OP1}} to finish. # call {{OP1#close}} # call {{OP2#endInput}} # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new timers. # ... was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: 1. The runtime propagates {{endInput}} immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This requires the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. 2. Timers are not taken into account. close tells the operator to finish all its processing and flush output (all remaining buffered data), while endInput indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain OP1 -> OP2 -> ..., the logic should be:
> The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > * The runtime propagates {{endInput}} immediately along the operator chain > when the input of the head operator is finished. Because some operators flush their > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This requires the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > [issue#13491|https://issues.apache.org/jira/browse/FLINK-13491] and > [issue#13376|https://issues.apache.org/jira/browse/FLINK-13376]. > * Timers are not taken into account. > {{StreamOperator#close}} tells the operator to finish all its processing and > flush all output (all remaining buffered data), while {{endInput}} indicates that > no more data will arrive on some input of the operator.
That is to say, for > the non-tail operators on the operator chain, when the upstream operator is > closed, the input of its downstream operator reaches its > end. So for an > operator chain {{OP1 -> OP2 -> ...}}, the logic should be: > # {{(Source/Network)Input}} of {{OP1}} is finished. > # call {{OP1#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP1}} from registering new > timers. > # wait for the pending (in-progress) timers of {{OP1}} to finish. > # call {{OP1#close}} > # call {{OP2#endInput}} > # quiesce {{ProcessingTimeService}} to disallow {{OP2}} from registering new > timers. > # ... -- This message was sent by Atlassian Jira (v8.3.4#803005)
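The shutdown ordering listed above can be sketched as a small simulation. This is a hypothetical sketch, not Flink's actual runtime API: the class and method names (`ChainShutdownSketch`, `quiesceTimers`, `awaitPendingTimers`) are illustrative, and the point is only the event order, where each operator is fully closed before the next operator's `endInput` fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the shutdown ordering proposed in FLINK-14228.
// Names are illustrative and do not match Flink's real runtime classes.
public class ChainShutdownSketch {
    static final List<String> events = new ArrayList<>();

    static class Operator {
        final String name;
        Operator(String name) { this.name = name; }
        void endInput()           { events.add(name + "#endInput"); }
        void quiesceTimers()      { events.add(name + "#quiesceTimers"); }
        void awaitPendingTimers() { events.add(name + "#awaitPendingTimers"); }
        void close()              { events.add(name + "#close"); }
    }

    // Only after OP_n is fully closed does the input of OP_{n+1} end.
    static void shutDownChain(List<Operator> chain) {
        for (Operator op : chain) {
            op.endInput();            // no more data will arrive on this input
            op.quiesceTimers();       // disallow registering new timers
            op.awaitPendingTimers();  // wait for in-progress timers to finish
            op.close();               // finish processing, flush remaining output
        }
    }

    public static void main(String[] args) {
        shutDownChain(List.of(new Operator("OP1"), new Operator("OP2")));
        events.forEach(System.out::println);
    }
}
```

Running this prints the eight events in order, with {{OP1#close}} strictly before {{OP2#endInput}}, which is exactly the invariant the description asks for.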
[jira] [Updated] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-14228: -- Description: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: 1. The runtime propagates {{endInput}} immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in {{close}}, the downstream operators still receive records after executing {{endInput}}. This requires the operators to flush the buffered data in {{endInput}} instead of {{close}}, like the PRs for fixing issue#13491 and issue#13376. 2. Timers are not taken into account. close tells the operator to finish all its processing and flush output (all remaining buffered data), while endInput indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain OP1 -> OP2 -> ..., the logic should be: was: Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: 1. The runtime propagates endInput immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in close, the downstream operators still receive records after executing endInput. This requires the operator to flush the buffered data in "endInput" instead of "close", like the PRs for fixing issue#13491 and issue#13376. 2. Timers are not taken into account. close tells the operator to finish all its processing and flush output (all remaining buffered data), while endInput indicates that no more data will arrive on some input of the operator.
That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain OP1 -> OP2 -> ..., the logic should be: > The runtime support for Bounded[One|Multi]Input#endInput does not properly > implement their semantics > > > Key: FLINK-14228 > URL: https://issues.apache.org/jira/browse/FLINK-14228 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > Currently, the runtime support implementation of > {{Bounded[One|Multi]Input#endInput}} has the following problems: > 1. The runtime propagates {{endInput}} immediately along the operator chain > when the input of the head operator is finished. Because some operators flush their > buffered data in {{close}}, the downstream operators still receive records > after executing {{endInput}}. This requires the operators to flush the buffered > data in {{endInput}} instead of {{close}}, like the PRs for fixing > issue#13491 and issue#13376. 2. Timers are not taken into account. > close tells the operator to finish all its processing and flush output (all > remaining buffered data), while endInput indicates that no more data will > arrive on some input of the operator. That is to say, for the non-tail > operators on the operator chain, when the upstream operator is closed, the > input of its downstream operator reaches its end. So for an operator chain > OP1 -> OP2 -> ..., the logic should be: -- This message was sent by Atlassian Jira (v8.3.4#803005)
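The first problem, flushing in {{close}} instead of {{endInput}}, can be illustrated with a toy buffering operator. This is a hedged sketch and not Flink code: the class name and method signatures are made up for illustration, and it only shows where the flush has to happen so that no records reach a downstream operator after its own {{endInput}} has run.

```java
import java.util.ArrayList;
import java.util.List;

// Toy buffering operator (illustrative only, not a Flink API) showing why
// buffered data must be flushed in endInput() rather than close(): when
// endInput is propagated immediately along the chain, the downstream
// operator's endInput() has already run by the time close() emits records.
public class BufferingOperatorSketch {
    private final List<Long> buffer = new ArrayList<>();
    final List<Long> emitted = new ArrayList<>();

    void processElement(long value) {
        buffer.add(value); // buffer instead of emitting eagerly
    }

    // Correct place to flush: no more records will arrive on this input,
    // but the downstream operators are still accepting records.
    void endInput() {
        emitted.addAll(buffer);
        buffer.clear();
    }

    // close() should only release resources; flushing here would deliver
    // records to a downstream operator after its endInput() has executed.
    void close() {
        buffer.clear();
    }

    public static void main(String[] args) {
        BufferingOperatorSketch op = new BufferingOperatorSketch();
        op.processElement(1L);
        op.processElement(2L);
        op.endInput(); // all buffered records are emitted here
        op.close();    // nothing left to emit, only cleanup
        System.out.println(op.emitted);
    }
}
```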
[jira] [Created] (FLINK-14228) The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics
Haibo Sun created FLINK-14228: - Summary: The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics Key: FLINK-14228 URL: https://issues.apache.org/jira/browse/FLINK-14228 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.9.0 Reporter: Haibo Sun Fix For: 1.10.0 Currently, the runtime support implementation of {{Bounded[One|Multi]Input#endInput}} has the following problems: 1. The runtime propagates endInput immediately along the operator chain when the input of the head operator is finished. Because some operators flush their buffered data in close, the downstream operators still receive records after executing endInput. This requires the operator to flush the buffered data in "endInput" instead of "close", like the PRs for fixing issue#13491 and issue#13376. 2. Timers are not taken into account. close tells the operator to finish all its processing and flush output (all remaining buffered data), while endInput indicates that no more data will arrive on some input of the operator. That is to say, for the non-tail operators on the operator chain, when the upstream operator is closed, the input of its downstream operator reaches its end. So for an operator chain OP1 -> OP2 -> ..., the logic should be: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923044#comment-16923044 ] Haibo Sun edited comment on FLINK-13516 at 9/5/19 4:58 AM: --- The case fails because authentication fails when the YARN client requests access authorization from the resource manager, and the subsequent retries lead to a test timeout. New encryption types, aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5), were added and enabled by default in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when they are enabled, which results in the authentication failure. Error Log: {{DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedActionException as:hadoop/localh...@example.com (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Message stream modified (41) - Message stream modified)]}} There are two solutions to fix this issue: one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file; the other is to bump MiniKdc to the latest version, 3.2.0 (I tested that this version solves the problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Given that the version of MiniKdc will be updated sooner or later, if it runs successfully on Travis, I suggest using the second solution. [~Zentol], what do you think? was (Author: sunhaibotb): The case fails because authentication fails when the YARN client requests access authorization from the resource manager, and the subsequent retries lead to a test timeout.
New encryption types, aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5), were added and enabled by default in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when they are enabled, which results in the authentication failure. {{DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedActionException as:hadoop/localh...@example.com (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Message stream modified (41) - Message stream modified)]}} There are two solutions to fix this issue: one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file; the other is to bump MiniKdc to the latest version, 3.2.0 (I tested that this version solves the problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Given that the version of MiniKdc will be updated sooner or later, if it runs successfully on Travis, I suggest using the second solution. [~Zentol], what do you think? > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
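For the first proposed solution, a minimal sketch of what the "minikdc-krb5.conf" template could contain. Only the two setting names and the encryption type come from the comment above; the surrounding [libdefaults] section is standard krb5.conf structure, and the exact layout of the actual template file is an assumption:

```
[libdefaults]
    default_tkt_enctypes = aes128-cts-hmac-sha1-96
    default_tgs_enctypes = aes128-cts-hmac-sha1-96
```

Pinning both ticket and TGS encryption types to aes128-cts-hmac-sha1-96 keeps Java 11 from negotiating the newer enctypes that the older MiniKdc cannot handle.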
[jira] [Comment Edited] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923044#comment-16923044 ] Haibo Sun edited comment on FLINK-13516 at 9/5/19 4:58 AM: --- The failure of the case is due to the failure of authentication when the yarn client requests access authorization of resource manager, and subsequent retries lead to test timeout. New encryption types of aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5) enabled by default were added in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when these encryption types are enabled, which results in the authentication failure. {{DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedActionException as:hadoop/localh...@example.com (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Message stream modified (41) - Message stream modified)]}} There are two solutions to fix this issue, one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory, and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file, the other is to bump MiniKdc to the latest version 3.2.0 (I tested that this version has solved this problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Given that the version of minikdc will be updated sooner or later, if it runs successfully on Travis, I suggest to use the second solution. [~Zentol], what do you think? was (Author: sunhaibotb): The failure of the case is due to the failure of authentication when the yarn client requests access authorization of resource manager, and subsequent retries lead to test timeout. 
New encryption types of aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5) enabled by default were added in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when these encryption types are enabled, which results in the authentication failure. There are two solutions to fix this issue, one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory, and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file, the other is to bump MiniKdc to the latest version 3.2.0 (I tested that this version has solved this problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Considering that the version of MiniKdc will be updated sooner or later, I suggest to use the second solution. [~Zentol], what do you think? > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923044#comment-16923044 ] Haibo Sun edited comment on FLINK-13516 at 9/5/19 4:43 AM: --- The failure of the case is due to the failure of authentication when the yarn client requests access authorization of resource manager, and subsequent retries lead to test timeout. New encryption types of aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5) enabled by default were added in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when these encryption types are enabled, which results in the authentication failure. There are two solutions to fix this issue, one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory, and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file, the other is to bump MiniKdc to the latest version 3.2.0 (I tested that this version has solved this problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Considering that the version of MiniKdc will be updated sooner or later, I suggest to use the second solution. [~Zentol], what do you think? was (Author: sunhaibotb): The failure of the case is due to the failure of authentication when the yarn client requests access authorization of resource manager, and subsequent retries lead to test timeout. New encryption types of aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192 (for Kerberos 5) enabled by default were added in Java 11, while the current version of MiniKdc used by Flink does not support these encryption types and does not work well when these encryption types are enabled, which results in the authentication failure. 
There are two solutions to fix this issue, one is to add a configuration template named "minikdc-krb5.conf" in the test resource directory, and explicitly set default_tkt_enctypes and default_tgs_enctypes to use aes128-cts-hmac-sha1-96 in the template file, the other is to bump MiniKdc to the latest version 3.2.0 (I tested that this version has solved this problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Considering that the version of MiniKdc will be updated sooner or later, I suggest to use the second solution. [~Zentol], what do you think? > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923044#comment-16923044 ] Haibo Sun commented on FLINK-13516: --- The test case fails because authentication fails when the YARN client requests access authorization from the ResourceManager, and the subsequent retries cause the test to time out. Java 11 added new encryption types for Kerberos 5, aes128-cts-hmac-sha256-128 and aes256-cts-hmac-sha384-192, and enables them by default; the version of MiniKdc currently used by Flink does not support these encryption types and does not work well when they are enabled, which results in the authentication failure. There are two ways to fix this issue: one is to add a configuration template named "minikdc-krb5.conf" to the test resource directory and explicitly set default_tkt_enctypes and default_tgs_enctypes to aes128-cts-hmac-sha1-96 in that template; the other is to bump MiniKdc to the latest version, 3.2.0 (I verified that this version solves the problem). I've tested both solutions on my local machine, and all tests that depend on MiniKdc work well on Java 8 and Java 11. Considering that the MiniKdc version will have to be updated sooner or later, I suggest using the second solution. [~Zentol], what do you think? > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
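The second solution is essentially a one-line version bump in the test dependencies. Assuming MiniKdc is pulled in via the usual hadoop-minikdc artifact (the actual coordinates in Flink's pom may differ), the change would look roughly like:

```xml
<!-- Illustrative sketch: bump MiniKdc so that the AES-SHA2 encryption
     types enabled by default on Java 11 are supported. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-minikdc</artifactId>
    <version>3.2.0</version>
    <scope>test</scope>
</dependency>
```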
[jira] [Commented] (FLINK-13515) ClassLoaderITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920617#comment-16920617 ] Haibo Sun commented on FLINK-13515: --- Sorry for the delay. I will work on this JIRA this week. > ClassLoaderITCase fails on Java 11 > -- > > Key: FLINK-13515 > URL: https://issues.apache.org/jira/browse/FLINK-13515 > Project: Flink > Issue Type: Sub-task > Components: Command Line Client, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{ClassLoaderITCase#testCheckpointedStreamingClassloaderJobWithCustomClassLoader}} > fails on Java 11 because the usercode exception can be serialized in the > client. This shouldn't be possible since the user-jar isn't on the classpath > of the client. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920618#comment-16920618 ] Haibo Sun commented on FLINK-13516: --- Sorry for the delay. This week I will work on this JIRA. > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Assignee: Haibo Sun >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-12818) Improve stability of twoInputMapSink benchmark
[ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16913989#comment-16913989 ] Haibo Sun commented on FLINK-12818: --- {quote}Alternatively until this is confirmed as a production code issue, we can just try to live with this benchmark instability and relay on long term trends. {quote} +1. I think we can take that approach first. {quote}I think if we want to investigate this further, we would need to print compiled byte code and try to understand the difference. {quote} I agree. I'll try that when I have time. > Improve stability of twoInputMapSink benchmark > -- > > Key: FLINK-12818 > URL: https://issues.apache.org/jira/browse/FLINK-12818 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Reporter: Piotr Nowojski >Priority: Critical > Attachments: RecordWriter-emit.png > > > The {{twoInputMapSink}} benchmark is very unstable over time: > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=twoInputMapSink=2=200=off=on=on > It should be fixed, otherwise the benchmark can not be used. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13516) YARNSessionFIFOSecuredITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910939#comment-16910939 ] Haibo Sun commented on FLINK-13516: --- Hi, [~Zentol]. I can help with this task. Can you assign it to me? > YARNSessionFIFOSecuredITCase fails on Java 11 > - > > Key: FLINK-13516 > URL: https://issues.apache.org/jira/browse/FLINK-13516 > Project: Flink > Issue Type: Sub-task > Components: Deployment / YARN, Tests >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.10.0 > > > {{YARNSessionFIFOSecuredITCase#testDetachedMode}} times out when run on Java > 11. This may be related to security changes in Java 11. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13515) ClassLoaderITCase fails on Java 11
[ https://issues.apache.org/jira/browse/FLINK-13515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910938#comment-16910938 ] Haibo Sun commented on FLINK-13515: --- Hi, [~Zentol]. I can help with this task. Can you assign it to me? > ClassLoaderITCase fails on Java 11 > -- > > Key: FLINK-13515 > URL: https://issues.apache.org/jira/browse/FLINK-13515 > Project: Flink > Issue Type: Sub-task > Components: Command Line Client, Tests >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.10.0 > > > {{ClassLoaderITCase#testCheckpointedStreamingClassloaderJobWithCustomClassLoader}} > fails on Java 11 because the usercode exception can be serialized in the > client. This shouldn't be possible since the user-jar isn't on the classpath > of the client. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly
[ https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895958#comment-16895958 ] Haibo Sun edited comment on FLINK-13491 at 7/30/19 9:52 AM: So for a one-input operator, {{endInput()}} semantically duplicates {{close()}}. Maybe we don't need {{BoundedOneInput}}, but instead need to call {{close()}} immediately when the input is finished? was (Author: sunhaibotb): So for one-input operator, `endInput()` repeats `close()` semantically. Maybe we don't need `BoundedOneInput`, but we need to call `close()` immediately when the input is finished? > AsyncWaitOperator doesn't handle endInput call properly > --- > > Key: FLINK-13491 > URL: https://issues.apache.org/jira/browse/FLINK-13491 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.9.0 > > > This is the same issue as for {{ContinousFileReaderOperator}} in > https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will > propagate {{endInput}} notification immediately, even if it has some records > buffered. > Side note, this also shows that the current {{BoundedOneInput#endInput}} API > is easy to mishandle if an operator buffers some records internally. Maybe we > could redesign this API somehow [~aljoscha] [~sunhaibotb]? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly
[ https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895973#comment-16895973 ] Haibo Sun commented on FLINK-13491: --- For a two-input operator, {{endInput(int inputId)}} is semantically different from {{close()}}, but should we call {{close()}} immediately after the last input is finished? > AsyncWaitOperator doesn't handle endInput call properly > --- > > Key: FLINK-13491 > URL: https://issues.apache.org/jira/browse/FLINK-13491 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.9.0 > > > This is the same issue as for {{ContinousFileReaderOperator}} in > https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will > propagate {{endInput}} notification immediately, even if it has some records > buffered. > Side note, this also shows that the current {{BoundedOneInput#endInput}} API > is easy to mishandle if an operator buffers some records internally. Maybe we > could redesign this API somehow [~aljoscha] [~sunhaibotb]? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
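The pitfall discussed in this issue (an operator that propagates end-of-input while it still has buffered records) can be sketched with minimal stand-in types. These are hypothetical illustrations, not Flink's actual {{BoundedOneInput}} or operator classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for Flink's BoundedOneInput interface.
interface BoundedOneInput {
    void endInput() throws Exception;
}

// Sketch of an operator that buffers records internally (e.g. while waiting
// for async results). A correct endInput() must drain the buffer before the
// input is treated as finished; forwarding the notification immediately
// would lose the buffered records, which is the bug described above.
class BufferingOperator implements BoundedOneInput {
    private final Queue<Integer> buffer = new ArrayDeque<>();
    final List<Integer> emitted = new ArrayList<>();

    void processElement(int value) {
        buffer.add(value); // held back, not yet emitted downstream
    }

    @Override
    public void endInput() {
        // Flush everything still buffered before signaling end-of-input.
        while (!buffer.isEmpty()) {
            emitted.add(buffer.poll());
        }
    }
}

class EndInputDemo {
    public static void main(String[] args) throws Exception {
        BufferingOperator op = new BufferingOperator();
        op.processElement(1);
        op.processElement(2);
        op.endInput();
        System.out.println(op.emitted); // prints [1, 2]
    }
}
```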
[jira] [Comment Edited] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly
[ https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895958#comment-16895958 ] Haibo Sun edited comment on FLINK-13491 at 7/30/19 9:37 AM: So for one-input operator, `endInput()` repeats `close()` semantically. Maybe we don't need `BoundedOneInput`, but we need to call `close()` immediately when the input is finished? was (Author: sunhaibotb): So for one-input operator, `endInput ()` repeats `close ()` semantically. Maybe we don't need `BoundedOneInput`, but we need to call `close()` immediately when the input is finished? > AsyncWaitOperator doesn't handle endInput call properly > --- > > Key: FLINK-13491 > URL: https://issues.apache.org/jira/browse/FLINK-13491 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.9.0 > > > This is the same issue as for {{ContinousFileReaderOperator}} in > https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will > propagate {{endInput}} notification immediately, even if it has some records > buffered. > Side note, this also shows that the current {{BoundedOneInput#endInput}} API > is easy to mishandle if an operator buffers some records internally. Maybe we > could redesign this API somehow [~aljoscha] [~sunhaibotb]? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13491) AsyncWaitOperator doesn't handle endInput call properly
[ https://issues.apache.org/jira/browse/FLINK-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895958#comment-16895958 ] Haibo Sun commented on FLINK-13491: --- So for one-input operator, `endInput ()` repeats `close ()` semantically. Maybe we don't need `BoundedOneInput`, but we need to call `close()` immediately when the input is finished? > AsyncWaitOperator doesn't handle endInput call properly > --- > > Key: FLINK-13491 > URL: https://issues.apache.org/jira/browse/FLINK-13491 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Priority: Blocker > Fix For: 1.9.0 > > > This is the same issue as for {{ContinousFileReaderOperator}} in > https://issues.apache.org/jira/browse/FLINK-13376. {{AsyncWaitOperator}} will > propagate {{endInput}} notification immediately, even if it has some records > buffered. > Side note, this also shows that the current {{BoundedOneInput#endInput}} API > is easy to mishandle if an operator buffers some records internally. Maybe we > could redesign this API somehow [~aljoscha] [~sunhaibotb]? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-12818) Improve stability of twoInputMapSink benchmark
[ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894906#comment-16894906 ] Haibo Sun edited comment on FLINK-12818 at 7/29/19 7:19 AM: Hi [~pnowojski], the benchmark on `TwoInputSelectableStreamTask`/`StreamTwoInputSelectableProcessor` was also unstable, so the original expectation that it would stabilize did not hold. I made some further attempts, including upgrading JDK 1.8 to the latest version (1.8.0_212), disabling CPU hyper-threading, and disabling checkpointing, but the benchmark is still unstable. Profiling with VTune showed that the slow JVM fork spends more time than the fast one, mainly in the `RecordWriter#emit()` method (the call stack is shown in the figure below). I suspect this is related to CPU cache misses. After disabling checkpointing and adjusting the settings with the following code, the benchmark becomes stable on my local machine, but it becomes unstable again once checkpointing is enabled. 
*Code of Class FlinkEnvironmentContext :* {code:java}
public class FlinkEnvironmentContext {

    public StreamExecutionEnvironment env;

    private final int parallelism = 1;
    private final boolean objectReuse = true;

    @Setup
    public void setUp() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 2);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 0);
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1mb");

        env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);

        // set up the execution environment
        env.setParallelism(parallelism);
        env.getConfig().disableSysoutLogging();
        if (objectReuse) {
            env.getConfig().enableObjectReuse();
        }
        env.setStateBackend(new MemoryStateBackend());
    }

    public void execute() throws Exception {
        env.execute();
    }
}
{code} *Call Stack of RecordWriter#emit() :* !RecordWriter-emit.png! was (Author: sunhaibotb): Hi [~pnowojski], The benchmark on `TwoInputSelectableStreamTask`/`StreamTwoInputSelectableProcessor` was also unstable, and the original expectation of stabilizing was broken. I made some other attempts, including upgrading `JDK 1.8` to the latest version "1.8.0_212", closing the hyper-threading of CPUs, and disabling checkpointing, but the benchmark is still unstable. After using VTune for analysis, it was found that the slow JVM-fork was more time-consuming than the fast one, mainly in the `RecordWriter#emit()` method (the stack information is shown in the following figure). I suspect this is related to the cache miss of CPU. After disabling checkpointing and adjusting the settings by the following code, the benchmark becomes stable, but it becomes unstable once checkpointing is enabled. 
*Code of Class FlinkEnvironmentContext :* {code:java} public class FlinkEnvironmentContext { public StreamExecutionEnvironment env; private final int parallelism = 1; private final boolean objectReuse = true; @Setup public void setUp() throws IOException { Configuration configuration = new Configuration(); configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 2); configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 0); configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1mb"); env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration); // set up the execution environment env.setParallelism(parallelism); env.getConfig().disableSysoutLogging(); if (objectReuse) { env.getConfig().enableObjectReuse(); } env.setStateBackend(new MemoryStateBackend()); } public void execute() throws Exception { env.execute(); } }{code} *Call Stack of RecordWriter#emit() :* !RecordWriter-emit.png! > Improve stability of twoInputMapSink benchmark > -- > > Key: FLINK-12818 > URL: https://issues.apache.org/jira/browse/FLINK-12818 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Reporter: Piotr Nowojski >Priority: Critical > Attachments: RecordWriter-emit.png > > > The {{twoInputMapSink}} benchmark is very unstable over time: > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=twoInputMapSink=2=200=off=on=on > It should be fixed, otherwise the benchmark can not be used. -- This message was sent by Atlassian JIRA
[jira] [Commented] (FLINK-12818) Improve stability of twoInputMapSink benchmark
[ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894906#comment-16894906 ] Haibo Sun commented on FLINK-12818: --- Hi [~pnowojski], the benchmark on `TwoInputSelectableStreamTask`/`StreamTwoInputSelectableProcessor` was also unstable, so the original expectation that it would stabilize did not hold. I made some further attempts, including upgrading JDK 1.8 to the latest version (1.8.0_212), disabling CPU hyper-threading, and disabling checkpointing, but the benchmark is still unstable. Profiling with VTune showed that the slow JVM fork spends more time than the fast one, mainly in the `RecordWriter#emit()` method (the call stack is shown in the figure below). I suspect this is related to CPU cache misses. After disabling checkpointing and adjusting the settings with the following code, the benchmark becomes stable, but it becomes unstable again once checkpointing is enabled. *Code of Class FlinkEnvironmentContext :* {code:java}
public class FlinkEnvironmentContext {

    public StreamExecutionEnvironment env;

    private final int parallelism = 1;
    private final boolean objectReuse = true;

    @Setup
    public void setUp() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 2);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 0);
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1mb");

        env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);

        // set up the execution environment
        env.setParallelism(parallelism);
        env.getConfig().disableSysoutLogging();
        if (objectReuse) {
            env.getConfig().enableObjectReuse();
        }
        env.setStateBackend(new MemoryStateBackend());
    }

    public void execute() throws Exception {
        env.execute();
    }
}
{code} *Call Stack of RecordWriter#emit() :* !RecordWriter-emit.png! 
> Improve stability of twoInputMapSink benchmark > -- > > Key: FLINK-12818 > URL: https://issues.apache.org/jira/browse/FLINK-12818 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Reporter: Piotr Nowojski >Priority: Critical > Attachments: RecordWriter-emit.png > > > The {{twoInputMapSink}} benchmark is very unstable over time: > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=twoInputMapSink=2=200=off=on=on > It should be fixed, otherwise the benchmark can not be used. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-12818) Improve stability of twoInputMapSink benchmark
[ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12818: -- Attachment: RecordWriter-emit.png > Improve stability of twoInputMapSink benchmark > -- > > Key: FLINK-12818 > URL: https://issues.apache.org/jira/browse/FLINK-12818 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Reporter: Piotr Nowojski >Priority: Critical > Attachments: RecordWriter-emit.png > > > The {{twoInputMapSink}} benchmark is very unstable over time: > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=twoInputMapSink=2=200=off=on=on > It should be fixed, otherwise the benchmark can not be used. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13242) StandaloneResourceManagerTest fails on travis
[ https://issues.apache.org/jira/browse/FLINK-13242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887573#comment-16887573 ] Haibo Sun commented on FLINK-13242: --- Another instance: [https://api.travis-ci.com/v3/job/216807085/log.txt] > StandaloneResourceManagerTest fails on travis > - > > Key: FLINK-13242 > URL: https://issues.apache.org/jira/browse/FLINK-13242 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0, 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > https://travis-ci.org/apache/flink/jobs/557696989 > {code} > 08:28:06.475 [ERROR] > testStartupPeriod(org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerTest) > Time elapsed: 10.276 s <<< FAILURE! > java.lang.AssertionError: condition was not fulfilled before the deadline > at > org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerTest.assertHappensUntil(StandaloneResourceManagerTest.java:114) > at > org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerTest.testStartupPeriod(StandaloneResourceManagerTest.java:60) > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13299) flink-python failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-13299: -- Affects Version/s: 1.10.0 > flink-python failed on Travis > - > > Key: FLINK-13299 > URL: https://issues.apache.org/jira/browse/FLINK-13299 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 >Reporter: Haibo Sun >Priority: Major > > Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] > Error: > ___ summary > > ERROR: py27: InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 > py27 (exited with code 1) py33: commands succeeded ERROR: py34: > InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 > py34 (exited with code 100) py35: commands succeeded py36: commands > succeeded py37: commands succeeded tox checks... > [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog > (12896). ./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13299) flink-python failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-13299: -- Description: Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] Error: ___ summary ERROR: py27: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 py27 (exited with code 1) py33: commands succeeded ERROR: py34: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 py34 (exited with code 100) py35: commands succeeded py36: commands succeeded py37: commands succeeded tox checks... [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog (12896). ./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog was:Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] > flink-python failed on Travis > - > > Key: FLINK-13299 > URL: https://issues.apache.org/jira/browse/FLINK-13299 > Project: Flink > Issue Type: Bug >Reporter: Haibo Sun >Priority: Major > > Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] > Error: > ___ summary > ERROR: py27: InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 > py27 (exited with code 1) py33: commands succeeded ERROR: py34: > InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 > py34 (exited with code 100) py35: commands succeeded py36: commands > succeeded py37: commands succeeded tox checks... 
> [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog > (12896). ./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (FLINK-13299) flink-python failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-13299: -- Description: Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] Error: ___ summary ERROR: py27: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 py27 (exited with code 1) py33: commands succeeded ERROR: py34: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 py34 (exited with code 100) py35: commands succeeded py36: commands succeeded py37: commands succeeded tox checks... [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog (12896). ./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog was: Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] Error: ___ summary ERROR: py27: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 py27 (exited with code 1) py33: commands succeeded ERROR: py34: InvocationError for command /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m virtualenv --no-download --python /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 py34 (exited with code 100) py35: commands succeeded py36: commands succeeded py37: commands succeeded tox checks... [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog (12896). 
./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog > flink-python failed on Travis > - > > Key: FLINK-13299 > URL: https://issues.apache.org/jira/browse/FLINK-13299 > Project: Flink > Issue Type: Bug >Reporter: Haibo Sun >Priority: Major > > Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] > Error: > ___ summary > > ERROR: py27: InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/2.7/bin/python2.7 > py27 (exited with code 1) py33: commands succeeded ERROR: py34: > InvocationError for command > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/bin/python3.7 -m > virtualenv --no-download --python > /home/travis/build/flink-ci/flink/flink-python/dev/.conda/envs/3.4/bin/python3.4 > py34 (exited with code 100) py35: commands succeeded py36: commands > succeeded py37: commands succeeded tox checks... > [FAILED] PYTHON exited with EXIT CODE: 1. Trying to KILL watchdog > (12896). ./tools/travis_watchdog.sh: line 229: 12896 Terminated watchdog -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13299) flink-python failed on Travis
Haibo Sun created FLINK-13299: - Summary: flink-python failed on Travis Key: FLINK-13299 URL: https://issues.apache.org/jira/browse/FLINK-13299 Project: Flink Issue Type: Bug Reporter: Haibo Sun Log: [https://api.travis-ci.com/v3/job/216620643/log.txt] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-12916) KeyedComplexChainTest.testMigrationAndRestore failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-12916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885897#comment-16885897 ] Haibo Sun commented on FLINK-12916: --- Another instance: [https://api.travis-ci.com/v3/job/216341547/log.txt] > KeyedComplexChainTest.testMigrationAndRestore failed on Travis > -- > > Key: FLINK-12916 > URL: https://issues.apache.org/jira/browse/FLINK-12916 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Yun Tang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > The test case {{KeyedComplexChainTest.testMigrationAndRestore}} failed on > Travis because a Task received the cancellation from one of its inputs > {code} > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyAbortOnCancellationBarrier(BarrierBuffer.java:428) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processCancellationBarrier(BarrierBuffer.java:327) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.pollNext(BarrierBuffer.java:208) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:128) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.performDefaultAction(OneInputStreamTask.java:101) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:268) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:376) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:676) > ... 
1 more > {code} > https://api.travis-ci.org/v3/job/548181384/log.txt -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (FLINK-12573) ability to add suffix to part file created in Bucket (StreamingFileSink)
[ https://issues.apache.org/jira/browse/FLINK-12573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876835#comment-16876835 ] Haibo Sun edited comment on FLINK-12573 at 7/2/19 10:06 AM: Hi, [~LouisXu777] Is there any progress on this issue? Flink should expose an interface to allow customizing the filename. was (Author: sunhaibotb): Hi, [~LouisXu777] Is there any progress on this issue? I think that Flink should expose an interface to allow customizing the filename. > ability to add suffix to part file created in Bucket (StreamingFileSink) > > > Key: FLINK-12573 > URL: https://issues.apache.org/jira/browse/FLINK-12573 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: yitzchak lieberman >Assignee: Louis Xu >Priority: Major > > a possibility to add suffix to part file path other than: > new Path(bucketPath, PART_PREFIX + -- subtaskIndex + partCounter); > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12573) ability to add suffix to part file created in Bucket (StreamingFileSink)
[ https://issues.apache.org/jira/browse/FLINK-12573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876835#comment-16876835 ] Haibo Sun commented on FLINK-12573: --- Hi, [~LouisXu777] Is there any progress on this issue? I think that Flink should expose an interface to allow customizing the filename. > ability to add suffix to part file created in Bucket (StreamingFileSink) > > > Key: FLINK-12573 > URL: https://issues.apache.org/jira/browse/FLINK-12573 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: yitzchak lieberman >Assignee: Louis Xu >Priority: Major > > a possibility to add suffix to part file path other than: > new Path(bucketPath, PART_PREFIX + -- subtaskIndex + partCounter); > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
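[Editor's note] The FLINK-12573 discussion above asks for a pluggable way to build the part-file name instead of the hard-coded `new Path(bucketPath, PART_PREFIX + ...)` pattern. The following is a minimal, hypothetical sketch of such an interface; the `PartFileNamer` name and everything else here are illustrative only, not Flink's actual API.

```java
// Hypothetical sketch of a pluggable part-file namer with an optional suffix.
// None of these names are Flink's real API; they only illustrate the idea
// discussed in FLINK-12573.
public class PartFileNaming {
    /** Strategy for producing a part-file name (illustrative, not Flink's). */
    interface PartFileNamer {
        String name(int subtaskIndex, long partCounter);
    }

    /** Builds "prefix-subtask-counter" and appends a suffix if one is given. */
    static PartFileNamer withSuffix(String prefix, String suffix) {
        return (subtask, counter) -> {
            String base = prefix + "-" + subtask + "-" + counter;
            return (suffix == null || suffix.isEmpty()) ? base : base + suffix;
        };
    }

    public static void main(String[] args) {
        PartFileNamer namer = withSuffix("part", ".parquet");
        System.out.println(namer.name(3, 7)); // part-3-7.parquet
    }
}
```

A sink could accept such a strategy at construction time, which keeps the default behavior unchanged while letting users add a suffix.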
[jira] [Updated] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor
[ https://issues.apache.org/jira/browse/FLINK-13051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-13051: -- Description: After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor. (was: After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor.) > Drop the non-selectable two-input StreamTask and Processor > -- > > Key: FLINK-13051 > URL: https://issues.apache.org/jira/browse/FLINK-13051 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > After `StreamTwoInputSelectableProcessor` supports > `CheckpointBarrierHandler`, we should drop the non-selectable two-input > StreamTask and Processor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor
Haibo Sun created FLINK-13051: - Summary: Drop the non-selectable two-input StreamTask and Processor Key: FLINK-13051 URL: https://issues.apache.org/jira/browse/FLINK-13051 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Haibo Sun Assignee: Haibo Sun After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13014) ChainBreakTest failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-13014: -- Description: Log: [https://api.travis-ci.org/v3/job/551094138/log.txt] 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR! java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs was: 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR! 
java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs > ChainBreakTest failed on Travis > --- > > Key: FLINK-13014 > URL: https://issues.apache.org/jira/browse/FLINK-13014 > Project: Flink > Issue Type: Bug >Reporter: Haibo Sun >Priority: Major > > Log: [https://api.travis-ci.org/v3/job/551094138/log.txt] > > 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 16.872 s <<< FAILURE! - in > org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest > 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: > 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) > Time elapsed: 1.545 s <<< ERROR! > java.util.concurrent.ExecutionException: > java.util.concurrent.CompletionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Task received > cancellation from one of its inputs > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Task received > cancellation from one of its inputs > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task > received cancellation from one of its inputs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13014) ChainBreakTest failed on Travis
Haibo Sun created FLINK-13014: - Summary: ChainBreakTest failed on Travis Key: FLINK-13014 URL: https://issues.apache.org/jira/browse/FLINK-13014 Project: Flink Issue Type: Bug Reporter: Haibo Sun 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR! java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12967) Change the input selection switching in StreamTwoInputSelectableProcessor#checkFinished to invoking the nextSelection method of the stream operator
Haibo Sun created FLINK-12967: - Summary: Change the input selection switching in StreamTwoInputSelectableProcessor#checkFinished to invoking the nextSelection method of the stream operator Key: FLINK-12967 URL: https://issues.apache.org/jira/browse/FLINK-12967 Project: Flink Issue Type: Sub-task Reporter: Haibo Sun Assignee: Haibo Sun The runtime (`StreamTwoInputSelectableProcessor#checkFinished()`) switches the input selection when one input is finished, because `BoundedxInput.endInput()` was not supported before PR#8731 (https://github.com/apache/flink/pull/8731) was merged. Now we should change the logic of `StreamTwoInputSelectableProcessor#checkFinished()` to invoke `InputSelectable#nextSelection()`, and the input selection should be switched in `endInput()` by the operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
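[Editor's note] The control flow FLINK-12967 proposes — the operator, not the input processor, switching inputs from its own `endInput()` — can be sketched without any Flink dependencies. All types below are stand-ins for illustration, not Flink's runtime classes.

```java
// Flink-free sketch of the switching described in FLINK-12967: when one
// input finishes, the operator's endInput() updates what its own
// nextSelection() returns, instead of the processor forcing the switch.
// All types here are stand-ins, not Flink's actual runtime classes.
public class InputSelectionSketch {
    enum Selection { FIRST, SECOND, ALL }

    static class TwoInputOperator {
        private Selection selection = Selection.ALL;

        /** Consulted by the (hypothetical) input processor before each read. */
        Selection nextSelection() {
            return selection;
        }

        /** When input 1 ends, switch to input 2, and vice versa. */
        void endInput(int inputId) {
            selection = (inputId == 1) ? Selection.SECOND : Selection.FIRST;
        }
    }

    public static void main(String[] args) {
        TwoInputOperator op = new TwoInputOperator();
        op.endInput(1);
        System.out.println(op.nextSelection()); // SECOND
    }
}
```

The processor then only has to honor `nextSelection()`; it no longer needs the `checkFinished()` special case.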
[jira] [Commented] (FLINK-12895) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on travis
[ https://issues.apache.org/jira/browse/FLINK-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16869298#comment-16869298 ] Haibo Sun commented on FLINK-12895: --- (y) > TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure > failed on travis > > > Key: FLINK-12895 > URL: https://issues.apache.org/jira/browse/FLINK-12895 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Tests >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.7.3, 1.8.1, 1.9.0 > > > Logs: [https://api.travis-ci.org/v3/job/547509708/log.txt] > Build: [https://travis-ci.org/apache/flink/builds/547509701] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12895) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on travis
[ https://issues.apache.org/jira/browse/FLINK-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12895: -- Description: Logs: [https://api.travis-ci.org/v3/job/547509708/log.txt] Build: [https://travis-ci.org/apache/flink/builds/547509701] was:Logs: [https://api.travis-ci.org/v3/job/547509708/log.txt] > TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure > failed on travis > > > Key: FLINK-12895 > URL: https://issues.apache.org/jira/browse/FLINK-12895 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.0 >Reporter: Haibo Sun >Priority: Major > > Logs: [https://api.travis-ci.org/v3/job/547509708/log.txt] > Build: [https://travis-ci.org/apache/flink/builds/547509701] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12895) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on travis
Haibo Sun created FLINK-12895: - Summary: TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on travis Key: FLINK-12895 URL: https://issues.apache.org/jira/browse/FLINK-12895 Project: Flink Issue Type: Bug Affects Versions: 1.9.0 Reporter: Haibo Sun Logs: [https://api.travis-ci.org/v3/job/547509708/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput
[ https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11879: -- Description: - Rejects jobs containing operators that implement `InputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedMultiInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `InputSelectable` when credit-based flow control is disabled. was: - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedMultiInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled. > Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and > BoundedMultiInput > -- > > Key: FLINK-11879 > URL: https://issues.apache.org/jira/browse/FLINK-11879 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Rejects jobs containing operators that implement > `InputSelectable` when checkpointing is enabled. > - Rejects jobs containing operators that implement > `BoundedInput` or `BoundedMultiInput` when checkpointing is enabled. > - Rejects jobs containing operators that implement > `InputSelectable` when credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput
[ https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11879: -- Description: - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedMultiInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled. was: - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedTwoInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled. > Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and > BoundedMultiInput > -- > > Key: FLINK-11879 > URL: https://issues.apache.org/jira/browse/FLINK-11879 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Rejects jobs containing operators that implement > `TwoInputSelectable` when checkpointing is enabled. > - Rejects jobs containing operators that implement > `BoundedInput` or `BoundedMultiInput` when checkpointing is enabled. > - Rejects jobs containing operators that implement > `TwoInputSelectable` when credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
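[Editor's note] The rejection rules in the FLINK-11879 description above amount to a small predicate over the job's operators and two feature flags. The sketch below is hypothetical — the class, interfaces, and method names are illustrative stand-ins, not Flink's actual JobGraph validator API.

```java
import java.util.List;

// Hypothetical sketch of the FLINK-11879 validation rules; all names here
// are illustrative stand-ins, not Flink's actual validator API.
public class JobGraphValidatorSketch {
    interface Operator {}
    interface InputSelectable extends Operator {}
    interface BoundedMultiInput extends Operator {}

    static void validate(List<Operator> operators,
                         boolean checkpointingEnabled,
                         boolean creditBasedFlowControl) {
        for (Operator op : operators) {
            boolean selectable = op instanceof InputSelectable;
            boolean bounded = op instanceof BoundedMultiInput;
            // Rule 1+2: no InputSelectable/Bounded*Input with checkpointing.
            if (checkpointingEnabled && (selectable || bounded)) {
                throw new IllegalStateException(
                        "operator not supported when checkpointing is enabled");
            }
            // Rule 3: InputSelectable requires credit-based flow control.
            if (!creditBasedFlowControl && selectable) {
                throw new IllegalStateException(
                        "InputSelectable requires credit-based flow control");
            }
        }
    }

    public static void main(String[] args) {
        class Sel implements InputSelectable {}
        try {
            validate(List.of(new Sel()), true, true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running such checks at JobGraph construction time fails the job at submission rather than at runtime.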
[jira] [Assigned] (FLINK-12818) Improve stability of twoInputMapSink benchmark
[ https://issues.apache.org/jira/browse/FLINK-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun reassigned FLINK-12818: - Assignee: Haibo Sun > Improve stability of twoInputMapSink benchmark > -- > > Key: FLINK-12818 > URL: https://issues.apache.org/jira/browse/FLINK-12818 > Project: Flink > Issue Type: Sub-task > Components: Benchmarks >Reporter: Piotr Nowojski >Assignee: Haibo Sun >Priority: Critical > > The {{twoInputMapSink}} benchmark is very unstable over time: > http://codespeed.dak8s.net:8000/timeline/#/?exe=1=twoInputMapSink=2=200=off=on=on > It should be fixed, otherwise the benchmark can not be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12426) TM occasionally hang in deploying state
[ https://issues.apache.org/jira/browse/FLINK-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16855247#comment-16855247 ] Haibo Sun commented on FLINK-12426: --- [~QiLuo], the pull request of FLINK-12547 has been merged into the master branch. If this JIRA is the same problem as it, can you close this issue as a duplicate? > TM occasionally hang in deploying state > --- > > Key: FLINK-12426 > URL: https://issues.apache.org/jira/browse/FLINK-12426 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Qi >Priority: Major > > Hi all, > > We use Flink batch and start thousands of jobs per day. Occasionally we > observed some stuck jobs, due to some TM hang in “DEPLOYING” state. > > It seems that the TM is calling BlobClient to download jars from > JM/BlobServer. Under hood it’s calling Socket.connect() and then > Socket.read() to retrieve results. > > These jobs usually have many TM slots (1~2k). We checked the TM log and > dumped the TM thread. It indeed hung on socket read to download jar from Blob > server. > > We're using Flink 1.5 but this may also affect later versions since related > code are not changed much. We've tried to add socket timeout in BlobClient, > but still no luck. > > > TM log > > ... > INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task > DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (184/2000). > INFO org.apache.flink.runtime.taskmanager.Task - DataSource (at > createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) switched > from CREATED to DEPLOYING. > INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream > leak safety net for task DataSource (at > createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING] > INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task > DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (184/2000) [DEPLOYING]. 
> INFO org.apache.flink.runtime.blob.BlobClient - Downloading > 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280 > from some-host-ip-port > no more logs... > > > TM thread dump: > > _"DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000 nid=0xa0994 runnable > [0x7fb97cfbf000]_ > _java.lang.Thread.State: RUNNABLE_ > _at java.net.SocketInputStream.socketRead0(Native Method)_ > _at > java.net.SocketInputStream.socketRead(SocketInputStream.java:116)_ > _at java.net.SocketInputStream.read(SocketInputStream.java:171)_ > _at java.net.SocketInputStream.read(SocketInputStream.java:141)_ > _at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)_ > _at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)_ > _at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)_ > _at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)_ > _at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)_ > _at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)_ > _- locked <0x00078ab60ba8> (a java.lang.Object)_ > _at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)_ > _at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)_ > _at java.lang.Thread.run(Thread.java:748)_ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
[ https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-12547. - > Deadlock when the task thread downloads jars using BlobClient > - > > Key: FLINK-12547 > URL: https://issues.apache.org/jira/browse/FLINK-12547 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 20m > Remaining Estimate: 0h > > The jstack is as follows (this jstack is from an old Flink version, but the > master branch has the same problem). > {code:java} > "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 > nid=0xe2 runnable [0x7f80da5fd000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > at java.net.SocketInputStream.read(SocketInputStream.java:170) > at java.net.SocketInputStream.read(SocketInputStream.java:141) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > - locked <0x00062cf2a188> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) > at java.lang.Thread.run(Thread.java:834) > Locked ownable synchronizers: > - None > {code} > > The reason is that 
SO_TIMEOUT is not set in the socket connection of the blob > client. When network packet loss is severe due to the high CPU load of the > machine, the blob client connection fails to perceive that the server has > been disconnected, which results in blocking in the native method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
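[Editor's note] The root cause in FLINK-12547 — a blocking `read()` stuck forever in `socketRead0` because no SO_TIMEOUT was set — can be reproduced with plain `java.net` in a few lines. This sketch uses only standard JDK APIs (`Socket.setSoTimeout`, `SocketTimeoutException`); the demo server that accepts but never writes is of course an assumption, not Flink's BlobServer.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Demonstrates the fix described above in plain java.net terms: without
// SO_TIMEOUT, read() on a silent/dead peer blocks forever in the native
// socketRead0; with setSoTimeout(), it fails fast with SocketTimeoutException.
public class SoTimeoutDemo {
    public static String readWithTimeout(int timeoutMillis) throws IOException {
        // A "silent" server: accepts the connection but never writes anything,
        // mimicking a BlobServer whose response is lost.
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort())) {
            server.accept();
            client.setSoTimeout(timeoutMillis); // bound the blocking read
            try {
                client.getInputStream().read(); // would block forever otherwise
                return "read returned";
            } catch (SocketTimeoutException e) {
                return "timed out";
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readWithTimeout(200)); // timed out
    }
}
```

With a bounded read, the client's retry mechanism gets a chance to run instead of the task thread hanging in DEPLOYING, which is also the failure mode reported in FLINK-12426 above.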
[jira] [Commented] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16855222#comment-16855222 ] Haibo Sun commented on FLINK-1722: -- OK, I will. > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Priority: Major > > The Hadoop output formats execute a process in the end to move the produced > files from a temp directory to the final location. > The batch API is annotating output formats that execute something in the end > with the {{FinalizeOnMaster}} interface. > The streaming component is not respecting this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun reassigned FLINK-1722: Assignee: Haibo Sun > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Assignee: Haibo Sun >Priority: Major > > The Hadoop output formats execute a process in the end to move the produced > files from a temp directory to the final location. > The batch API is annotating output formats that execute something in the end > with the {{FinalizeOnMaster}} interface. > The streaming component is not respecting this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852871#comment-16852871 ] Haibo Sun edited comment on FLINK-1722 at 5/31/19 9:57 AM: --- [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Design doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing], Any questions and suggestions are welcome. Thanks. was (Author: sunhaibotb): [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Google doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing], Any questions and suggestions are welcome. Thanks. > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Priority: Major > > The Hadoop output formats execute a process in the end to move the produced > files from a temp directory to the final location. > The batch API is annotating output formats that execute something in the end > with the {{FinalizeOnMaster}} interface. > The streaming component is not respecting this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852871#comment-16852871 ] Haibo Sun edited comment on FLINK-1722 at 5/31/19 9:57 AM: --- [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Design doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing] Any questions and suggestions are welcome. Thanks. was (Author: sunhaibotb): [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Design doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing], Any questions and suggestions are welcome. Thanks. > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Priority: Major > > The Hadoop output formats execute a process in the end to move the produced > files from a temp directory to the final location. > The batch API is annotating output formats that execute something in the end > with the {{FinalizeOnMaster}} interface. > The streaming component is not respecting this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852871#comment-16852871 ] Haibo Sun edited comment on FLINK-1722 at 5/31/19 9:57 AM: --- [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Google doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing], Any questions and suggestions are welcome. Thanks. was (Author: sunhaibotb): [~aljoscha] [~rmetzger] , I wrote a simple design document, and let's start from this document to address this issue. What do you think? Google doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing], Any questions and suggestions are welcome. Thanks. > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Priority: Major > > The Hadoop output formats execute a process in the end to move the produced > files from a temp directory to the final location. > The batch API is annotating output formats that execute something in the end > with the {{FinalizeOnMaster}} interface. > The streaming component is not respecting this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-1722) Streaming not respecting FinalizeOnMaster for output formats
[ https://issues.apache.org/jira/browse/FLINK-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852871#comment-16852871 ] Haibo Sun commented on FLINK-1722: -- [~aljoscha] [~rmetzger] , I wrote a simple design document; let's start from it to address this issue. What do you think? Google doc: [https://docs.google.com/document/d/1i_hFIqZRuNMZrfbjDIpquj6muh8tvV2fFOlCLqw2e8w/edit?usp=sharing]. Any questions and suggestions are welcome. Thanks. > Streaming not respecting FinalizeOnMaster for output formats > > > Key: FLINK-1722 > URL: https://issues.apache.org/jira/browse/FLINK-1722 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Robert Metzger >Priority: Major > > The Hadoop output formats execute a process at the end to move the produced > files from a temp directory to the final location. > The batch API annotates output formats that execute something at the end > with the {{FinalizeOnMaster}} interface. > The streaming component does not respect this interface. Hence, > HadoopOutputFormats aren't writing their final data into the desired > destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
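For readers unfamiliar with the interface this issue is about: {{FinalizeOnMaster}} is the batch-API hook whose {{finalizeGlobal(int parallelism)}} method the master invokes once after all parallel instances of an output format finish; Hadoop formats use that call to move files from the temp directory to the final location. The following is a minimal self-contained sketch of the contract, not the actual Flink code: the interface below is a stand-in for the real one in org.apache.flink.api.common.io, and the format class is hypothetical.

```java
import java.io.IOException;

public class FinalizeOnMasterSketch {
    /** Stand-in for org.apache.flink.api.common.io.FinalizeOnMaster. */
    interface FinalizeOnMaster {
        /** Called once on the master after all parallel instances have finished. */
        void finalizeGlobal(int parallelism) throws IOException;
    }

    /** Hypothetical format that stages output and commits it in finalizeGlobal. */
    static class TempDirOutputFormat implements FinalizeOnMaster {
        boolean committed = false;

        @Override
        public void finalizeGlobal(int parallelism) {
            // A Hadoop format would rename files from the temp directory
            // into the final output directory here.
            committed = true;
        }
    }

    public static void main(String[] args) throws IOException {
        TempDirOutputFormat format = new TempDirOutputFormat();
        // The batch runtime performs this check-and-call after the job ends;
        // the bug in FLINK-1722 is that the streaming runtime never does.
        if (format instanceof FinalizeOnMaster) {
            format.finalizeGlobal(4);
        }
        System.out.println("committed=" + format.committed);
    }
}
```

Because the streaming runtime skips the {{finalizeGlobal}} call, the commit step silently never happens and the staged files stay in the temp directory.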
[jira] [Comment Edited] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
[ https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848633#comment-16848633 ] Haibo Sun edited comment on FLINK-12547 at 5/27/19 6:08 AM: [~QiLuo], Because the blob client has a retry mechanism, I understand that "The TM hangs for over an hour (longer than the SO_TIMEOUT)" is possible, but it does not mean that SO_TIMEOUT does not work. In addition, other causes of the hang cannot be ruled out. `30 minutes` is too long, and I suggest setting SO_TIMEOUT to a smaller value. was (Author: sunhaibotb): [~QiLuo], because the blob client has a retry mechanism, I understand that "The TM hangs for over an hour (longer than the SO_TIMEOUT)" is possible, but it does not mean that SO_TIMEOUT does not work. In addition, `30 minutes` is too long, and I think you should set SO_TIMEOUT to a smaller value. > Deadlock when the task thread downloads jars using BlobClient > - > > Key: FLINK-12547 > URL: https://issues.apache.org/jira/browse/FLINK-12547 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The jstack is as follows (this jstack is from an old Flink version, but the > master branch has the same problem). 
> {code:java} > "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 > nid=0xe2 runnable [0x7f80da5fd000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > at java.net.SocketInputStream.read(SocketInputStream.java:170) > at java.net.SocketInputStream.read(SocketInputStream.java:141) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > - locked <0x00062cf2a188> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) > at java.lang.Thread.run(Thread.java:834) > Locked ownable synchronizers: > - None > {code} > > The reason is that SO_TIMEOUT is not set in the socket connection of the blob > client. When network packet loss is severe due to the high CPU load of > the machine, the blob client fails to perceive that the server has > been disconnected, which results in blocking in the native method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
[ https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848633#comment-16848633 ] Haibo Sun commented on FLINK-12547: --- [~QiLuo], because the blob client has a retry mechanism, I understand that "The TM hangs for over an hour (longer than the SO_TIMEOUT)" is possible, but it does not mean that SO_TIMEOUT does not work. In addition, `30 minutes` is too long, and I think you should set SO_TIMEOUT to a smaller value. > Deadlock when the task thread downloads jars using BlobClient > - > > Key: FLINK-12547 > URL: https://issues.apache.org/jira/browse/FLINK-12547 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The jstack is as follows (this jstack is from an old Flink version, but the > master branch has the same problem). > {code:java} > "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 > nid=0xe2 runnable [0x7f80da5fd000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > at java.net.SocketInputStream.read(SocketInputStream.java:170) > at java.net.SocketInputStream.read(SocketInputStream.java:141) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > - locked 
<0x00062cf2a188> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) > at java.lang.Thread.run(Thread.java:834) > Locked ownable synchronizers: > - None > {code} > > The reason is that SO_TIMEOUT is not set in the socket connection of the blob > client. When network packet loss is severe due to the high CPU load of > the machine, the blob client fails to perceive that the server has > been disconnected, which results in blocking in the native method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
[ https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12547: -- Description: The jstack is as follows (this jstack is from an old Flink version, but the master branch has the same problem). {code:java} "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 nid=0xe2 runnable [0x7f80da5fd000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:170) at java.net.SocketInputStream.read(SocketInputStream.java:141) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) - locked <0x00062cf2a188> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) at java.lang.Thread.run(Thread.java:834) Locked ownable synchronizers: - None {code} The reason is that SO_TIMEOUT is not set in the socket connection of the blob client. When network packet loss is severe due to the high CPU load of the machine, the blob client fails to perceive that the server has been disconnected, which results in blocking in the native method. was: The jstack is as follows (this jstack is from an old Flink version, but the master branch has the same problem). 
{code:java} "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 nid=0xe2 runnable [0x7f80da5fd000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:170) at java.net.SocketInputStream.read(SocketInputStream.java:141) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) - locked <0x00062cf2a188> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) at java.lang.Thread.run(Thread.java:834) Locked ownable synchronizers: - None {code} The reason is that SO_TIMEOUT is not set in the socket connection of the blob client. When network packet loss is severe due to the high CPU load of the machine, the blob client fails to perceive that the server has been disconnected, which results in blocking in the native method. > Deadlock when the task thread downloads jars using BlobClient > - > > Key: FLINK-12547 > URL: https://issues.apache.org/jira/browse/FLINK-12547 > Project: Flink > Issue Type: Bug > Components: Runtime / Operators >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > The jstack is as follows (this jstack is from an old Flink version, but the > master branch has the same problem). 
> {code:java} > "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 > nid=0xe2 runnable [0x7f80da5fd000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > at java.net.SocketInputStream.read(SocketInputStream.java:170) > at java.net.SocketInputStream.read(SocketInputStream.java:141) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at >
[jira] [Created] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
Haibo Sun created FLINK-12547: - Summary: Deadlock when the task thread downloads jars using BlobClient Key: FLINK-12547 URL: https://issues.apache.org/jira/browse/FLINK-12547 Project: Flink Issue Type: Bug Components: Runtime / Operators Affects Versions: 1.8.0 Reporter: Haibo Sun Assignee: Haibo Sun The jstack is as follows (this jstack is from an old Flink version, but the master branch has the same problem). {code:java} "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 nid=0xe2 runnable [0x7f80da5fd000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:170) at java.net.SocketInputStream.read(SocketInputStream.java:141) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) - locked <0x00062cf2a188> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) at java.lang.Thread.run(Thread.java:834) Locked ownable synchronizers: - None {code} The reason is that SO_TIMEOUT is not set in the socket connection of the blob client. When network packet loss is severe due to the high CPU load of the machine, the blob client fails to perceive that the server has been disconnected, which results in blocking in the native method. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
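The fix implied by the description above is to set SO_TIMEOUT on the blob client's socket so that a read on a dead connection fails with an exception instead of blocking forever in {{socketRead0}}, as in the jstack. The following is a minimal stand-alone sketch using plain java.net; the timeout values are illustrative and not Flink's actual configuration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutDemo {
    public static void main(String[] args) throws IOException {
        // A server that accepts connections but never writes anything,
        // simulating a blob server whose responses are lost on the network.
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket()) {
            client.connect(new InetSocketAddress("127.0.0.1", server.getLocalPort()), 1000);
            // Without this call, read() below would block indefinitely in the
            // native socketRead0, exactly as in the jstack above.
            client.setSoTimeout(500); // illustrative value, in milliseconds
            InputStream in = client.getInputStream();
            try {
                in.read(); // no data will ever arrive
                System.out.println("unexpected: read returned");
            } catch (SocketTimeoutException expected) {
                System.out.println("read timed out as expected");
            }
        }
    }
}
```

With SO_TIMEOUT set, the blocked read surfaces as a {{SocketTimeoutException}} that the blob client's retry mechanism can then handle, instead of the task thread hanging while holding the library-cache lock.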
[jira] [Comment Edited] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842078#comment-16842078 ] Haibo Sun edited comment on FLINK-12529 at 5/17/19 10:25 AM: - > But as far as I understand you would only need to modify the > {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this > change would base on my PR?). If this change is based on your PR (https://github.com/apache/flink/pull/8467), that's it. I will put this change after your PR because it is simple. was (Author: sunhaibotb): > But as far as I understand you would only need to modify the > {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this > change would base on my PR?). If this change is based on your PR (https://github.com/apache/flink/pull/8467), you are right. This change is simple, so I will put it after your PR. > Release record-deserializer buffers timely to improve the efficiency of heap > usage on taskmanager > - > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanager when the input is a bounded stream. 
> For example, in the case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the others end very late > because of the large amount of data; then the buffers of the record > deserializers corresponding to the input channels that finished early are idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded streams > are idle forever (no longer used) after the bounded streams are finished. > Especially when the record size and the upstream parallelism are large, the > total size of `SpanningWrapper#buffer` is very large. The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842078#comment-16842078 ] Haibo Sun edited comment on FLINK-12529 at 5/17/19 10:23 AM: - > But as far as I understand you would only need to modify the > {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this > change would base on my PR?). If this change is based on your PR (https://github.com/apache/flink/pull/8467), you are right. This change is simple, so I will put it after your PR. was (Author: sunhaibotb): > But as far as I understand you would only need to modify the > {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this > change would base on my PR?). If this change is based on your [PR#8467|[https://github.com/apache/flink/pull/8467]], you are right. This change is simple, so I will put it after your PR. > Release record-deserializer buffers timely to improve the efficiency of heap > usage on taskmanager > - > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanager when the input is a bounded stream. 
> For example, in the case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the others end very late > because of the large amount of data; then the buffers of the record > deserializers corresponding to the input channels that finished early are idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded streams > are idle forever (no longer used) after the bounded streams are finished. > Especially when the record size and the upstream parallelism are large, the > total size of `SpanningWrapper#buffer` is very large. The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842078#comment-16842078 ] Haibo Sun edited comment on FLINK-12529 at 5/17/19 10:22 AM: - > But as far as I understand you would only need to modify the > {{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this > change would base on my PR?). If this change is based on your [PR#8467|[https://github.com/apache/flink/pull/8467]], you are right. This change is simple, so I will put it after your PR. was (Author: sunhaibotb): >But as far as I understand you would only need to modify the >{{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this >change would base on my PR?). If this change is based on your [PR#8467|[https://github.com/apache/flink/pull/8467]], you are right. This change is simple, so I will put it after your PR. > Release record-deserializer buffers timely to improve the efficiency of heap > usage on taskmanager > - > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanager when the input is a bounded stream. 
> For example, in the case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the others end very late > because of the large amount of data; then the buffers of the record > deserializers corresponding to the input channels that finished early are idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded streams > are idle forever (no longer used) after the bounded streams are finished. > Especially when the record size and the upstream parallelism are large, the > total size of `SpanningWrapper#buffer` is very large. The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842078#comment-16842078 ] Haibo Sun commented on FLINK-12529: --- >But as far as I understand you would only need to modify the >{{Stream(Two)InputProcessor#processBufferOrEvent}} code (assuming that this >change would base on my PR?). If this change is based on your [PR#8467|[https://github.com/apache/flink/pull/8467]], you are right. This change is simple, so I will put it after your PR. > Release record-deserializer buffers timely to improve the efficiency of heap > usage on taskmanager > - > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanager when the input is a bounded stream. > For example, in the case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the others end very late > because of the large amount of data; then the buffers of the record > deserializers corresponding to the input channels that finished early are idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded streams > are idle forever (no longer used) after the bounded streams are finished. 
> Especially when the record size and the upstream parallelism are large, the > total size of `SpanningWrapper#buffer` is very large. The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
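The proposal in the issue description is mechanical: when an input channel delivers `EndOfPartitionEvent`, drop that channel's deserializer right away instead of waiting for the task-level cleanup(). The following is a self-contained sketch of the idea only; all class and method names here are illustrative stand-ins, not the actual StreamInputProcessor code.

```java
import java.util.HashMap;
import java.util.Map;

public class EagerDeserializerRelease {
    /** Stand-in for a record deserializer holding a large spanning buffer. */
    static class RecordDeserializer {
        byte[] buffer = new byte[5 * 1024 * 1024]; // up to 5 MB per channel
        void clear() { buffer = null; }            // release the heap eagerly
    }

    private final Map<Integer, RecordDeserializer> deserializers = new HashMap<>();

    EagerDeserializerRelease(int numChannels) {
        for (int ch = 0; ch < numChannels; ch++) {
            deserializers.put(ch, new RecordDeserializer());
        }
    }

    /** Invoked when EndOfPartitionEvent arrives on the given channel. */
    void onEndOfPartition(int channel) {
        RecordDeserializer d = deserializers.remove(channel);
        if (d != null) {
            d.clear(); // buffer becomes collectible long before the task ends
        }
    }

    int activeDeserializers() { return deserializers.size(); }

    public static void main(String[] args) {
        // 100 upstream channels, i.e. up to ~500 MB of spanning buffers.
        EagerDeserializerRelease in = new EagerDeserializerRelease(100);
        in.onEndOfPartition(7); // one bounded input finishes early
        System.out.println("active=" + in.activeDeserializers());
    }
}
```

Releasing per channel keeps the worst case bounded by the number of channels still running, rather than by the total upstream parallelism for the whole lifetime of the task.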
[jira] [Updated] (FLINK-12529) Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12529: -- Summary: Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager (was: Release buffers of the record deserializers timely to improve the efficiency of heap memory usage on taskmanager) > Release record-deserializer buffers timely to improve the efficiency of heap > usage on taskmanager > - > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanger when input is bounded stream. > For example, in case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the other end very late > because of the large amount of data, then the buffers of the record > deserializers corresponding to the input channels finished early is idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded stream > are idle for ever (no longer used) after the bounded streams are finished. > Especially when the record and the parallelism of upstream are large, the > total size of `SpanningWrapper#buffer` are very large. 
The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12529) Release buffers of the record deserializers timely to improve the efficiency of heap memory usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12529: -- Summary: Release buffers of the record deserializers timely to improve the efficiency of heap memory usage on taskmanager (was: Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager) > Release buffers of the record deserializers timely to improve the efficiency > of heap memory usage on taskmanager > > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanger when input is bounded stream. > For example, in case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the other end very late > because of the large amount of data, then the buffers of the record > deserializers corresponding to the input channels finished early is idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded stream > are idle for ever (no longer used) after the bounded streams are finished. > Especially when the record and the parallelism of upstream are large, the > total size of `SpanningWrapper#buffer` are very large. 
The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12529) Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841470#comment-16841470 ] Haibo Sun commented on FLINK-12529: --- [~pnowojski], do you think there is any problem with this improvement? > Release buffers of the record deserializer timely to improve the efficiency > of heap memory usage on taskmanager > --- > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up at the end of the task (look at > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on taskmanger when input is bounded stream. > For example, in case that all inputs are bounded streams, some of them end > very early because of the small amount of data, and the other end very late > because of the large amount of data, then the buffers of the record > deserializers corresponding to the input channels finished early is idle for > a long time and no longer used. > In another case, when both unbounded and bounded streams exist in the inputs, > the buffers of the record deserializers corresponding to the bounded stream > are idle for ever (no longer used) after the bounded streams are finished. > Especially when the record and the parallelism of upstream are large, the > total size of `SpanningWrapper#buffer` are very large. 
The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12529) Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager
[ https://issues.apache.org/jira/browse/FLINK-12529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841470#comment-16841470 ] Haibo Sun edited comment on FLINK-12529 at 5/16/19 3:35 PM: [~pnowojski], do you think there is any problem with this improvement? was (Author: sunhaibotb): [~pnowojski], do you think there is any problem with this improvement? > Release buffers of the record deserializer timely to improve the efficiency > of heap memory usage on taskmanager > --- > > Key: FLINK-12529 > URL: https://issues.apache.org/jira/browse/FLINK-12529 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), > each input channel has a corresponding record deserializer. Currently, these > record deserializers are cleaned up only at the end of the task (see > `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). > This is not a problem for unbounded streams, but it may reduce the efficiency > of heap memory usage on the taskmanager when the input is a bounded stream. > For example, if all inputs are bounded streams, some of them may end very > early because they carry little data while others end very late because they > carry a lot of data; the buffers of the record deserializers corresponding to > the input channels that finished early then sit idle for a long time and are > no longer used. > In another case, when the inputs contain both unbounded and bounded streams, > the buffers of the record deserializers corresponding to the bounded streams > stay idle forever (no longer used) after the bounded streams finish. > Especially when the record size and the upstream parallelism are large, the > total size of the `SpanningWrapper#buffer` instances can be very large. 
The size of > `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the > parallelism of upstream is 100, the maximum total size will reach 500 MB (in > our production, there are jobs with the record size up to hundreds of KB and > the parallelism of upstream up to 1000). > Overall, after receiving `EndOfPartitionEvent` from the input channel, the > corresponding record deserializer should be cleared immediately to improve > the efficiency of heap memory usage on taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12529) Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager
Haibo Sun created FLINK-12529: - Summary: Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager Key: FLINK-12529 URL: https://issues.apache.org/jira/browse/FLINK-12529 Project: Flink Issue Type: Improvement Affects Versions: 1.8.0 Reporter: Haibo Sun Assignee: Haibo Sun In input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), each input channel has a corresponding record deserializer. Currently, these record deserializers are cleaned up only at the end of the task (see `StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). This is not a problem for unbounded streams, but it may reduce the efficiency of heap memory usage on the taskmanager when the input is a bounded stream. For example, if all inputs are bounded streams, some of them may end very early because they carry little data while others end very late because they carry a lot of data; the buffers of the record deserializers corresponding to the input channels that finished early then sit idle for a long time and are no longer used. In another case, when the inputs contain both unbounded and bounded streams, the buffers of the record deserializers corresponding to the bounded streams stay idle forever (no longer used) after the bounded streams finish. Especially when the record size and the upstream parallelism are large, the total size of the `SpanningWrapper#buffer` instances can be very large. The size of `SpanningWrapper#buffer` is allowed to reach up to 5 MB, and if the upstream parallelism is 100, the maximum total size will reach 500 MB (in our production, there are jobs with record sizes up to hundreds of KB and upstream parallelism up to 1000). Overall, after receiving `EndOfPartitionEvent` from an input channel, the corresponding record deserializer should be cleared immediately to improve the efficiency of heap memory usage on the taskmanager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
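The cleanup idea in FLINK-12529 can be sketched in a few lines. This is an illustrative sketch only, not Flink's actual classes: `ChannelDeserializers`, `bufferFor`, and `onEndOfPartition` are hypothetical names standing in for the per-channel record deserializers and the handling of `EndOfPartitionEvent`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed behavior: keep one deserializer buffer
// per input channel, and free it as soon as that channel signals
// end-of-partition instead of waiting for the task-level cleanup().
class ChannelDeserializers {
    private final Map<Integer, byte[]> buffers = new HashMap<>();

    // Lazily allocate a (stand-in) spanning buffer for the channel.
    byte[] bufferFor(int channel) {
        return buffers.computeIfAbsent(channel, c -> new byte[1024]);
    }

    // Called when EndOfPartitionEvent arrives on this channel: release the
    // buffer immediately so the heap memory can be reclaimed early.
    void onEndOfPartition(int channel) {
        buffers.remove(channel);
    }

    int liveBuffers() {
        return buffers.size();
    }
}
```

With this shape, a channel that finishes early no longer pins its buffer until the whole task ends, which is exactly the idle-memory scenario the ticket describes.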
[jira] [Closed] (FLINK-12292) Add micro benchmarks for two-input processors
[ https://issues.apache.org/jira/browse/FLINK-12292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-12292. - Resolution: Invalid > Add micro benchmarks for two-input processors > - > > Key: FLINK-12292 > URL: https://issues.apache.org/jira/browse/FLINK-12292 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11877) Implement the runtime handling of the InputSelectable interface
[ https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11877: -- Description: - Introduces a new `Input` interface to represent the logical input of operators. - Introduces a new `NetworkInput` class to represent the network input of operators. - Introduces a new `StreamTwoInputSelectableProcessor` class to implement selective reading. - Introduces a new `TwoInputSelectableStreamTask` class to execute operators with selective reading. - Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure that StreamTwoInputSelectableProcessor's throughput is the same or the regression is acceptable in the case of constant `InputSelection.ALL`. was: - Introduces a new `Input` interface to represent the logical input of operators. - Introduces a new `NetworkInput` class to represent the network input of operators. - Introduces a new `StreamTwoInputSelectableProcessor` class to implement selective reading. - Introduces a new `TwoInputSelectableStreamTask` class to execute operators with selective reading. > Implement the runtime handling of the InputSelectable interface > --- > > Key: FLINK-11877 > URL: https://issues.apache.org/jira/browse/FLINK-11877 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > - Introduces a new `Input` interface to represent the logical input of > operators. > - Introduces a new `NetworkInput` class to represent the network input of > operators. > - Introduces a new `StreamTwoInputSelectableProcessor` class to implement > selective reading. > - Introduces a new `TwoInputSelectableStreamTask` class to execute the > operators with selective reading. 
> - Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12292) Add micro benchmarks for two-input processors
[ https://issues.apache.org/jira/browse/FLINK-12292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-12292: -- Summary: Add micro benchmarks for two-input processors (was: Add benchmarks for the input processors) > Add micro benchmarks for two-input processors > - > > Key: FLINK-12292 > URL: https://issues.apache.org/jira/browse/FLINK-12292 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11877) Implement the runtime handling of the InputSelectable interface
[ https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11877: -- Description: - Introduces a new `Input` interface to represent the logical input of operators. - Introduces a new `NetworkInput` class to represent the network input of operators. - Introduces a new `StreamTwoInputSelectableProcessor` class to implement selective reading. - Introduces a new `TwoInputSelectableStreamTask` class to execute operators with selective reading. was: - Introduces a new class `Input` to represent the logical input of operators. - Introduces a new class `StreamTwoInputSelectableProcessor` to implement selective reading. - Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure that StreamTwoInputSelectableProcessor's throughput is the same or the regression is acceptable in the case of constant `ALL`. > Implement the runtime handling of the InputSelectable interface > --- > > Key: FLINK-11877 > URL: https://issues.apache.org/jira/browse/FLINK-11877 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > - Introduces a new `Input` interface to represent the logical input of > operators. > - Introduces a new `NetworkInput` class to represent the network input of > operators. > - Introduces a new `StreamTwoInputSelectableProcessor` class to implement > selective reading. > - Introduces a new `TwoInputSelectableStreamTask` class to execute the > operators with selective reading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12292) Add benchmarks for the input processors
Haibo Sun created FLINK-12292: - Summary: Add benchmarks for the input processors Key: FLINK-12292 URL: https://issues.apache.org/jira/browse/FLINK-12292 Project: Flink Issue Type: Sub-task Components: Runtime / Operators Reporter: Haibo Sun Assignee: Haibo Sun Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure that StreamTwoInputSelectableProcessor's throughput is the same or the regression is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12291) Add benchmarks for the input processors
[ https://issues.apache.org/jira/browse/FLINK-12291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-12291. - Resolution: Invalid > Add benchmarks for the input processors > --- > > Key: FLINK-12291 > URL: https://issues.apache.org/jira/browse/FLINK-12291 > Project: Flink > Issue Type: Task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12291) Add benchmarks for the input processors
Haibo Sun created FLINK-12291: - Summary: Add benchmarks for the input processors Key: FLINK-12291 URL: https://issues.apache.org/jira/browse/FLINK-12291 Project: Flink Issue Type: Task Components: Runtime / Operators Reporter: Haibo Sun Assignee: Haibo Sun Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure that StreamTwoInputSelectableProcessor's throughput is the same or the regression is acceptable in the case of constant `InputSelection.ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
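The benchmark tickets above (FLINK-12291/FLINK-12292) compare the throughput of the two processors. The real flink-benchmarks project uses JMH for this; the following is only a rough plain-Java sketch of the idea, with the hypothetical name `ThroughputHarness` and a stand-in processing function instead of the actual `StreamTwoInputProcessor`.

```java
import java.util.function.LongUnaryOperator;

// Rough micro-benchmark sketch (not the JMH setup flink-benchmarks uses):
// push N records through a processing function and report records per
// second, the metric these tickets compare between processors.
class ThroughputHarness {
    static double measure(LongUnaryOperator processor, long records) {
        long start = System.nanoTime();
        long acc = 0;
        for (long i = 0; i < records; i++) {
            acc += processor.applyAsLong(i);
        }
        long elapsed = Math.max(System.nanoTime() - start, 1);
        // Consume acc so the JIT cannot eliminate the loop as dead code.
        if (acc == Long.MIN_VALUE) throw new AssertionError();
        return records / (elapsed / 1e9);
    }
}
```

A proper JMH benchmark would add warm-up iterations, forked JVMs, and blackhole consumption, which this sketch only approximates with the dead-code guard.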
[jira] [Updated] (FLINK-11876) Introduce the new interfaces InputSelectable, BoundedOneInput and BoundedMultiInput
[ https://issues.apache.org/jira/browse/FLINK-11876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11876: -- Summary: Introduce the new interfaces InputSelectable, BoundedOneInput and BoundedMultiInput (was: Introduce the new interfaces TwoInputSelectable, BoundedOneInput and BoundedTwoInput) > Introduce the new interfaces InputSelectable, BoundedOneInput and > BoundedMultiInput > --- > > Key: FLINK-11876 > URL: https://issues.apache.org/jira/browse/FLINK-11876 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11879) Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput
[ https://issues.apache.org/jira/browse/FLINK-11879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11879: -- Summary: Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and BoundedMultiInput (was: Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput) > Add JobGraph validators for the uses of InputSelectable, BoundedOneInput and > BoundedMultiInput > -- > > Key: FLINK-11879 > URL: https://issues.apache.org/jira/browse/FLINK-11879 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Rejects jobs containing operators that implement `TwoInputSelectable` when > checkpointing is enabled. > - Rejects jobs containing operators that implement `BoundedInput` or > `BoundedTwoInput` when checkpointing is enabled. > - Rejects jobs containing operators that implement `TwoInputSelectable` when > credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11878) Implement the runtime handling of BoundedOneInput and BoundedMultiInput
[ https://issues.apache.org/jira/browse/FLINK-11878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11878: -- Summary: Implement the runtime handling of BoundedOneInput and BoundedMultiInput (was: Implement the runtime handling of BoundedOneInput and BoundedTwoInput) > Implement the runtime handling of BoundedOneInput and BoundedMultiInput > --- > > Key: FLINK-11878 > URL: https://issues.apache.org/jira/browse/FLINK-11878 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11877) Implement the runtime handling of the InputSelectable interface
[ https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11877: -- Summary: Implement the runtime handling of the InputSelectable interface (was: Implement the runtime handling of TwoInputSelectable) > Implement the runtime handling of the InputSelectable interface > --- > > Key: FLINK-11877 > URL: https://issues.apache.org/jira/browse/FLINK-11877 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Operators >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Introduces a new class `Input` to represent the logical input of operators. > - Introduces a new class `StreamTwoInputSelectableProcessor` to implement > selective reading. > - Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11880) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput
[ https://issues.apache.org/jira/browse/FLINK-11880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun closed FLINK-11880. - Resolution: Duplicate Duplicates FLINK-11879. > Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput > and BoundedTwoInput > --- > > Key: FLINK-11880 > URL: https://issues.apache.org/jira/browse/FLINK-11880 > Project: Flink > Issue Type: Sub-task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Rejects jobs containing operators that implement `TwoInputSelectable` when > checkpointing is enabled. > - Rejects jobs containing operators that implement `BoundedInput` or > `BoundedTwoInput` when checkpointing is enabled. > - Rejects jobs containing operators that implement `TwoInputSelectable` when > credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11879) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput
Haibo Sun created FLINK-11879: - Summary: Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput Key: FLINK-11879 URL: https://issues.apache.org/jira/browse/FLINK-11879 Project: Flink Issue Type: Sub-task Reporter: Haibo Sun Assignee: Haibo Sun - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedTwoInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11880) Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput
Haibo Sun created FLINK-11880: - Summary: Add JobGraph validators for the uses of TwoInputSelectable, BoundedOneInput and BoundedTwoInput Key: FLINK-11880 URL: https://issues.apache.org/jira/browse/FLINK-11880 Project: Flink Issue Type: Sub-task Reporter: Haibo Sun Assignee: Haibo Sun - Rejects jobs containing operators that implement `TwoInputSelectable` when checkpointing is enabled. - Rejects jobs containing operators that implement `BoundedInput` or `BoundedTwoInput` when checkpointing is enabled. - Rejects jobs containing operators that implement `TwoInputSelectable` when credit-based flow control is disabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
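The three rejection rules in FLINK-11879/FLINK-11880 boil down to two boolean checks at job-submission time. A minimal sketch, assuming simplified boolean flags in place of Flink's actual JobGraph inspection (the class and method names here are hypothetical):

```java
// Hypothetical, simplified validator in the spirit of the ticket: reject a
// job when an operator reads inputs selectively and either checkpointing is
// enabled or credit-based flow control is disabled.
class JobGraphValidator {
    static void validate(boolean checkpointingEnabled,
                         boolean creditBasedFlowControl,
                         boolean hasInputSelectableOperator) {
        if (hasInputSelectableOperator && checkpointingEnabled) {
            throw new IllegalStateException(
                "Selective-reading operators are not supported with checkpointing enabled");
        }
        if (hasInputSelectableOperator && !creditBasedFlowControl) {
            throw new IllegalStateException(
                "Selective-reading operators require credit-based flow control");
        }
    }
}
```

Failing fast at JobGraph construction gives the user a clear error instead of undefined runtime behavior.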
[jira] [Updated] (FLINK-11877) Implement the runtime handling of TwoInputSelectable
[ https://issues.apache.org/jira/browse/FLINK-11877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haibo Sun updated FLINK-11877: -- Description: - Introduces a new class `Input` to represent the logical input of operators. - Introduces a new class `StreamTwoInputSelectableProcessor` to implement selective reading. - Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure that StreamTwoInputSelectableProcessor's throughput is the same or the regression is acceptable in the case of constant `ALL`. was: - Introduces a new class `Input` to represent the logical input of operators. - Introduces a new class `StreamTwoInputSelectableProcessor` to implement selective reading. - Adds benchmarks for `StreamTwoInputProcessor` and `StreamTwoInputSelectableProcessor` to ensure good performance. > Implement the runtime handling of TwoInputSelectable > > > Key: FLINK-11877 > URL: https://issues.apache.org/jira/browse/FLINK-11877 > Project: Flink > Issue Type: Sub-task >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > > - Introduces a new class `Input` to represent the logical input of operators. > - Introduces a new class `StreamTwoInputSelectableProcessor` to implement > selective reading. > - Adds benchmarks for `StreamTwoInputProcessor` and > `StreamTwoInputSelectableProcessor` to ensure that > StreamTwoInputSelectableProcessor's throughput is the same or the regression > is acceptable in the case of constant `ALL`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
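The "selective reading" that FLINK-11877 implements can be illustrated with a toy processing loop. This is not Flink's `StreamTwoInputSelectableProcessor`; the interface `TwoInputSelector` and the queue-backed processor below are illustrative stand-ins for the operator-driven input selection the ticket describes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative stand-in for InputSelectable: the operator tells the
// processor which input to poll next (0 or 1).
interface TwoInputSelector {
    int selectInput();
}

// Toy two-input processor: drains two in-memory queues, honoring the
// operator's input selection, with a fallback to the non-empty side when
// the selected input is exhausted.
class SelectiveTwoInputProcessor {
    private final Deque<String> input0 = new ArrayDeque<>();
    private final Deque<String> input1 = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();

    void offer(int input, String record) {
        (input == 0 ? input0 : input1).add(record);
    }

    void run(TwoInputSelector selector) {
        while (!input0.isEmpty() || !input1.isEmpty()) {
            Deque<String> chosen = selector.selectInput() == 0 ? input0 : input1;
            if (chosen.isEmpty()) {
                chosen = (chosen == input0) ? input1 : input0;
            }
            emitted.add(chosen.poll());
        }
    }
}
```

With a selector that always returns 1, records from the second input are consumed first, mirroring the constant-`InputSelection` case the benchmarks exercise.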
[jira] [Created] (FLINK-11878) Implement the runtime handling of BoundedOneInput and BoundedTwoInput
Haibo Sun created FLINK-11878: - Summary: Implement the runtime handling of BoundedOneInput and BoundedTwoInput Key: FLINK-11878 URL: https://issues.apache.org/jira/browse/FLINK-11878 Project: Flink Issue Type: Sub-task Reporter: Haibo Sun Assignee: Haibo Sun -- This message was sent by Atlassian JIRA (v7.6.3#76005)
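The runtime handling that FLINK-11878 asks for amounts to an end-of-input callback per bounded input. A minimal sketch under simplified assumptions (the interface and class names below are hypothetical, not Flink's `BoundedOneInput` API):

```java
// Hypothetical single-input operator contract in the spirit of the ticket:
// the runtime notifies the operator exactly once when its bounded input
// finishes, so the operator can flush any buffered state.
interface BoundedOneInputOp {
    void processElement(long value);
    void endInput();
}

// Example operator: accumulates a sum and emits it only when the runtime
// signals that the bounded input has ended.
class SumUntilEnd implements BoundedOneInputOp {
    private long sum;
    Long result; // set on endInput()

    public void processElement(long value) { sum += value; }
    public void endInput() { result = sum; }
}
```

Without such a callback, an operator over a bounded stream has no clean point at which to emit final aggregates before the task shuts down.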