Hi devs ~
Recently our team designed and started building a Flink remote shuffle
service for batch processing jobs, based on the pluggable shuffle service
framework [1]. We found some potential enhancements that could be made to
the pluggable shuffle service and created an umbrella JIRA [2]. I am raising
this DISCUSSION to hear broader feedback / comments on one ticket [3]:
"The partition tracker should support remote shuffle properly".

In current Flink, a data partition is bound to the ResourceID of its TM in
Execution#startTrackingPartitions, and the JM partition tracker stops
tracking the corresponding partitions when a TM disconnects
(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is
bound to the computing resource (TM). This works fine for the internal
shuffle service, but not for a remote shuffle service. Since the shuffle
data is hosted remotely, the lifecycle of a completed partition can be
decoupled from the TM: a TM can be released as soon as no computing task is
running on it, and subsequent shuffle read requests can be directed to the
remote shuffle cluster. In addition, when a TM is lost, its completed data
partitions on the remote shuffle cluster do not need to be reproduced.

The issue above arises because Flink's JobMasterPartitionTracker mixes up a
partition's locationID (where the partition is located) and its tmID (which
TM produced the partition). With TM internal shuffle, a partition's
locationID is the same as its tmID, but with remote shuffle it is not. As an
independent component, JobMasterPartitionTracker should be able to
differentiate the locationID and tmID of a partition, and thus handle the
partition's lifecycle properly.
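
To make the distinction concrete, here is a minimal sketch of the two IDs a
tracker would have to keep apart. The class name TrackedPartitionIds is
hypothetical, and plain Strings stand in for Flink's ResourceID; this is an
illustration of the idea, not Flink code.

```java
import java.util.Objects;

/**
 * Hypothetical value type pairing the two IDs of a partition.
 * Strings stand in for Flink's ResourceID in this sketch.
 */
final class TrackedPartitionIds {
    final String tmId;       // TM that produced the partition
    final String locationId; // where the partition data actually lives

    TrackedPartitionIds(String tmId, String locationId) {
        this.tmId = Objects.requireNonNull(tmId);
        this.locationId = Objects.requireNonNull(locationId);
    }

    /** TM internal shuffle: the data lives on the producing TM, so both IDs coincide. */
    boolean isHostedByProducerTm() {
        return tmId.equals(locationId);
    }

    /** Losing a TM loses this partition's data only if the data is stored on that TM. */
    boolean isLostWithTm(String lostTmId) {
        return locationId.equals(lostTmId);
    }
}
```

With remote shuffle, a partition produced by "tm-1" but stored on a shuffle
worker reports isLostWithTm("tm-1") == false, which is exactly the case the
current tmID-only tracking cannot express.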

We propose that JobMasterPartitionTracker index partitions by both
locationID and tmID. The registration and unregistration processes would
work as follows:

A. Partition Registration
- Execution#registerProducedPartitions registers the partition with the
ShuffleMaster and gets a ShuffleDescriptor. Currently,
ShuffleDescriptor#storesLocalResourcesOn returns the location of the
producing TM ONLY IF the partition occupies local resources there. We
propose to give this method a proper name and have it always return the
locationID of the partition, for example:
    ResourceID getLocationID();
- Execution#registerProducedPartitions then registers the partition with
JobMasterPartitionTracker, passing the tmID (ResourceID of the TaskManager
from TaskManagerLocation) and the locationID (acquired in the step above).
JobMasterPartitionTracker will index the partition by both tmID and
locationID.
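
The registration step can be sketched as a tracker holding two indexes. This
is a minimal sketch under stated assumptions: LocatedShuffleDescriptor is a
hypothetical stand-in for a ShuffleDescriptor with the proposed
getLocationID() method, DualIndexPartitionTracker and its methods are
hypothetical names, and Strings replace ResourceID and ResultPartitionID.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a ShuffleDescriptor that always reports where its data lives. */
interface LocatedShuffleDescriptor {
    String getLocationID();
}

/** Sketch of a tracker that indexes every partition by both tmID and locationID. */
final class DualIndexPartitionTracker {
    // tmID -> partitions produced by that TM
    private final Map<String, Set<String>> partitionsByTm = new HashMap<>();
    // locationID -> partitions whose data is stored at that location
    private final Map<String, Set<String>> partitionsByLocation = new HashMap<>();

    void startTrackingPartition(String partitionId, String tmId, LocatedShuffleDescriptor descriptor) {
        // locationID comes from the shuffle descriptor; tmID comes from the producer's TaskManagerLocation.
        String locationId = descriptor.getLocationID();
        partitionsByTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        partitionsByLocation.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    Set<String> partitionsProducedBy(String tmId) {
        return Collections.unmodifiableSet(partitionsByTm.getOrDefault(tmId, Collections.emptySet()));
    }

    Set<String> partitionsLocatedAt(String locationId) {
        return Collections.unmodifiableSet(
                partitionsByLocation.getOrDefault(locationId, Collections.emptySet()));
    }
}
```

Keeping two maps rather than scanning a single list lets both lookups (by
producer and by location) stay cheap, at the cost of having to keep both
indexes in sync on unregistration.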

B. Invocations from JM and ShuffleMaster
JobMasterPartitionTracker listens to invocations from both the JM and the
ShuffleMaster.
- When JobMasterPartitionTracker is notified via
JobMaster#disconnectTaskManager that a TM has disconnected, it checks
whether the disconnected tmID equals the locationID of any partition. If
so, tracking of the corresponding partitions is stopped.
- When JobMasterPartitionTracker is notified by the ShuffleMaster that a
data location has been lost, it unregisters the corresponding partitions by
locationID.
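
Both notifications reduce to "stop tracking everything stored at this
location". A minimal sketch, assuming hypothetical names
(LocationAwareTracker, onTaskManagerDisconnected, onLocationLost) and Strings
in place of ResourceID; for brevity it scans all partitions instead of using
a locationID index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker reacting to TM disconnects and lost shuffle locations. */
final class LocationAwareTracker {
    private static final class Ids {
        final String tmId;       // producer; retained for release decisions, not for loss detection
        final String locationId; // where the data lives; decides whether a partition is lost

        Ids(String tmId, String locationId) {
            this.tmId = tmId;
            this.locationId = locationId;
        }
    }

    private final Map<String, Ids> partitions = new HashMap<>();

    void track(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Ids(tmId, locationId));
    }

    boolean isTracked(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    /** JM path: a disconnected TM is a lost location only for partitions stored on it. */
    List<String> onTaskManagerDisconnected(String tmId) {
        return stopTrackingByLocation(tmId);
    }

    /** ShuffleMaster path: a reported lost location drops every partition stored there. */
    List<String> onLocationLost(String locationId) {
        return stopTrackingByLocation(locationId);
    }

    private List<String> stopTrackingByLocation(String locationId) {
        List<String> stopped = new ArrayList<>();
        Iterator<Map.Entry<String, Ids>> it = partitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Ids> entry = it.next();
            if (entry.getValue().locationId.equals(locationId)) {
                stopped.add(entry.getKey());
                it.remove();
            }
        }
        return stopped;
    }
}
```

A partition produced by a TM but stored on the remote shuffle cluster keeps
being tracked after that TM disconnects, which is the behaviour change the
proposal is after.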

C. Partition Unregistration
When unregistering a partition, JobMasterPartitionTracker first removes the
corresponding tmID and locationID index entries, and then releases the
partition according to the shuffle service type:
- If the locationID equals the tmID, the partition is hosted by the TM
internal shuffle service, and JobMasterPartitionTracker invokes
TaskExecutorGateway to release it.
- If the locationID differs from the tmID, the partition is hosted by an
external shuffle service, and JobMasterPartitionTracker invokes
ShuffleMaster to release it.
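
The unregistration order (drop both index entries, then pick the release
path) can be sketched as below. PartitionReleaser and ReleasingTracker are
hypothetical names; the two releasers merely play the roles that
TaskExecutorGateway and ShuffleMaster have in the proposal, and their real
Flink signatures are not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical release callback standing in for TaskExecutorGateway or ShuffleMaster. */
interface PartitionReleaser {
    void release(String partitionId);
}

final class ReleasingTracker {
    private final Map<String, String> tmIdOf = new HashMap<>();
    private final Map<String, String> locationIdOf = new HashMap<>();
    private final Map<String, Set<String>> byTm = new HashMap<>();
    private final Map<String, Set<String>> byLocation = new HashMap<>();
    private final PartitionReleaser viaTaskExecutor;
    private final PartitionReleaser viaShuffleMaster;

    ReleasingTracker(PartitionReleaser viaTaskExecutor, PartitionReleaser viaShuffleMaster) {
        this.viaTaskExecutor = viaTaskExecutor;
        this.viaShuffleMaster = viaShuffleMaster;
    }

    void track(String partitionId, String tmId, String locationId) {
        tmIdOf.put(partitionId, tmId);
        locationIdOf.put(partitionId, locationId);
        byTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocation.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    void stopTrackingAndRelease(String partitionId) {
        String tmId = tmIdOf.remove(partitionId);
        String locationId = locationIdOf.remove(partitionId);
        if (tmId == null) {
            return; // not tracked, nothing to release
        }
        // Step 1: remove the partition from both indexes.
        removeFromIndex(byTm, tmId, partitionId);
        removeFromIndex(byLocation, locationId, partitionId);
        // Step 2: choose the release path by comparing locationID with tmID.
        if (locationId.equals(tmId)) {
            viaTaskExecutor.release(partitionId);  // TM internal shuffle
        } else {
            viaShuffleMaster.release(partitionId); // external / remote shuffle
        }
    }

    boolean hasIndexEntriesFor(String id) {
        return byTm.containsKey(id) || byLocation.containsKey(id);
    }

    private static void removeFromIndex(Map<String, Set<String>> index, String key, String partitionId) {
        Set<String> partitions = index.get(key);
        if (partitions != null) {
            partitions.remove(partitionId);
            if (partitions.isEmpty()) {
                index.remove(key); // avoid leaking empty buckets for released TMs/locations
            }
        }
    }
}
```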

With the above changes, JobMasterPartitionTracker can properly manage the
partition lifecycle for customized shuffle services.

Looking forward to your input on this ~~

Best,
Jin


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service
[2] https://issues.apache.org/jira/browse/FLINK-22672
[3] https://issues.apache.org/jira/browse/FLINK-22676
