[ https://issues.apache.org/jira/browse/FLINK-21707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298809#comment-17298809 ]
Zhu Zhu edited comment on FLINK-21707 at 3/10/21, 12:56 PM:
------------------------------------------------------------

[~mapohl] Sorry for the confusion. It is not related to stop-with-savepoint. It is a bug in {{PipelinedRegionSchedulingStrategy}}. And thanks a lot that the exception you posted clearly shows the problem. (I did not try debug logging, so it took me quite some time to figure out the cause.)

[~trohrmann]
>> Why do we have to wait for a blocking intermediate result to be fully
>> produced before scheduling independent consumers?

I think it is not strictly required. But Flink was scheduling batch jobs in a stage-wise pattern, so pipelined region scheduling was designed to be aligned with it. In the Blink version, the scheduler was already reworked to trigger the scheduling of the downstream vertices of each BLOCKING partition individually. It has been working well, so I think it is safe to also do this for Flink. One known side effect is that the computing complexity will increase, because we now need to check all the consumers of each finished partition. However, with the improvement of FLINK-21328, the performance will no longer be a problem.

>> We should make sure that these kinds of IllegalStateException don't only show
>> up when the DEBUG log is enabled. Instead, we should fail hard.

Big +1. The troubleshooting took me quite some time because the root error was not printed in the INFO logs. What I am thinking is to wrap all the invocations of SchedulingStrategy methods with a try-catch and fail globally when any exception is caught. This is because the call stack can be quite deep and complex, considering the underlying {{DefaultScheduler}} and {{ExecutionSlotAllocator}}, and we actually do not expect any exception to be thrown directly in these invocations. WDYT?
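The "fail globally on any exception" idea above can be sketched as a small wrapper. This is a minimal, hypothetical illustration, not Flink's actual code: the {{SchedulingStrategy}} interface is simplified to two methods, and the wrapper class and {{failGlobally}} callback names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FailGloballyDemo {

    // Simplified stand-in for Flink's SchedulingStrategy; real interface differs.
    interface SchedulingStrategy {
        void startScheduling();
        void onExecutionStateChange(String executionVertexId, String newState);
    }

    // Wraps every invocation in try-catch so an unexpected exception is
    // escalated as a global failure instead of being swallowed deep in the
    // call stack and only surfacing in DEBUG logs.
    static class FailGloballyOnException implements SchedulingStrategy {
        private final SchedulingStrategy delegate;
        private final Consumer<Throwable> failGlobally;

        FailGloballyOnException(SchedulingStrategy delegate, Consumer<Throwable> failGlobally) {
            this.delegate = delegate;
            this.failGlobally = failGlobally;
        }

        @Override
        public void startScheduling() {
            try {
                delegate.startScheduling();
            } catch (Throwable t) {
                failGlobally.accept(t);
            }
        }

        @Override
        public void onExecutionStateChange(String executionVertexId, String newState) {
            try {
                delegate.onExecutionStateChange(executionVertexId, newState);
            } catch (Throwable t) {
                failGlobally.accept(t);
            }
        }
    }

    // Demonstrates that an IllegalStateException from the strategy is turned
    // into a visible global failure rather than lost.
    static String runDemo() {
        List<String> log = new ArrayList<>();
        SchedulingStrategy buggy = new SchedulingStrategy() {
            public void startScheduling() { }
            public void onExecutionStateChange(String id, String state) {
                throw new IllegalStateException("region of " + id + " is not in CREATED state");
            }
        };
        SchedulingStrategy safe =
            new FailGloballyOnException(buggy, t -> log.add("FAIL GLOBAL: " + t.getMessage()));
        safe.onExecutionStateChange("v1", "FINISHED");
        return log.get(0);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The wrapper pattern keeps the individual strategy implementations free of error-handling boilerplate while guaranteeing that no exception escapes an invocation unnoticed.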
> Job is possible to hang when restarting a FINISHED task with POINTWISE
> BLOCKING consumers
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21707
>                 URL: https://issues.apache.org/jira/browse/FLINK-21707
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.3, 1.12.2, 1.13.0
>            Reporter: Zhu Zhu
>            Priority: Blocker
>
> Job is possible to hang when restarting a FINISHED task with POINTWISE
> BLOCKING consumers.
> This is because
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} will try to
> schedule all the consumer tasks/regions of the finished *ExecutionJobVertex*,
> even though those regions are not the exact consumers of the finished
> *ExecutionVertex*. In this case, some of the regions can be in a non-CREATED
> state because they are not connected to nor affected by the restarted tasks.
> However, {{PipelinedRegionSchedulingStrategy#maybeScheduleRegion()}} does not
> allow scheduling a non-CREATED region; it will throw an exception and break
> the scheduling of all the other regions. One example showing this problem
> can be found at
> [PipelinedRegionSchedulingITCase#testRecoverFromPartitionException|https://github.com/zhuzhurk/flink/commit/1eb036b6566c5cb4958d9957ba84dc78ce62a08c].
> To fix the problem, we can add a filter in
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} to only
> trigger the scheduling of regions in CREATED state.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
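The proposed filter can be illustrated with a minimal sketch. This is not Flink's actual implementation: the {{Region}} class, its states, and the method bodies below are simplified stand-ins, used only to show why filtering to CREATED regions before calling {{maybeScheduleRegion()}} avoids the exception.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CreatedRegionFilterDemo {

    enum RegionState { CREATED, SCHEDULED, RUNNING, FINISHED }

    // Simplified stand-in for a pipelined region.
    static class Region {
        final String name;
        RegionState state;
        Region(String name, RegionState state) { this.name = name; this.state = state; }
    }

    // Mirrors the check described in the issue: scheduling a region that is
    // not in CREATED state throws an IllegalStateException, which would break
    // the scheduling of all remaining regions.
    static void maybeScheduleRegion(Region region) {
        if (region.state != RegionState.CREATED) {
            throw new IllegalStateException(
                "Region " + region.name + " is already in " + region.state + " state");
        }
        region.state = RegionState.SCHEDULED;
    }

    // The proposed fix: only regions still in CREATED state are scheduled;
    // regions untouched by the restart (already RUNNING etc.) are skipped.
    static List<String> onExecutionStateChange(List<Region> consumerRegions) {
        return consumerRegions.stream()
            .filter(r -> r.state == RegionState.CREATED)
            .peek(CreatedRegionFilterDemo::maybeScheduleRegion)
            .map(r -> r.name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Region> regions = Arrays.asList(
            new Region("r1", RegionState.CREATED),
            new Region("r2", RegionState.RUNNING),   // not affected by the restart
            new Region("r3", RegionState.CREATED));
        System.out.println(onExecutionStateChange(regions)); // [r1, r3]
    }
}
```

Without the {{filter}} line, the RUNNING region would reach {{maybeScheduleRegion()}} and the resulting exception would prevent r3 from ever being scheduled, which is the hang described above.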