[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-14163:
----------------------------
    Description: 
Currently {{Execution#producedPartitions}} is assigned only after the 
partitions have completed registration with the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
However, the task deployment process (in {{Execution#deploy()}}) creates 
{{ResultPartitionDeploymentDescriptor}} directly from 
{{Execution#producedPartitions}} without checking whether it has been assigned.
This may lead to a task being deployed without its result partitions, which 
eventually causes the job to hang.

This is not a problem at the moment with Flink's default shuffle master, 
{{NettyShuffleMaster}}, since it returns a completed future on registration. 
However, if that behavior changes or if users provide a customized 
{{ShuffleMaster}}, it may cause problems.

Besides that, {{Execution#producedPartitions}} is also used for 
 * generating downstream task input descriptors
 * retrieving {{ResultPartitionID}} for partition release

To avoid these issues, we may need to change all usages of 
{{Execution#producedPartitions}} to a callback style, e.g. change 
{{Execution#producedPartitions}} from {{Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>}} to 
{{CompletableFuture<Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>>}} and adjust all its usages.

  was:
Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed the registration to shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
But the task deployment process (in {{Execution#deploy())}} will create 
{{ResultPartitionDeploymentDescriptor}} directly from 
{{Execution#producedPartitions}} without checking whether it's assigned.
This may lead to a task deployed without its result partitions. And eventually 
cause the job to hang.

It is not problematic at the moment when using Flink default shuffle master 
{{NettyShuffleMaster}} since it returns a completed future on registration. 
However, if the behavior is changed or if users are using a customized 
{{ShuffleMaster}}, it may cause problems.

Besides that, {{Execution#producedPartitions}} is also used for 
 * generating downstream task input descriptor
 * retrieve {{ResultPartitionID}} for partition releasing

To avoid issues to happen, we may need to change all the usages of 
{{Execution#producedPartitions} to a callback way, e.g. change 
{{Execution#producedPartitions} from {{Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>}} to 
{{CompletableFuture<Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>>}} and adjust all its usages.


> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned only after the 
> partitions have completed registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> However, the task deployment process (in {{Execution#deploy()}}) creates 
> {{ResultPartitionDeploymentDescriptor}} directly from 
> {{Execution#producedPartitions}} without checking whether it has been assigned.
> This may lead to a task being deployed without its result partitions, which 
> eventually causes the job to hang.
> This is not a problem at the moment with Flink's default shuffle master, 
> {{NettyShuffleMaster}}, since it returns a completed future on registration. 
> However, if that behavior changes or if users provide a customized 
> {{ShuffleMaster}}, it may cause problems.
> Besides that, {{Execution#producedPartitions}} is also used for 
>  * generating downstream task input descriptors
>  * retrieving {{ResultPartitionID}} for partition release
> To avoid these issues, we may need to change all usages of 
> {{Execution#producedPartitions}} to a callback style, e.g. change 
> {{Execution#producedPartitions}} from {{Map<IntermediateResultPartitionID, 
> ResultPartitionDeploymentDescriptor>}} to 
> {{CompletableFuture<Map<IntermediateResultPartitionID, 
> ResultPartitionDeploymentDescriptor>>}} and adjust all its usages.
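
The proposed callback style could look roughly like the sketch below. It is only an illustration under stated assumptions, not Flink's actual code: {{ExecutionSketch}} and {{onPartitionsRegistered}} are hypothetical names, and plain strings stand in for {{IntermediateResultPartitionID}} and {{ResultPartitionDeploymentDescriptor}}. The point it shows is that deployment composes on the future, so it cannot observe an unassigned map even if a custom {{ShuffleMaster}} completes registration later.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Execution; not the real Flink class. */
final class ExecutionSketch {

    // Assumption: String keys and values replace IntermediateResultPartitionID
    // and ResultPartitionDeploymentDescriptor to keep the sketch self-contained.
    private final CompletableFuture<Map<String, String>> producedPartitions =
            new CompletableFuture<>();

    /** Hypothetical hook, called once the shuffle master has finished registration. */
    void onPartitionsRegistered(Map<String, String> descriptors) {
        producedPartitions.complete(
                Collections.unmodifiableMap(new HashMap<>(descriptors)));
    }

    /**
     * Deployment chains on the future instead of reading a field that may
     * not be assigned yet; the callback runs only after registration completes.
     */
    CompletableFuture<String> deploy() {
        return producedPartitions.thenApply(
                partitions -> "deployed with " + partitions.size() + " partition(s)");
    }
}
```

With this shape, a call to {{deploy()}} made before registration simply returns a pending future; the other usages (building downstream input descriptors, looking up {{ResultPartitionID}} for release) would need to chain on the same future in a similar way.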



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
