I would like to start a conversation about extending the Spark shuffle
manager surface to support fully disaggregated shuffle implementations.
This is closely related to the work in SPARK-25299
<https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
refactoring the shuffle manager API (and in particular, SortShuffleManager)
to use a pluggable storage backend. The motivation for that SPIP is further
enabling Spark on Kubernetes.


The motivation for this proposal is enabling full externalized
(disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
<https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
is one example of such a disaggregated shuffle service.) These changes
allow the bulk of the shuffle to run in a remote service such that minimal
state resides in executors and local disk spill is minimized. The net
effect is increased job stability and performance improvements in certain
scenarios. These changes are complementary to SPARK-25299 and should work
well alongside it; some or all of the points below may be merged into that
issue as appropriate.


Below is a description of each component of this proposal. These changes
can ideally be introduced incrementally. I would like to gather feedback
and gauge interest from others in the community to collaborate on this.
There are likely more points that would be useful to disaggregated shuffle
services. We can outline a more concrete plan after gathering enough input.
A working session could help us kick off this joint effort; maybe something
in the mid-January to mid-February timeframe (depending on interest and
availability). I'm happy to host at our Sunnyvale, CA offices.


Proposal

Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to
whether a given block/partition needs to be recomputed when a task fails or
when shuffle block data cannot be read. Having such coordination is
important, e.g., for suppressing recomputation when executors are lost or
aborted, or for forcing late recomputation if the service internally acts
as a cache.
One catchall solution is to have the shuffle manager provide an indication
of whether shuffle data is external to executors (or nodes). Another
option: allow the shuffle manager (likely on the driver) to be queried for
the existence of shuffle data for a given executor ID (or perhaps map task,
reduce task, etc). Note that this is at the level of data the scheduler is
aware of (i.e., map/reduce partitions) rather than block IDs, which are
internal details for some shuffle managers.
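
To make the second option concrete, here is a rough sketch of what a
driver-side query hook could look like. The trait and method names are
hypothetical and purely illustrative, not an existing or final API:

    // Hypothetical driver-side hook; names are illustrative only.
    trait ShuffleDataTracker {
      // True if shuffle data lives outside executors/nodes, so the DAG
      // scheduler can skip recomputation when an executor is lost.
      def shuffleDataIsExternal: Boolean

      // Ask whether map output for a shuffle is still available, expressed
      // in scheduler-level terms (map partitions) rather than block IDs.
      def hasMapOutput(shuffleId: Int, mapPartitionId: Int): Boolean
    }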
ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
service knows that data is still active. This is one way to enable
time-/job-scoped data because a disaggregated shuffle service cannot rely
on robust communication with Spark and in general has a distinct lifecycle
from the Spark deployment(s) it talks to. This would likely take the form
of a callback on ShuffleManager itself, but there are other approaches.
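
As an illustration only (names hypothetical), the callback could be as
simple as:

    // Hypothetical keep-alive callback on the ShuffleManager (or a companion
    // driver component): invoked periodically for shuffles that are still
    // registered so the external service does not reclaim their data.
    trait ShuffleHeartbeat {
      def reportActiveShuffles(activeShuffleIds: Seq[Int]): Unit
    }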


Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
connections/streams/file handles as well as provide commit semantics).
SPARK-25299 adds commit semantics to the internal data storage layer, but
this is applicable to all shuffle managers at a higher level and should
apply equally to the ShuffleWriter.
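
A minimal sketch of the kind of hooks meant here, using hypothetical names
layered on top of the existing ShuffleWriter/ShuffleReader:

    // Hypothetical lifecycle/commit hooks; illustrative only.
    trait ShuffleOutputLifecycle {
      // Called after all records for a map task are written; returns true
      // only once the output is durably committed in the remote service.
      def commit(): Boolean

      // Always called, even on failure, so connections, streams, and file
      // handles can be closed or recycled.
      def close(): Unit
    }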


Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
are not needed. Ideally, this would be an implementation detail of the
shuffle manager itself. If there is substantial overlap between the
SortShuffleManager and other implementations, then the storage details can
be abstracted at the appropriate level. (SPARK-25299 does not currently
change this.)
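
As a rough illustration only (assuming this lived in the
org.apache.spark.shuffle package, where ShuffleBlockResolver is defined),
the existing accessor could simply become optional:

    package org.apache.spark.shuffle

    // Illustrative only: today ShuffleManager requires
    //   def shuffleBlockResolver: ShuffleBlockResolver
    // A manager that does not serve blocks from local files would return None.
    trait ShuffleManagerStorageAccess {
      def shuffleBlockResolver: Option[ShuffleBlockResolver]
    }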


Do not require MapStatus to include BlockManager IDs where they are not
relevant. This is captured by ShuffleBlockInfo
<https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
including an optional BlockManagerId in SPARK-25299. However, this change
should be lifted to the MapStatus level so that it applies to all
ShuffleManagers. Alternatively, use a more general data-location
abstraction than BlockManagerId. This gives the shuffle manager more
flexibility and the scheduler more information with respect to data
residence.
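
For illustration, one hypothetical shape for a more general data-location
abstraction (none of these names are concrete proposals):

    import org.apache.spark.storage.BlockManagerId

    // Hypothetical generalization of the location carried by MapStatus;
    // BlockManagerId becomes just one concrete kind of location.
    sealed trait MapOutputLocation
    case class ExecutorLocation(blockManagerId: BlockManagerId) extends MapOutputLocation
    case class ExternalServiceLocation(serviceUri: String) extends MapOutputLocation

    trait GeneralizedMapStatus {
      // None when the scheduler does not need to track data residence.
      def location: Option[MapOutputLocation]
      def getSizeForBlock(reduceId: Int): Long
    }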
Serialization

Allow serializers to be used more flexibly and efficiently. For example,
have serializers support writing an arbitrary number of objects into an
existing OutputStream or ByteBuffer. This enables objects to be serialized
to direct buffers where doing so makes sense. More importantly, it allows
arbitrary metadata/framing data to be wrapped around individual objects
cheaply. Right now, that’s only possible at the stream level. (There are
hacks around this, but this would enable more idiomatic use in efficient
shuffle implementations.)
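
For illustration only, the kind of serializer addition meant here might
look roughly like the following hypothetical trait, alongside the existing
SerializerInstance API:

    import java.io.OutputStream
    import java.nio.ByteBuffer
    import scala.reflect.ClassTag

    // Hypothetical additions; today a single object can only be serialized
    // to a fresh ByteBuffer or through a whole SerializationStream.
    trait FlexibleSerializerInstance {
      // Append one object to a caller-owned stream, so per-record framing
      // or metadata can be interleaved cheaply.
      def writeObject[T: ClassTag](value: T, out: OutputStream): Unit

      // Serialize directly into a caller-owned (possibly direct) buffer.
      def writeObject[T: ClassTag](value: T, buffer: ByteBuffer): Unit
    }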


Have serializers indicate whether they are deterministic. This provides
much of the value of a shuffle service because it means that reducers do
not need to spill to disk when reading/merging/combining inputs; the data
can be grouped by the service, even without the service understanding data
types or byte representations. Alternative (less preferable since it would
break Java serialization, for example): require all serializers to be
deterministic.
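
As a sketch, this could be as small as a flag on Serializer (hypothetical
name, in the spirit of the existing supportsRelocationOfSerializedObjects
property):

    // Hypothetical flag; illustrative only.
    trait SerializerDeterminism {
      // True if the same object always produces the same bytes, so a
      // service can group/combine records by their serialized form alone.
      def isDeterministic: Boolean = false
    }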



--

- Ben
