Hi,

Thanks for the reply.

@Guowei
I agree that we can move forward step by step and start from the most
important part. Apart from the two points mentioned in your reply,
initializing and shutting down external resources gracefully is also
important, which is the reason for the start/close methods.
About the cluster partitions and the ShuffleMasterContext: I agree that we
can postpone handling the cluster partitions because we need to do more to
support them. For the ShuffleMasterContext, I think we still need it even
if we do not support the cluster partitions in the first step. Currently,
the shuffle master can only access the cluster configuration. Besides
that, I think we also need the ability to handle fatal errors occurring in
the ShuffleMaster gracefully by propagating them to the framework. By
introducing the ShuffleMasterContext, we can give the ShuffleMaster access
to both the cluster configuration and the fatal error handler. Compared
with passing these components directly to the ShuffleMaster, a
ShuffleMasterContext interface makes it easier to keep compatibility in
the future: even if we add new methods later, we can offer default empty
implementations in the interface, which keeps compatibility.
About the JobShuffleContext::getConfiguration/listPartitions methods, I
agree that we can remove them in the first step and add them back later.
As mentioned above, we can easily keep compatibility based on the Context
interface.
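
To illustrate the compatibility argument, here is a minimal sketch of a
first-step ShuffleMasterContext that only exposes the cluster
configuration and the fatal error handler. It is only a sketch under
assumptions: SketchConfiguration stands in for Flink's Configuration,
SketchShuffleMasterContext and SketchShuffleMaster are hypothetical
classes, onLaterAddedHook is a made-up example of a method added later,
and the "shuffle.external.address" key is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Flink's Configuration, used only in this sketch.
final class SketchConfiguration {
    private final Map<String, String> values = new HashMap<>();

    SketchConfiguration set(String key, String value) {
        values.put(key, value);
        return this;
    }

    String get(String key, String defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}

// First-step context: cluster configuration plus fatal error handling only.
interface ShuffleMasterContext {
    SketchConfiguration getConfiguration();

    void onFatalError(Throwable throwable);

    // Hypothetical method added in a later version. The empty default body
    // means implementations written against the first version still compile.
    default void onLaterAddedHook() {}
}

// Hypothetical framework-side implementation that records the first fatal error.
final class SketchShuffleMasterContext implements ShuffleMasterContext {
    private final SketchConfiguration configuration;
    private final AtomicReference<Throwable> fatalError = new AtomicReference<>();

    SketchShuffleMasterContext(SketchConfiguration configuration) {
        this.configuration = configuration;
    }

    @Override
    public SketchConfiguration getConfiguration() {
        return configuration;
    }

    @Override
    public void onFatalError(Throwable throwable) {
        fatalError.compareAndSet(null, throwable);
    }

    Throwable getFatalError() {
        return fatalError.get();
    }
}

// Hypothetical plugin-side shuffle master that reports errors via the context.
final class SketchShuffleMaster {
    private final ShuffleMasterContext context;

    SketchShuffleMaster(ShuffleMasterContext context) {
        this.context = context;
    }

    void start() {
        // The configuration key is invented for this example.
        String address = context.getConfiguration().get("shuffle.external.address", "");
        if (address.isEmpty()) {
            // Propagate the problem to the framework instead of failing silently.
            context.onFatalError(new IllegalStateException("No external shuffle address"));
        }
    }
}
```

The plugin only depends on the context interface, so the framework can
add more methods to it later without changing how the ShuffleMaster is
constructed.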

@Till
I totally agree that we should support different jobs using different
shuffle services, and the proposed solution will support this use case
eventually.
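
As a rough sketch of how this could work with cluster-level instances,
the ShuffleMaster instances could be kept in a map indexed by the factory
class name, so that jobs choosing the same shuffle service share one
instance while jobs choosing a different one get their own. All names
below (ShuffleMasterRegistry, SketchMaster, NamedMaster and the factory
class names in the usage) are hypothetical and not existing Flink API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical minimal view of a shuffle master, used only in this sketch.
interface SketchMaster {
    String name();
}

// Hypothetical trivial implementation.
final class NamedMaster implements SketchMaster {
    private final String name;

    NamedMaster(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

// Cluster-level registry: one shuffle master per factory class name.
final class ShuffleMasterRegistry {
    private final Map<String, SketchMaster> masters = new ConcurrentHashMap<>();

    // Returns the shared instance for the factory, creating it on first use.
    SketchMaster getOrCreate(String factoryClassName, Supplier<SketchMaster> creator) {
        return masters.computeIfAbsent(factoryClassName, ignored -> creator.get());
    }

    int size() {
        return masters.size();
    }
}
```

A job would then look up its ShuffleMaster by the factory class name it
is configured with, for example
registry.getOrCreate("com.example.BatchShuffleServiceFactory", ...).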

Best,
Yingjie

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jul 7, 2021 at 8:15 PM:

> One quick comment: When developing the ShuffleService abstraction we also
> thought that different jobs might want to use different ShuffleServices
> depending on their workload (e.g. batch vs. streaming workload). So
> ideally, the chosen solution here can also support this use case
> eventually.
>
> Cheers,
> Till
>
> On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi,
> > Thanks Yingjie for initiating this discussion. My understanding is that the
> > document[1] mainly discusses two issues:
> > 1. ShuffleMaster should be at the cluster level instead of the job level
> > 2. ShuffleMaster should notify PartitionTracker that some data has been
> > lost
> >
> > Relatively speaking, I think the second problem is more serious. For
> > external or remote batch shuffle services, after the machine storing
> > shuffle data goes offline, the PartitionTracker needs to be notified in
> > time to avoid repeated failures of the job. Therefore, when shuffle data
> > is lost due to a machine error, the ShuffleMaster should be able to
> > notify the PartitionTracker in time. This requires the ShuffleMaster to
> > notify the PartitionTracker through a handle such as JobShuffleContext.
> >
> > So how to pass JobShuffleContext to ShuffleMaster? There are two options:
> > 1. After ShuffleMaster is created, pass JobShuffleContext to ShuffleMaster,
> > such as ShuffleMaster::register(JobShuffleContext)
> > 2. Pass JobShuffleContext through ShuffleServiceFactory when creating
> > ShuffleMaster, such as
> > ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
> >
> > Which one to choose is actually related to issue 1. If ShuffleMaster is
> > at the cluster level, we should choose option 1; otherwise, option 2. I
> > think ShuffleMaster should be at the cluster level, for example, because
> > we don't need to maintain a ShuffleMaster for each job in a
> > SessionCluster; in addition, this ShuffleMaster should also be used by
> > RM's PartitionTracker in the future. Therefore, I think Option 1 is more
> > appropriate.
> >
> > To sum up, we may give priority to solving problem 2, while taking into
> > account that ShuffleMaster should be a cluster-level component. Therefore,
> > I think we could ignore the ShuffleMasterContext at the beginning; at
> > the same time, JobShuffleContext::getConfiguration/listPartitions should
> > not be needed.
> >
> > [1]
> > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> >
> > Best,
> > Guowei
> >
> >
> > On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao <kevin.ying...@gmail.com>
> > wrote:
> >
> > > Hi devs,
> > >
> > > I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
> > > Relationship with JobMaster and PartitionTracker". (These are things we
> > > found when moving our external shuffle to the pluggable shuffle service
> > > framework.)
> > >
> > > The mail client may fail to display the right format. If so, please refer
> > > to this document:
> > > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing
> > >
> > > Lifecycle of ShuffleMaster
> > >
> > > Currently, the lifecycle of ShuffleMaster seems unclear. The
> > > ShuffleServiceFactory is loaded for each JobMaster instance and then
> > > ShuffleServiceFactory#createShuffleMaster is called to create a
> > > ShuffleMaster instance. However, the default NettyShuffleServiceFactory
> > > always returns the same ShuffleMaster singleton instance for all jobs.
> > > Based on the current implementation, the lifecycle of ShuffleMaster seems
> > > open and depends on the shuffle plugins themselves. However, at the TM
> > > side, the ShuffleEnvironment is a part of the TaskManagerServices, whose
> > > lifecycle is decoupled from jobs, which is more like a service. It means
> > > there is also an inconsistency between the TM side and the JM side.
> > >
> > > From my understanding, the reason for this is that the pluggable shuffle
> > > framework is not completely finished yet. For example, there is a
> > > follow-up umbrella ticket FLINK-19551
> > > <https://issues.apache.org/jira/browse/FLINK-19551> for the pluggable
> > > shuffle service framework, and among its subtasks there is one task
> > > (FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) which
> > > aims to load shuffle plugins with the PluginManager. I think this can
> > > solve the issue mentioned above. After the corresponding factory is
> > > loaded by the PluginManager, all ShuffleMaster instances can be stored in
> > > a map indexed by the corresponding factory class name, which can be
> > > shared by all jobs. After that, the ShuffleMaster becomes a cluster level
> > > service which is consistent with the ShuffleEnvironment at the TM side.
> > >
> > > As a summary, we propose to finish FLINK-12731
> > > <https://issues.apache.org/jira/browse/FLINK-12731> and make the shuffle
> > > service a real cluster level service first. Furthermore, we add two
> > > lifecycle methods to the ShuffleMaster interface, start and close,
> > > responsible for initialization (for example, contacting the external
> > > system) and graceful shutdown (for example, releasing the resources)
> > > respectively (these methods already exist in the ShuffleEnvironment
> > > interface at the TM side). What do you think?
> > >
> > > Relationship of ShuffleMaster & JobMaster
> > >
> > > Currently, JobMaster holds the reference to the corresponding
> > > ShuffleMaster and it can register partitions with (allocate
> > > ShuffleDescriptors from) the ShuffleMaster via the
> > > registerPartitionWithProducer method. To support use cases like
> > > allocating external resources when a job starts and releasing all
> > > allocated resources when a job terminates, we may also need some job
> > > level initialization and finalization. This job level initialization and
> > > finalization is also helpful when serving multiple jobs simultaneously.
> > >
> > > As a summary, we propose to add two job level lifecycle methods,
> > > registerJob and unregisterJob, responsible for job level shuffle
> > > initialization and finalization, for example, releasing all external
> > > resources occupied by the corresponding job. What do you think?
> > >
> > > Relationship of ShuffleMaster & PartitionTracker
> > >
> > > Currently, the JobMasterPartitionTracker can release external result
> > > partitions through the releasePartitionExternally method of
> > > ShuffleMaster. However, the shuffle plugin (ShuffleMaster) may also need
> > > the ability to stop tracking some partitions depending on the status of
> > > the external services. For example, if the external storage node which
> > > stores some partitions crashes, we need to stop tracking all partitions
> > > on it to avoid reproducing the lost partitions one by one. By introducing
> > > something like ShuffleContext which delegates to the partition tracker,
> > > this requirement can be easily satisfied. Besides, for cluster
> > > partitions, we also need to have the ability to release them.
> > >
> > > As a summary, we propose to add a releaseDataSetExternally method to the
> > > ShuffleMaster interface which is responsible for releasing cluster
> > > partitions. Besides, we propose to add a ShuffleContext which can
> > > delegate to the PartitionTracker and stop tracking partitions. For the
> > > cluster partitions and job partitions, two separate ShuffleContext
> > > abstractions are needed. What do you think?
> > >
> > > Interface Change Summary
> > >
> > > As discussed in the above sections, we propose to make some interface
> > > changes around the ShuffleMaster interface. The first change is to pass a
> > > ShuffleMasterContext instance to the ShuffleServiceFactory when creating
> > > the ShuffleMaster, just like the ShuffleEnvironment creation at the TM
> > > side. Changes are marked with bold texts (the same below).
> > >
> > > public interface ShuffleServiceFactory<
> > >         SD extends ShuffleDescriptor,
> > >         P extends ResultPartitionWriter,
> > >         G extends IndexedInputGate> {
> > >
> > >     /** Factory method to create a specific {@link ShuffleMaster} implementation. */
> > >     ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);
> > >
> > >     /** Factory method to create a specific local {@link ShuffleEnvironment} implementation. */
> > >     ShuffleEnvironment<P, G> createShuffleEnvironment(
> > >             ShuffleEnvironmentContext shuffleEnvironmentContext);
> > > }
> > >
> > > The following is the ShuffleMasterContext interface. It will be
> > > implemented by the pluggable shuffle framework itself and can be used by
> > > the shuffle plugin. A context interface is easier to extend in the
> > > future.
> > >
> > > public interface ShuffleMasterContext {
> > >
> > >     /** Gets the cluster configuration. */
> > >     Configuration getConfiguration();
> > >
> > >     /** Handles the fatal error if any. */
> > >     void onFatalError(Throwable throwable);
> > >
> > >     /**
> > >      * Stops tracking the target dataset (cluster partitions), which means
> > >      * these data can not be reused anymore.
> > >      */
> > >     CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);
> > >
> > >     /** Returns IDs of all datasets (cluster partitions) currently tracked by this cluster. */
> > >     CompletableFuture<List<IntermediateDataSetID>> listDataSets();
> > > }
> > >
> > > The second part to be enhanced is the ShuffleMaster interface. Methods to
> > > be added include start, close, registerJob, unregisterJob and
> > > releaseDataSetExternally. In addition, because each ShuffleMaster
> > > instance can serve multiple jobs simultaneously, when registering
> > > partitions, one should also provide the corresponding JobID. The
> > > following shows the updated ShuffleMaster interface:
> > >
> > > public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {
> > >
> > >     /**
> > >      * Starts this shuffle master, for example getting the access and
> > >      * connecting to the external system.
> > >      */
> > >     void start() throws Exception;
> > >
> > >     /** Closes this shuffle master which releases all resources. */
> > >     void close() throws Exception;
> > >
> > >     /** Registers the target job to this shuffle master. */
> > >     void registerJob(JobShuffleContext context);
> > >
> > >     /** Unregisters the target job from this shuffle master. */
> > >     void unregisterJob(JobID jobID);
> > >
> > >     /** Asynchronously registers a partition and its producer with the shuffle service. */
> > >     CompletableFuture<T> registerPartitionWithProducer(
> > >             JobID jobID,
> > >             PartitionDescriptor partitionDescriptor,
> > >             ProducerDescriptor producerDescriptor);
> > >
> > >     /** Releases any external resources occupied by the given partition. */
> > >     void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
> > >
> > >     /** Releases the target cluster partitions stored externally if any. */
> > >     void releaseDataSetExternally(IntermediateDataSetID dataSetID);
> > > }
> > >
> > > The following is the JobShuffleContext interface. It will be implemented
> > > by the pluggable shuffle framework itself and can be used by the shuffle
> > > plugin.
> > >
> > > public interface JobShuffleContext {
> > >
> > >     /** Gets the corresponding job configuration. */
> > >     Configuration getConfiguration();
> > >
> > >     /** Gets the corresponding {@link JobID}. */
> > >     JobID getJobID();
> > >
> > >     /**
> > >      * Stops tracking the target result partitions, which means these
> > >      * partitions will be reproduced if used afterwards.
> > >      */
> > >     CompletableFuture<Void> stopTrackingPartitions(
> > >             Collection<ResultPartitionID> partitionIDS);
> > >
> > >     /** Returns information of all partitions being tracked for the current job. */
> > >     CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
> > > }
> > >
> > > What do you think of these changes? Any feedback is highly appreciated.
> > >
> >
>
