[ https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011568#comment-17011568 ]
Yuan Mei edited comment on FLINK-14163 at 1/9/20 9:06 AM:
----------------------------------------------------------

Thanks for assigning the task to me! I have written a first version based on the discussion. Key changes:
# Change Execution#producedPartitions to Execution#producedPartitionsFuture, and initialize it as an incomplete future.
# Assign the producedPartitionsFuture in Execution#registerProducedPartitions.
# Wrap every access to producedPartitions in a synchronous function. If async registration is needed later, callbacks can replace this method. The function is simple: fail if the producedPartitionsFuture is not done, otherwise return producedPartitionsFuture.get().

Code link: [https://github.com/apache/flink/compare/master...curcur:shuffle_master_async_interface?expand=1]

This works fine as long as producedPartitions is never accessed without registration, which is natural since a value has to be assigned before it is used. Note that `registration` and `registration finished` are different: the former refers to whether registration is always required (i.e. Execution#registerProducedPartitions is always called before producedPartitions is accessed), and the latter refers to whether the partitions have been successfully registered.

However, many unit tests fail because producedPartitions is accessed without Execution#registerProducedPartitions being called, for example ExecutionVertexDeploymentTest#testDeployCall() and ExecutionGraphCheckpointCoordinatorTest#testShutdownCheckpointCoordinatorOnFailure(). In the old version, producedPartitions is initialized as an empty map, which works well in cases where the actual value of producedPartitions is not needed. I am wondering whether this is just a shortcut for tests, or whether it is also used/allowed somewhere in the production path.

If access without registration is possible in production, we can make producedPartitionsFuture Optional to differentiate whether Execution#registerProducedPartitions has been called or not.
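A minimal sketch of the future-based accessor described above, combined with the Optional variant so that "registration was never called" and "registration has not finished" fail with different messages. ExecutionSketch is a hypothetical stand-in for Execution, and partition IDs and descriptors are plain Strings instead of Flink's real types. It uses join() in place of get() only to avoid checked exceptions; both return the same value once the future is done.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for Flink's Execution: partition IDs and
// descriptors are plain Strings rather than the real Flink types.
class ExecutionSketch {

    // Empty until registerProducedPartitions() is called; afterwards it holds the
    // registration future, which may still be incomplete.
    private Optional<CompletableFuture<Map<String, String>>> producedPartitionsFuture =
            Optional.empty();

    // Stores whatever future the (possibly async) shuffle master returned.
    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        producedPartitionsFuture = Optional.of(registration);
    }

    // Synchronous accessor: fails fast instead of handing out a placeholder map.
    Map<String, String> getProducedPartitions() {
        CompletableFuture<Map<String, String>> future = producedPartitionsFuture
                .orElseThrow(() -> new IllegalStateException(
                        "registerProducedPartitions() was never called"));
        if (!future.isDone()) {
            throw new IllegalStateException("partition registration has not finished yet");
        }
        // join() returns the same value as get() but throws no checked exceptions.
        return future.join();
    }
}
```

With this shape, tests such as ExecutionVertexDeploymentTest#testDeployCall() would hit the "never called" branch rather than silently reading an empty map.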
Or a safer and simpler change is to keep all interfaces and usages as they are, and directly check isDone() and call get() right after producedPartitions is registered, like this: [https://github.com/apache/flink/compare/master...curcur:simpler_way?expand=1]
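A minimal sketch of the simpler alternative, assuming the registration step is the only place that changes: the field keeps its old type and empty-map default, and a registration future that is still pending is rejected. SimplerWaySketch and its String-typed map are hypothetical stand-ins, and join() replaces get() only to avoid checked exceptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the field keeps its old type and empty-map default, and
// only the registration step changes. Names and types are simplified stand-ins.
class SimplerWaySketch {

    // Unchanged field: still starts as an empty map, so existing usages keep working.
    Map<String, String> producedPartitions = Collections.emptyMap();

    // Assigns producedPartitions right after registration, rejecting pending futures.
    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        if (!registration.isDone()) {
            // An asynchronous shuffle master is not supported on this path.
            throw new IllegalStateException("partition registration must complete synchronously");
        }
        // join() instead of get() only to avoid checked exceptions; the future is done.
        producedPartitions = registration.join();
    }
}
```

The trade-off is that this keeps existing tests and call sites untouched, but it effectively turns ShuffleMaster#registerPartitionWithProducer into a synchronous contract.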
> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Yuan Mei
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions
> have completed registration with the shuffle master in
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so
> {{Execution#producedPartitions}} may possibly[1] not be set when it is used.
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result
> partitions assigned, and the job would hang. (DefaultScheduler issue only,
> since the legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks:
> 3. retrieving {{ResultPartitionID}} for partition releasing:
>
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it is
> not a problem at the moment, since it returns a completed future on
> registration, which makes the process synchronous. However, if users
> implement their own shuffle service in which
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it
> can be a problem. This is possible because a customizable shuffle service has
> been open to users since 1.9 (via config "shuffle-service-factory.class").
>
> To avoid these issues, we may either
> 1. fix all the usages of {{Execution#producedPartitions}} to account for the async
> assignment, or
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync
> interface.
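A minimal model of the race described in the issue, plus a sketch of option 1 (making usages wait for the assignment). RaceSketch and registerThenUse are hypothetical names, not Flink API, and the map is String-typed for brevity. It relies on how the JDK's CompletableFuture behaves in practice: a non-async callback on an already-completed future (like the one NettyShuffleMaster returns) runs immediately in the calling thread, while a callback on a pending future (a custom shuffle master) runs only when that future is completed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the issue, not Flink code: producedPartitions is assigned
// in a callback on the shuffle master's registration future.
class RaceSketch {

    // Old behaviour: starts as an empty map, so early reads do not fail loudly.
    Map<String, String> producedPartitions = Collections.emptyMap();

    // The field is only assigned when the registration future completes.
    void registerProducedPartitions(CompletableFuture<Map<String, String>> registration) {
        registration.thenAccept(partitions -> producedPartitions = partitions);
    }

    // Option 1 sketch: hand out a future so usages (deploy, input descriptors,
    // partition release) run only after the assignment has happened.
    CompletableFuture<Map<String, String>> registerThenUse(
            CompletableFuture<Map<String, String>> registration) {
        return registration.thenApply(partitions -> {
            producedPartitions = partitions;
            return partitions;
        });
    }
}
```

With a pending future, reading producedPartitions right after registerProducedPartitions() still sees the empty map, which is the "deployed without its result partitions" case. Chaining a deployment step on registerThenUse(...) defers it until the partitions are assigned; option 2 would instead remove the callback entirely by making registration synchronous.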