[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011568#comment-17011568
 ] 

Yuan Mei edited comment on FLINK-14163 at 1/9/20 9:06 AM:
----------------------------------------------------------

Thanks for assigning the task to me!

 

I have written a first version based on the discussion. Key changes:
 # Change Execution#producedPartitions to Execution#producedPartitionsFuture, 
and initialize it as an incomplete future.
 # Assign producedPartitionsFuture in Execution#registerProducedPartitions.
 # Wrap every access to producedPartitions in a synchronous function. If async 
registration is needed later, callbacks can replace this function. 
The function is simple: fail if producedPartitionsFuture is not done, 
otherwise return producedPartitionsFuture.get().
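
To make the three changes above concrete, here is a minimal sketch. It is not 
the code in the linked branch: ExecutionSketch, getProducedPartitionsOrThrow, 
and the Map<String, String> payload are hypothetical stand-ins for Execution 
and its real partition descriptor types, and join() is used in place of get() 
only to avoid checked exceptions in the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Execution; names and types are assumptions. */
class ExecutionSketch {

    // Change 1: the field is a future, initialized as incomplete.
    private CompletableFuture<Map<String, String>> producedPartitionsFuture =
            new CompletableFuture<>();

    // Change 2: registration assigns the future returned by the shuffle master.
    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        this.producedPartitionsFuture = registration;
    }

    // Change 3: synchronous accessor that fails if registration has not finished.
    Map<String, String> getProducedPartitionsOrThrow() {
        if (!producedPartitionsFuture.isDone()) {
            throw new IllegalStateException("Produced partitions are not registered yet.");
        }
        // Safe to call without blocking because the future is already done.
        return producedPartitionsFuture.join();
    }
}
```

With this shape, any access before Execution#registerProducedPartitions fails 
fast, which is what makes the tests mentioned below fail.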

Code Link:

[https://github.com/apache/flink/compare/master...curcur:shuffle_master_async_interface?expand=1]

 

This works fine if producedPartitions is never supposed to be accessed before 
registration, which is natural since it has to be assigned before it is used. 
Note that `registration` and `registration finished` are different. The former 
refers to whether registration is always required 
(Execution#registerProducedPartitions is always called before access), and 
the latter refers to whether the partitions have been successfully registered.

However, I found that many unit tests fail because producedPartitions is 
accessed without Execution#registerProducedPartitions having been called, for 
example:

ExecutionVertexDeploymentTest#testDeployCall()

ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFailure()

etc.

In the old version, producedPartitions is initialized as an empty map, which 
works well in cases where the real value of producedPartitions is not needed.

I am wondering whether this is just a shortcut for tests, or whether it is 
also used/allowed somewhere in the production path?

 

If access without registration is possible in production, we can wrap 
producedPartitionsFuture in an Optional to distinguish whether 
Execution#registerProducedPartitions has been called.
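
A rough sketch of this Optional variant, under the assumption that an 
unregistered execution should keep behaving like the old empty map. 
OptionalExecutionSketch and the Map<String, String> payload are hypothetical 
and only illustrate the idea.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Execution; names and types are assumptions. */
class OptionalExecutionSketch {

    // Empty means registerProducedPartitions was never called.
    private Optional<CompletableFuture<Map<String, String>>> producedPartitionsFuture =
            Optional.empty();

    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        this.producedPartitionsFuture = Optional.of(registration);
    }

    Map<String, String> getProducedPartitions() {
        if (!producedPartitionsFuture.isPresent()) {
            // Never registered: fall back to the old behavior (empty map).
            return Collections.emptyMap();
        }
        CompletableFuture<Map<String, String>> future = producedPartitionsFuture.get();
        if (!future.isDone()) {
            // Registered, but the registration has not finished yet.
            throw new IllegalStateException("Partition registration has not finished.");
        }
        return future.join();
    }
}
```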

 

Alternatively, a safer and simpler change is to keep all interfaces and usages 
as they are, and directly check isDone() and call get() after registering 
producedPartitions, like this:

[https://github.com/apache/flink/compare/master...curcur:simpler_way?expand=1] 
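
For readers who do not open the branch, the simpler alternative described 
above could look roughly like the following. This is a sketch of the idea, 
not the branch contents: SimplerExecutionSketch and the Map<String, String> 
payload are hypothetical, and join() replaces get() only to avoid checked 
exceptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Execution; names and types are assumptions. */
class SimplerExecutionSketch {

    // Unchanged from the old version: starts as an empty map.
    private Map<String, String> producedPartitions = Collections.emptyMap();

    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        // Only the registration step knows about the future; fail if it is still pending.
        if (!registration.isDone()) {
            throw new IllegalStateException("Async partition registration is not supported.");
        }
        this.producedPartitions = registration.join();
    }

    // Existing callers keep reading the plain map.
    Map<String, String> getProducedPartitions() {
        return producedPartitions;
    }
}
```

Tests that never call registerProducedPartitions still see the empty map, so 
they keep passing, while a shuffle master that returns a pending future is 
rejected at registration time.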

 

 

 

 


> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Yuan Mei
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment since it returns a completed future on 
> registration, so the process is effectively synchronous. However, if users 
> implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since the customizable shuffle service has 
> been open to users since 1.9 (via config "shuffle-service-factory.class").
> To prevent these issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} with regard to the 
> async assignment, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
