Hi Binjie, Ethan!

Sorry for the late reply! I've been diving into the current Celeborn
architecture and working out a plausible solution for shuffle spills.

Developing a performant general-purpose remote store for spill data is a
complex task - it's hard to achieve the same latency on DFS/S3 as on a
local SSD. I'm trying to concentrate on shuffle spills only for now, but
overall we want to have small disks on our compute nodes, so we may need a
workaround.

On the sorting approach - as far as I know, PartitionFilesSorter is used to
sort by mapper id for AQE support, effectively laying out each mapper's
data sequentially in the partition location file. It doesn't have to
decompress the data, since the sorting elements are batches and can be
sorted in compressed form.
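To make that concrete, here is a rough sketch of why no decompression is
needed: the sorter only has to reorder batch index entries (map id, file
offset, compressed length) and then copy the compressed byte ranges in the
new order. The class and field names below are hypothetical illustrations,
not Celeborn's actual types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sorting compressed shuffle batches by mapper id
// without decompressing them. Only the batch metadata is reordered; the
// compressed payload bytes would be copied as-is in the new order.
public class BatchSortSketch {
    // Minimal stand-in for a batch index entry (not Celeborn's real type).
    static final class BatchMeta {
        final int mapId;
        final long offset;   // byte offset of the compressed batch in the file
        final int length;    // compressed length in bytes

        BatchMeta(int mapId, long offset, int length) {
            this.mapId = mapId;
            this.offset = offset;
            this.length = length;
        }
    }

    // Sort the index entries by map id; payload bytes are never touched.
    static List<BatchMeta> sortByMapId(List<BatchMeta> batches) {
        List<BatchMeta> sorted = new ArrayList<>(batches);
        sorted.sort(Comparator.comparingInt(b -> b.mapId));
        return sorted;
    }
}
```

The payload-copy step is omitted; the point is only that the sort key
(mapper id) lives in the metadata, so compressed batches are opaque blobs
to the sorter.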

I believe we would need something close to PartitionFilesSorter, but for
our purpose of producing a partition sorted by an ordering key, we would
have to decompress the data, extract (or be provided with) the ordering key
in a binary-sortable format, and rewrite the partition location file. I'm
thinking of doing that after the partition location file splits. In that
case, a K-way merge would still be needed on the reducer side, since the
partition location files are not sorted relative to one another.
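As a rough illustration of the two pieces involved (hypothetical code, not
Celeborn's actual implementation): a signed numeric ordering key can be
made binary-sortable by encoding it big-endian with the sign bit flipped,
so that unsigned lexicographic byte comparison matches numeric order, and
the reducer-side K-way merge over the per-file sorted streams can then be a
simple priority queue keyed by that comparison.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of (1) a binary-sortable key encoding and (2) a
// reducer-side K-way merge over independently sorted partition files.
public class KWayMergeSketch {
    // Encode a signed long so that unsigned lexicographic byte order
    // equals numeric order: big-endian bytes with the sign bit flipped.
    static byte[] encodeKey(long v) {
        long flipped = v ^ Long.MIN_VALUE; // flip the sign bit
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) flipped;
            flipped >>>= 8;
        }
        return out;
    }

    // Unsigned lexicographic comparison of two byte[] keys.
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return a.length - b.length;
    }

    // Merge K sorted runs of keys into one sorted stream. Each run stands
    // in for one sorted partition location file.
    static List<byte[]> merge(List<List<byte[]>> sortedRuns) {
        // Heap entry: { current key, iterator it came from }.
        PriorityQueue<Object[]> heap =
            new PriorityQueue<>((x, y) -> compareKeys((byte[]) x[0], (byte[]) y[0]));
        for (List<byte[]> run : sortedRuns) {
            Iterator<byte[]> it = run.iterator();
            if (it.hasNext()) heap.add(new Object[] { it.next(), it });
        }
        List<byte[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Object[] top = heap.poll();
            out.add((byte[]) top[0]);
            @SuppressWarnings("unchecked")
            Iterator<byte[]> it = (Iterator<byte[]>) top[1];
            if (it.hasNext()) heap.add(new Object[] { it.next(), it });
        }
        return out;
    }
}
```

The byte-level comparison is also what would sit behind an engine-agnostic
`compare(byte[], int, int, byte[], int, int)` style interface, since each
engine can supply its own binary-sortable encoding.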

I'm still shaping the design/proposal at this point and plan to share it
for feedback soon.

On Fri, Aug 30, 2024 at 2:45 AM Ethan Feng <ethanf...@apache.org> wrote:

> Hello Aidar,
>
> Thank you for sharing your insights on minimizing disk spills
> attributed to external sorting. I've grasped the concept and am keen
> to delve deeper into this proposal. Both our MapReduce plugin and TEZ
> plugin stand to benefit significantly from such an enhancement.
>
> I've identified a component called "PartitionFilesSorter" that has the
> potential to facilitate the sorting of shuffle files directly on disk,
> bypassing the conventional K-way merge process. This approach holds
> promise in optimizing our operations.
>
> To ensure compatibility across multiple engines, implementing an
> interface for sorting serialized values becomes imperative. A possible
> signature for this interface might resemble:
>
> int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
>
> This would allow for flexible and engine-agnostic sorting
> capabilities. I'm looking forward to seeing the design.
>
> Thanks,
> Ethan Feng
>
> Binjie Yang <binjiey...@apache.org> wrote on Fri, Aug 30, 2024, at 17:27:
> >
> > Hi Aidar,
> > This feature sounds great and can solve disk problems in many scenarios.
> >
> > I have a small question about the details: if a spill happens in a
> > non-shuffle scenario, will the Celeborn cluster also be responsible for
> > the corresponding spill data?
> >
> > Overall, sorting the reduce partition files on the Celeborn worker looks
> > good; looking forward to seeing the overall design and the CIP.
> >
> > Thanks,
> > Binjie Yang
> >
> > Aidar Bariev <ai...@stripe.com.invalid> wrote on Thu, Aug 29, 2024, at 05:39:
> > >
> > > Hey Celeborn community!
> > >
> > > We (Stripe) are exploring the possibility of reducing the amount of
> > > disk spills due to external sorting on the reducer side and trying to
> > > find a partial solution to the
> > > https://issues.apache.org/jira/browse/CELEBORN-1435 issue.
> > >
> > > Currently, the idea that we have is to sort the ReducePartition file
> > > on the Celeborn worker at write time and use a K-way merge to further
> > > sort the data on the reducers. It would still require spilling some
> > > data back to Celeborn in case the number of ReducePartition files is
> > > high and we need multiple passes of external merge/sort.
> > >
> > > This description might seem pretty generic, but I'm curious to hear
> > > opinions before starting to draft the CIP! It would be nice to see
> > > whether there have been previous thoughts and interest in reducing
> > > disk usage on executors other than the jira ticket above.
>
