Thanks guys for the discussion in the email and also this afternoon!

>From our experience, we do not need to change Spark DAG scheduler to
implement a remote shuffle service. Current Spark shuffle manager
interfaces are pretty good and easy to implement. But we do feel the need
to modify MapStatus to make it more generic.

The current limit with MapStatus is that it assumes* a map output only
exists on a single executor* (see following). One easy update could be
making MapStatus supports the scenario where *a map output could be on
multiple remote servers*.

private[spark] sealed trait MapStatus {
def location: BlockManagerId
}

class BlockManagerId private {
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
}

Also, MapStatus is a sealed trait, thus our ShuffleManager plugin could not
extend it with our own implementation. How about *making MapStatus a public
non-sealed trait*? So different Shuffle Manager plugin could implement
their own MapStatus classes.

Best,
Bo

On Wed, Dec 4, 2019 at 3:27 PM Ben Sidhom <sid...@google.com.invalid> wrote:

> Hey Imran (and everybody who made it to the sync today):
>
> Thanks for the comments. Responses below:
>
> Scheduling and re-executing tasks
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>
>>
>> sounds reasonable, and I think @Matt Cheah  mentioned something like this
>> has come up with their work on SPARK-25299 and was going to be added even
>> for that work.  (of course, need to look at the actual proposal closely and
>> how it impacts the scheduler.)
>
>
> While this is something that was discussed before, it is not something
> that is *currently* in the scope of SPARK-25299. Given the number of
> parties who are doing async data pushes (either as a backup, as in the case
> of the proposal in SPARK-25299, or as the sole mechanism of data
> distribution), I expect this to be an issue at the forefront for many
> people. I have not yet written a specific proposal for how this should be
> done. Rather, I wanted to gauge how many others see this as an important
> issue and figure out the most reasonable solutions for the community as a
> whole. It sounds like people have been getting by this using hacks so far.
> I would be curious to hear what does and does not work well and which
> solutions we would be OK with in Spark upstream.
>
>
> ShuffleManager API
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>
> I believe this can already be done, but maybe its much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>
>
> As far as I'm aware, this would need to be added out-of-band, e.g., by the
> ShuffleManager itself firing off its own heartbeat thread(s) (on the
> driver, executors, or both). While obviously this is possible, it's also
> prone to leaks and puts more burden on shuffle implementations. In fact, I
> don't have a robust understanding of the lifecycle of the ShuffleManager
> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
> is spawned on each executor itself (as opposed to being instantiated once
> on the driver and deserialized onto executors). If executor
> (ShuffleManager) instances do not receive shutdown hooks, shuffle
> implementations may be prone to resource leaks. Worse, if the behavior of
> ShuffleManager instantiation is not stable between Spark releases, there
> may be correctness issues due to intializers/constructors running in
> unexpected ways. Then you have the ShuffleManager instance used for
> registration. As far as I can tell, this runs on the driver, but might this
> be migrated between machines (either now or in future Spark releases),
> e.g., in cluster mode?
>
> If this were taken care of by the Spark scheduler rather than the shuffle
> manager itself, we could avoid an entire class of subtle issues. My
> off-the-cuff suggestion above was to expose a callback on the
> ShuffleManager that allows implementations to define their own heartbeat
> logic. That could then be invoked by the scheduler when and where
> appropriate (along with any other lifecycle callbacks we might add).
>
> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
>>> connections/streams/file handles as well as provide commit semantics).
>>> SPARK-25299 adds commit semantics to the internal data storage layer, but
>>> this is applicable to all shuffle managers at a higher level and should
>>> apply equally to the ShuffleWriter.
>>
>>
>> ShuffleWriter has a
>>
>>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>
>
> That's probably fine for most purposes. However, that stop hook only
> exists on shuffle writers. What about on readers? In any case, each
> instance reader/writer instance appears to only be invoked once for reading
> or writing. If ShuffleManagers can assume that behavior is stable, this
> point is less important. In any case, if we do intend to enable "external"
> shuffle implementations, we should make the APIs as explicit as possible
> and ensure we're enabling cleanup (and commits) wherever possible.
>
> Serialization
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>
>>
>
> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>
>
> I illustrated the use case poorly above. It *can* be worked around as of
> now, but not cleanly-and-efficiently (you *can* get one at a time).
> Consider shuffle implementations that do not dump raw stream data to some
> storage service but need to frame serialized objects in some way. They are
> stuck jumping through hoops with the current SerializationStream structure
> (e.g., instantiating a fake/wrapper OutputStream and serializer instance
> for each frame or doing even worse trickery to avoid that allocation
> penalty). If serializers could write to an *existing* byte array
> or---better yet---a ByteBuffer, then this song and dance could be avoided.
>
> I would advocate for ByteBuffers as a first-class data sink as a
> performance optimization. This confers 2 benefits:
>
>    - Users of asynchronous byte channels don't have to copy data between
>    arrays and buffers or give up asynchronicity.
>    - Direct buffers avoid excess data copies and kernel boundary jumps
>    when writing to certain sink
>
> Now that I think about it, this *could *equally benefit the SPARK-25299
> use case where channels are used.
>
> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>
>>
>
> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>
>
> First off, by deterministic serialization I mean literally that: one
> object (or two objects that are considered equal) will serialize to the
> same byte representation no matter when/how it is serialized. This point is
> about allowing external shuffle/merging services to operate on the
> key/value level without having to actually understand the byte
> representation of objects. Instead of merging *partitions*, shuffle
> managers can merge *data elements*. All of this can be done without
> shipping JVM Comparator functions (i.e., arbitrary code) to shuffle
> services.
>
> There are some dirty hacks/workarounds that can approximate this behavior
> even without strictly deterministic serialization, but we can only
> *guarantee* that shuffle readers (or writers for that matter) do not
> require local disk spill (no more local ExternalSorters) when we're working
> with deterministic serializers and a shuffle service that understands so.
>
> As far as I'm aware, supportsRelocationOfSerializedObjects only means that
> a given object can be moved around within a segment of serialized data.
> (For example, certain object graphs with cycles or other unusual data
> structures can be encoded but impose requirements on data stream ordering.)
> Note that serialized object relocation is a necessary but not sufficient
> condition for deterministic serialization (and spill-free shuffles).
>
>
>
> Anyway, there were a *lot* of people on the call today and we didn't get
> a chance to dig into the nitty-gritty details of these points. I would like
> to know what others think of these (not-fleshed-out) proposals, how they do
> (or do not) work with disaggregated shuffle implementations in the wild,
> and alternative workarounds that people have used so far. I'm particularly
> interested in learning how others have dealt with async writes and data
> reconciliation. Once I have that feedback, I'm happy to put out a more
> focused design doc that we can collect further comments on and iterate.
>
> On Wed, Dec 4, 2019 at 10:58 AM Imran Rashid <iras...@cloudera.com.invalid>
> wrote:
>
>> Hi Ben,
>>
>> in general everything you're proposing sounds reasonable.  For me, at
>> least, I'd need more details on most of the points before I fully
>> understand them, but I'm definitely in favor of the general goal for making
>> spark support fully disaggregated shuffle.  Of course, I also want to make
>> sure it can be done in a way that involves the least risky changes to spark
>> itself and we can continue to support.
>>
>> One very-high level point which I think is worth keeping in mind for the
>> wider community following this -- the key difference between what you are
>> proposing and SPARK-25299, is that SPARK-25299 still uses spark's existing
>> shuffle implementation, which leverages local disk.  Your goal is to better
>> support shuffling all data via some external service, which avoids shuffle
>> data hitting executors local disks entirely.  This was already possible, to
>> some extent, even before SPARK-25299 with the ShuffleManager api; but as
>> you note, there are shortcomings which need to be addressed.  (Historical
>> note: that api wasn't designed with totally distributed shuffle services in
>> mind, it was to support hash- vs. sort-based shuffle, all still on spark's
>> executors.)
>>
>> One thing that I thought you would have needed, but you didn't mention
>> here, is changes to the scheduler to add an extra step between the
>> shuffle-write & shuffle-read stages, if it needs to do any work to
>> reorganize data, I think I have heard this come up in prior discussions.
>>
>> A couple of inline comments below:
>>
>> On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <sid...@google.com.invalid>
>> wrote:
>>
>>> Proposal
>>> Scheduling and re-executing tasks
>>>
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>>
>>
>> sounds reasonable, and I think @Matt Cheah <mch...@palantir.com>
>> mentioned something like this has come up with their work on SPARK-25299
>> and was going to be added even for that work.  (of course, need to look at
>> the actual proposal closely and how it impacts the scheduler.)
>>
>>> ShuffleManager API
>>>
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>
>>
>> I believe this can already be done, but maybe its much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>>
>>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>> close/recycle connections/streams/file handles as well as provide commit
>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>> layer, but this is applicable to all shuffle managers at a higher level and
>>> should apply equally to the ShuffleWriter.
>>>
>>
>> ShuffleWriter has a
>>
>> def stop(success: Boolean): Option[MapStatus]
>>
>>  I would need more info about why that isn't enough.  (But if there is a
>> need for it, yes this makes sense.)
>>
>>> Serialization
>>>
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>>
>>
>> I don't really understand how this is different from the existing
>> SerializationStream -- probably a small example would clarify.
>>
>>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>>
>>
>> I really don't understand this one, sorry, can you elaborate more?  I'm
>> not sure what determinism has to do with spilling to disk.  There is
>> already supportsRelocationOfSerializedObjects , though that is private,
>> which seems related but I think you're talking about something else?
>>
>> thanks,
>> Imran
>>
>>>
>
> --
> -Ben
>

Reply via email to