Enabling fully disaggregated shuffle on Spark

2019-11-15 Thread Ben Sidhom
I would like to start a conversation about extending the Spark shuffle
manager surface to support fully disaggregated shuffle implementations.
This is closely related to the work in SPARK-25299, which is focused on
refactoring the shuffle manager API (and in particular, SortShuffleManager)
to use a pluggable storage backend. The motivation for that SPIP is further
enabling Spark on Kubernetes.


The motivation for this proposal is enabling fully externalized
(disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle is
one example of such a disaggregated shuffle service.) These changes
allow the bulk of the shuffle to run in a remote service such that minimal
state resides in executors and local disk spill is minimized. The net
effect is increased job stability and performance improvements in certain
scenarios. These changes should work well with, or are complementary to,
SPARK-25299. Some or all of these points may be merged into that issue as
appropriate.


Below is a description of each component of this proposal. These changes
can ideally be introduced incrementally. I would like to gather feedback
and gauge interest from others in the community to collaborate on this.
There are likely more points that would be useful to disaggregated shuffle
services. We can outline a more concrete plan after gathering enough input.
A working session could help us kick off this joint effort; maybe something
in the mid-January to mid-February timeframe (depending on interest and
availability). I’m happy to host at our Sunnyvale, CA offices.


Proposal

Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to
whether a given block/partition needs to be recomputed when a task fails or
when shuffle block data cannot be read. Having such coordination is
important, e.g., for suppressing recomputation after aborted executors or
for forcing late recomputation if the service internally acts as a cache.
One catchall solution is to have the shuffle manager provide an indication
of whether shuffle data is external to executors (or nodes). Another
option: allow the shuffle manager (likely on the driver) to be queried for
the existence of shuffle data for a given executor ID (or perhaps map task,
reduce task, etc). Note that this is at the level of data the scheduler is
aware of (i.e., map/reduce partitions) rather than block IDs, which are
internal details for some shuffle managers.
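
As a rough illustration only, such a driver-side query surface might look like
the following Scala sketch (these names are hypothetical and not an existing
Spark API):

  // Hypothetical surface the DAG scheduler could consult before scheduling
  // recomputation; none of these names exist in Spark today.
  trait ShuffleDataIntrospection {
    // True if shuffle data lives outside executors/nodes (e.g., in a remote service).
    def shuffleDataIsExternal: Boolean
    // Whether output for a given map partition of a shuffle is still readable.
    def mapOutputAvailable(shuffleId: Int, mapPartitionId: Int): Boolean
  }
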
ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
service knows that data is still active. This is one way to enable
time-/job-scoped data because a disaggregated shuffle service cannot rely
on robust communication with Spark and in general has a distinct lifecycle
from the Spark deployment(s) it talks to. This would likely take the form
of a callback on ShuffleManager itself, but there are other approaches.
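
A minimal sketch of such a hook, with purely illustrative names:

  // Hypothetical keep-alive callback, invoked periodically from the driver so an
  // external shuffle service can expire data whose owning application is gone.
  trait ShuffleHeartbeat {
    def reportActiveShuffles(appId: String, activeShuffleIds: Set[Int]): Unit
  }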


Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
connections/streams/file handles as well as provide commit semantics).
SPARK-25299 adds commit semantics to the internal data storage layer, but
this is applicable to all shuffle managers at a higher level and should
apply equally to the ShuffleWriter.
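
For example, writer-level hooks could be sketched roughly as below (assumed
names, not the SPARK-25299 API):

  // Illustrative lifecycle/commit hooks at the writer level.
  trait ManagedShuffleWriter[K, V] {
    def write(records: Iterator[Product2[K, V]]): Unit
    // Atomically publish the written output; readers must not see it before this.
    def commit(): Unit
    // Discard partial output and release connections/streams/file handles.
    def abort(cause: Throwable): Unit
  }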


Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
are not needed. Ideally, this would be an implementation detail of the
shuffle manager itself. If there is substantial overlap between the
SortShuffleManager and other implementations, then the storage details can
be abstracted at the appropriate level. (SPARK-25299 does not currently
change this.)
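
One hypothetical way to express this (a sketch only, not a proposed final API)
is to keep any resolver private to the implementation:

  // Illustrative shape: the resolver, if any, is an implementation detail and
  // is not part of the public shuffle manager surface.
  trait ShuffleManagerSketch {
    type BlockResolver // whatever the implementation uses internally, if anything
    protected def blockResolver: Option[BlockResolver]
  }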


Do not require MapStatus to include BlockManager IDs where they are not
relevant. This is captured by ShuffleBlockInfo including an optional
BlockManagerId in SPARK-25299. However, this change
should be lifted to the MapStatus level so that it applies to all
ShuffleManagers. Alternatively, use a more general data-location
abstraction than BlockManagerId. This gives the shuffle manager more
flexibility and the scheduler more information with respect to data
residence.
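
One possible shape for such an abstraction, purely for illustration (these
types are not part of SPARK-25299 or of Spark today):

  import org.apache.spark.storage.BlockManagerId

  // Illustrative data-location abstraction, more general than BlockManagerId.
  sealed trait ShuffleDataLocation
  case class OnExecutor(blockManagerId: BlockManagerId) extends ShuffleDataLocation
  case class InRemoteService(host: String, port: Int) extends ShuffleDataLocation
  case object LocationUnknown extends ShuffleDataLocation
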
Serialization

Allow serializers to be used more flexibly and efficiently. For example,
have serializers support writing an arbitrary number of objects into an
existing OutputStream or ByteBuffer. This enables objects to be serialized
to direct buffers where doing so makes sense. More importantly, it allows
arbitrary metadata/framing data to be wrapped around individual objects
cheaply. Right now, that’s only possible at the stream level. (There are
hacks around this, but this would enable more idiomatic use in efficient
shuffle implementations.)
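
For instance, a serializer extension along these lines (hypothetical method
names, not the current SerializerInstance API):

  import java.io.OutputStream
  import java.nio.ByteBuffer

  // Hypothetical: write individual objects into caller-supplied sinks so that
  // per-object framing/metadata can be added cheaply.
  trait PerObjectSerializer {
    def writeObject[T](obj: T, out: OutputStream): Unit
    def writeObject[T](obj: T, buffer: ByteBuffer): Unit
  }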


Have serializers indicate whether they are deterministic. This provides
much of the value of a shuffle 

Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2019-11-15 Thread Cheng Lian
Cc Yuming, Steve, and Dongjoon

On Fri, Nov 15, 2019 at 10:37 AM Cheng Lian  wrote:

> Similar to Xiao, my major concern about making Hadoop 3.2 the default
> Hadoop version is quality control. The current hadoop-3.2 profile covers
> too many major component upgrades, i.e.:
>
>- Hadoop 3.2
>- Hive 2.3
>- JDK 11
>
> We have already found and fixed some feature and performance regressions
> related to these upgrades. Empirically, I wouldn’t be surprised at all if more
> regressions are lurking somewhere. On the other hand, we do want the
> community’s help to evaluate and stabilize these new changes.
> Following that, I’d like to propose:
>
>1.
>
>Introduce a new profile hive-2.3 to enable (hopefully) less risky
>Hadoop/Hive/JDK version combinations.
>
>This new profile allows us to decouple Hive 2.3 from the hadoop-3.2
>profile, so that users may try out some less risky Hadoop/Hive/JDK
>combinations: if you only want Hive 2.3 and/or JDK 11, you don’t need to
>face potential regressions introduced by the Hadoop 3.2 upgrade.
>
>    Yuming Wang has already sent out PR #26533 to exercise the Hadoop
>2.7 + Hive 2.3 + JDK 11 combination (this PR does not have the hive-2.3
>profile yet), and the result looks promising: the Kafka streaming and Arrow
>related test failures should be irrelevant to the topic discussed here.
>
>    After decoupling Hive 2.3 and Hadoop 3.2, I don’t think it makes much
>    difference whether Hadoop 2.7 or Hadoop 3.2 is the default Hadoop
>    version. Users who are still using Hadoop 2.x in production will
>have to use a hadoop-provided prebuilt package or build Spark 3.0
>against their own 2.x version anyway. It does make a difference for cloud
>users who don’t use Hadoop at all, though. And this probably also helps to
>stabilize the Hadoop 3.2 code path faster since our PR builder will
>exercise it regularly.
>2.
>
>Defer Hadoop 2.x upgrade to Spark 3.1+
>
>I personally do want to bump our Hadoop 2.x version to 2.9 or even
>2.10. Steve has already stated the benefits very well. My worry here is
>still quality control: Spark 3.0 has already had tons of changes and major
>component version upgrades that are subject to all kinds of known and
>hidden regressions. Having Hadoop 2.7 there provides us a safety net, since
>it’s proven to be stable. To me, it’s much less risky to upgrade Hadoop 2.7
>to 2.9/2.10 after we stabilize the Hadoop 3.2/Hive 2.3 combinations in the
>next 1 or 2 Spark 3.x releases.
>
> Cheng
>
> On Mon, Nov 4, 2019 at 11:24 AM Koert Kuipers  wrote:
>
>> I get that CDH and HDP backport a lot and in that way left 2.7 behind,
>> but they kept the public APIs stable at the 2.7 level, because that's kind
>> of the point. Aren't those the Hadoop APIs Spark uses?
>>
>> On Mon, Nov 4, 2019 at 10:07 AM Steve Loughran
>>  wrote:
>>
>>>
>>>
>>> On Mon, Nov 4, 2019 at 12:39 AM Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 On Fri, Nov 1, 2019 at 8:41 AM Steve Loughran
  wrote:

> It would be really good if the spark distributions shipped with later
> versions of the hadoop artifacts.
>

 I second this. If we need to keep a Hadoop 2.x profile around, why not
 make it Hadoop 2.8 or something newer?

>>>
>>> go for 2.9
>>>

 Koert Kuipers  wrote:

> given that the latest HDP 2.x is still Hadoop 2.7, bumping the Hadoop 2 profile
> to the latest would probably be an issue for us.


 When was the last time HDP 2.x bumped their minor version of Hadoop? Do
 we want to wait for them to bump to Hadoop 2.8 before we do the same?

>>>
>>> The internal builds of CDH and HDP are not those of ASF 2.7.x. A really
>>> large proportion of the later branch-2 patches are backported. 2.7 was left
>>> behind a long time ago.
>>>
>>>
>>>
>>>
>>


Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2019-11-15 Thread Cheng Lian
Similar to Xiao, my major concern about making Hadoop 3.2 the default
Hadoop version is quality control. The current hadoop-3.2 profile covers
too many major component upgrades, i.e.:

   - Hadoop 3.2
   - Hive 2.3
   - JDK 11

We have already found and fixed some feature and performance regressions
related to these upgrades. Empirically, I wouldn’t be surprised at all if more
regressions are lurking somewhere. On the other hand, we do want the
community’s help to evaluate and stabilize these new changes.
Following that, I’d like to propose:

   1.

   Introduce a new profile hive-2.3 to enable (hopefully) less risky
   Hadoop/Hive/JDK version combinations.

   This new profile allows us to decouple Hive 2.3 from the hadoop-3.2
   profile, so that users may try out some less risky Hadoop/Hive/JDK
   combinations: if you only want Hive 2.3 and/or JDK 11, you don’t need to
   face potential regressions introduced by the Hadoop 3.2 upgrade.

   Yuming Wang has already sent out PR #26533 to exercise the Hadoop 2.7
   + Hive 2.3 + JDK 11 combination (this PR does not have the hive-2.3
   profile yet), and the result looks promising: the Kafka streaming and Arrow
   related test failures should be irrelevant to the topic discussed here.

   After decoupling Hive 2.3 and Hadoop 3.2, I don’t think it makes much
   difference whether Hadoop 2.7 or Hadoop 3.2 is the default Hadoop
   version. Users who are still using Hadoop 2.x in production will
   have to use a hadoop-provided prebuilt package or build Spark 3.0
   against their own 2.x version anyway. It does make a difference for cloud
   users who don’t use Hadoop at all, though. And this probably also helps to
   stabilize the Hadoop 3.2 code path faster since our PR builder will
   exercise it regularly.
   2.

   Defer Hadoop 2.x upgrade to Spark 3.1+

   I personally do want to bump our Hadoop 2.x version to 2.9 or even 2.10.
   Steve has already stated the benefits very well. My worry here is still
   quality control: Spark 3.0 has already had tons of changes and major
   component version upgrades that are subject to all kinds of known and
   hidden regressions. Having Hadoop 2.7 there provides us a safety net, since
   it’s proven to be stable. To me, it’s much less risky to upgrade Hadoop 2.7
   to 2.9/2.10 after we stabilize the Hadoop 3.2/Hive 2.3 combinations in the
   next 1 or 2 Spark 3.x releases.

Cheng

On Mon, Nov 4, 2019 at 11:24 AM Koert Kuipers  wrote:

> I get that CDH and HDP backport a lot and in that way left 2.7 behind, but
> they kept the public APIs stable at the 2.7 level, because that's kind of
> the point. Aren't those the Hadoop APIs Spark uses?
>
> On Mon, Nov 4, 2019 at 10:07 AM Steve Loughran 
> wrote:
>
>>
>>
>> On Mon, Nov 4, 2019 at 12:39 AM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> On Fri, Nov 1, 2019 at 8:41 AM Steve Loughran
>>>  wrote:
>>>
 It would be really good if the spark distributions shipped with later
 versions of the hadoop artifacts.

>>>
>>> I second this. If we need to keep a Hadoop 2.x profile around, why not
>>> make it Hadoop 2.8 or something newer?
>>>
>>
>> go for 2.9
>>
>>>
>>> Koert Kuipers  wrote:
>>>
 given that the latest HDP 2.x is still Hadoop 2.7, bumping the Hadoop 2 profile
 to the latest would probably be an issue for us.
>>>
>>>
>>> When was the last time HDP 2.x bumped their minor version of Hadoop? Do
>>> we want to wait for them to bump to Hadoop 2.8 before we do the same?
>>>
>>
>> The internal builds of CDH and HDP are not those of ASF 2.7.x. A really
>> large proportion of the later branch-2 patches are backported. 2.7 was left
>> behind a long time ago.
>>
>>
>>
>>
>


Re: Adding JIRA ID as the prefix for the test case name

2019-11-15 Thread Steve Loughran
JUnit 5: Display names.

Goes all the way to the XML.

https://junit.org/junit5/docs/current/user-guide/#writing-tests-display-names

On Thu, Nov 14, 2019 at 6:13 PM Shixiong(Ryan) Zhu 
wrote:

> Should we also add a guideline for non-Scala tests? Other languages (Java,
> Python, R) don't support using a string as a test name.
>
> Best Regards,
> Ryan
>
>
> On Thu, Nov 14, 2019 at 4:04 AM Hyukjin Kwon  wrote:
>
>> I opened a PR - https://github.com/apache/spark-website/pull/231
>>
>> On Wed, Nov 13, 2019 at 10:43 AM, Hyukjin Kwon wrote:
>>
>>> > In general a test should be self-descriptive and I don't think we
>>> should be adding JIRA ticket references wholesale. Any action that the
>>> reader has to take to understand why a test was introduced is one too many.
>>> However, in some cases the thing we are trying to test is very subtle, and in
>>> that case a reference to a JIRA ticket might be useful. I do still feel
>>> that this should be a backstop and that properly documenting your tests is
>>> a much better way of dealing with this.
>>>
>>> Yeah, the test should be self-descriptive. I don't think adding a JIRA
>>> prefix harms this point. I should probably add this point to the
>>> guidelines as well.
>>> Adding a JIRA prefix just adds one extra hint to track down details. I
>>> think it's fine to stick to this practice and make it simpler and clearer to
>>> follow.
>>>
>>> > 1. What if multiple JIRA IDs relate to the same test? Do we just take
>>> the very first JIRA ID?
>>> Ideally one JIRA should describe one issue and one PR should fix one
>>> JIRA with a dedicated test.
>>> Yeah, I think I would take the very first JIRA ID.
>>>
>>> > 2. Are we going to do a full scan of all existing tests and attach a
>>> JIRA ID to each?
>>> Yeah, let's not do this.
>>>
>>> > It's a nice-to-have, not super essential, just because ...
>>> It's been asked multiple times, and each committer seems to have a
>>> different understanding of this.
>>> It's not a biggie, but I wanted to make it clear and settle this.
>>>
>>> > I'd add this only when a test specifically targets a certain issue.
>>> Yes, this is the one I am not sure about. From what I've heard, people add
>>> the JIRA ID in the cases below:
>>>
>>> - Whenever the JIRA type is a bug
>>> - When a PR adds a couple of tests
>>> - Only when a test specifically targets a certain issue.
>>> - ...
>>>
>>> Which one do we prefer, and which is simpler to follow?
>>>
>>> Or I can combine them as below (I'm going to reword this when I actually
>>> document it):
>>> 1. In general, we should add a JIRA ID as a prefix of a test name when a PR
>>> targets a specific issue.
>>> In practice, this usually happens when the JIRA type is a bug or a PR
>>> adds a couple of tests.
>>> 2. Use the "SPARK-: test name" format
>>>
>>> If there is no objection to the above, let me go with this.
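
For illustration, a test following the proposed convention might look like this
hypothetical ScalaTest example (the JIRA number and assertion are made up):

  import org.scalatest.FunSuite

  class ExampleSuite extends FunSuite {
    // Hypothetical example of the "SPARK-12345: test name" style prefix.
    test("SPARK-12345: empty input produces an empty result") {
      assert(Seq.empty[Int].sum == 0)
    }
  }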
>>>
>>> On Wed, Nov 13, 2019 at 8:14 AM, Sean Owen wrote:
>>>
 Let's suggest "SPARK-12345:" but not go back and change a bunch of test
 cases.
 I'd add this only when a test specifically targets a certain issue.
 It's a nice-to-have, not super essential, just because in the rare
 case you need to understand why a test asserts something, you can go
 back and find what added it in the git history without much trouble.

 On Mon, Nov 11, 2019 at 10:46 AM Hyukjin Kwon 
 wrote:
 >
 > Hi all,
 >
 > Maybe it's not a big deal, but it has brought some confusion from time to
 time into the Spark dev community. I think it's time to discuss when and in
 which format to add a JIRA ID as a prefix for test case names in Scala test
 cases.
 >
 > Currently we have many test case names with prefixes as below:
 >
 > test("SPARK-X blah blah")
 > test("SPARK-X: blah blah")
 > test("SPARK-X - blah blah")
 > test("[SPARK-X] blah blah")
 > …
 >
 > It is a good practice to have the JIRA ID in general because, for instance,
 > it makes it easier to track commit histories (even when files are moved
 > around) and to track information related to test failures. Considering that
 > Spark is getting big, I think it's good to document this.
 >
 > I would like to suggest this and document it in our guideline:
 >
 > 1. Add a prefix to a test name when a PR adds a couple of tests.
 > 2. Use the "SPARK-: test name" format, which is used most often in our
 >    code base [1].
 >
 > We should make it simple and clear, but close to the actual practice. So, I
 would like to hear what other people think. I would appreciate it if you could
 give some feedback about when to add the JIRA prefix. One alternative is that
 we only add the prefix when the JIRA's type is a bug.
 >
 > [1]
 > git grep -E 'test\("\SPARK-([0-9]+):' | wc -l
 >  923
 > git grep -E 'test\("\SPARK-([0-9]+) ' | wc -l
 >  477
 > git grep -E 'test\("\[SPARK-([0-9]+)\]' | wc -l
 >   16
 > git grep 

Re: Ask for ARM CI for spark

2019-11-15 Thread bo zhaobo
Hi @Sean Owen  ,

Thanks for your idea.

We may have used the wrong words to describe our request. It's true that we
cannot just say "Spark supports ARM from release 3.0.0", and we also cannot
say that past releases cannot run on ARM. But the reality is that past
releases did not get fully tested on ARM the way the current testing does. And
it's true that the current CI system has no resources that can fit this kind
of request (testing on ARM).

And please consider this: if a user wants to run the latest Spark release on
ARM (or even an old release), but the community does not say that the specific
Spark release has been tested on ARM, I think the user might see a risk in
running it on ARM. If they have no choice and have to run Spark on ARM, they
will build a CI system by themselves, which is very expensive, right? But now
that the community runs the same testing on ARM upstream, this will save
users' resources. That's why an announcement by the community, in some form,
is official and the best, such as "In the XXX release, Spark is fully tested
on ARM" or "In the XXX release, the Spark community integrated an ARM CI
system." Once users see that, they would be very comfortable using Spark on
ARM. ;-)

Thanks for your patience. We are just discussing here; if I say something that
isn't right, please feel free to correct me and discuss. ;-)

Thanks,

BR

ZhaoBo





On Fri, Nov 15, 2019 at 5:04 PM Sean Owen  wrote:

> I don't think that's true either, not yet. Being JVM-based with no
> native code, I just don't even think it would be common to assume it
> doesn't work and it apparently has. If you want to announce it, that's
> up to you.
>
> On Fri, Nov 15, 2019 at 3:01 AM Tianhua huang 
> wrote:
> >
> > @Sean Owen,
> > Thanks for your attention to this.
> > I agree with you, it's probably not very appropriate to say 'supports ARM
> from the 3.0 release'. How about changing the wording to "the Spark community
> fully tests on ARM from the 3.0 release"?
> > Let's try to think about it from the user's point of view rather than the
> developer's: users have to know exactly whether Spark supports ARM well and
> whether Spark is fully tested on ARM. If we state that Spark is fully tested
> on ARM, I believe users will have much more confidence to run Spark on ARM.
> >
>


Re: Ask for ARM CI for spark

2019-11-15 Thread Tianhua huang
@Sean Owen,
Thanks for your attention to this.
I agree with you, it's probably not very appropriate to say 'supports ARM from
the 3.0 release'. How about changing the wording to "the Spark community fully
tests on ARM from the 3.0 release"?
Let's try to think about it from the user's point of view rather than the
developer's: users have to know exactly whether Spark supports ARM well and
whether Spark is fully tested on ARM. If we state that Spark is fully tested
on ARM, I believe users will have much more confidence to run Spark on ARM.

On Fri, Nov 15, 2019 at 4:05 PM Sean Owen  wrote:

> I'm not against it, but the JIRAs will already show that the small
> ARM-related differences like floating-point in log() were resolved.
> Those aren't major enough to highlight as key changes in the 2000+
> resolved. It didn't really not-work before either, as I understand;
> Spark isn't specific to an architecture, so I don't know if that
> situation changed materially in 3.0; it still otherwise ran in 2.x on
> ARM right? It would imply people couldn't use it on ARM previously.
> You can certainly announce you endorse 3.0 as a good release for ARM
> and/or call attention to it on user@.
>
> On Thu, Nov 14, 2019 at 9:01 PM bo zhaobo 
> wrote:
> >
> > Hi @Sean Owen ,
> >
> > Thanks for the reply. We know that the Spark community has its own release
> dates and plans. We are happy to follow the Spark community. But we think it
> would be great if the community could add a sentence to the next release
> notes and state "Spark can support ARM from this release." after we finish
> the test work on ARM. That's all. We just want an official statement from the
> community that Spark supports ARM, to attract more users to use Spark.
> >
>


Re: Ask for ARM CI for spark

2019-11-15 Thread Sean Owen
I'm not against it, but the JIRAs will already show that the small
ARM-related differences like floating-point in log() were resolved.
Those aren't major enough to highlight as key changes in the 2000+
resolved. It didn't really not-work before either, as I understand;
Spark isn't specific to an architecture, so I don't know if that
situation changed materially in 3.0; it still otherwise ran in 2.x on
ARM right? It would imply people couldn't use it on ARM previously.
You can certainly announce you endorse 3.0 as a good release for ARM
and/or call attention to it on user@.

On Thu, Nov 14, 2019 at 9:01 PM bo zhaobo  wrote:
>
> Hi @Sean Owen ,
>
> Thanks for the reply. We know that the Spark community has its own release
> dates and plans. We are happy to follow the Spark community. But we think it
> would be great if the community could add a sentence to the next release notes
> and state "Spark can support ARM from this release." after we finish the test
> work on ARM. That's all. We just want an official statement from the community
> that Spark supports ARM, to attract more users to use Spark.
>
