Hi Celeborn team,

We are implementing a Celeborn-MR3 client and have a question about the order of chunks returned by ShuffleClient.readPartition().

--- Setup

With shuffleId, mapId, attemptId, partitionId all fixed, suppose that a mapper with mapIndex M calls ShuffleClient.pushData() several times in the following order:

  ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch1, ...)
  ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch2, ...)
  ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch3, ...)
  ...
  ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batchN, ...)

Then a reducer calls ShuffleClient.readPartition() as follows:

  ShuffleClient.readPartition(shuffleId, partitionId, attemptId, M, M + 1)

--- Expectation and result

ShuffleClient.readPartition() should return batches in the same order in which they were written by the calls to pushData(). That is, we expect ShuffleClient.readPartition() to return:

  batch1, batch2, batch3, ..., batchN

However, this order is not always preserved. For example,
ShuffleClient.readPartition() may return:

  batch3, batch1, batch2, ..., batchN

We find that each batch is written to a Celeborn chunk in its entirety, but ShuffleClient.readPartition() does not necessarily read Celeborn chunks in the order in which they were created.
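To illustrate what we think is happening, here is a toy Java model (not Celeborn code): a reader keeps up to maxReqsInFlight chunk requests outstanding, standing in for celeborn.client.fetch.maxReqsInFlight, and hands each chunk to the caller as soon as its response arrives. The per-chunk latencies are invented. With one request in flight, chunks come back in index order; with three, a fast chunk 2 (batch3) overtakes chunks 0 and 1, reproducing the batch3, batch1, batch2, ... pattern above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Toy model only (not Celeborn code): up to maxReqsInFlight fetches are
 * outstanding, and each chunk is delivered as soon as its response completes.
 */
class CompletionOrderDemo {

    /** Returns chunk indexes in the order their simulated responses complete. */
    static List<Integer> deliveryOrder(long[] latency, int maxReqsInFlight) {
        // Each entry is {completionTime, chunkIndex}; earliest completion first.
        PriorityQueue<long[]> inFlight =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        List<Integer> delivered = new ArrayList<>();
        int next = 0;
        long now = 0;
        // Issue the initial window of requests at time 0.
        while (next < latency.length && inFlight.size() < maxReqsInFlight) {
            inFlight.add(new long[] {now + latency[next], next});
            next++;
        }
        while (!inFlight.isEmpty()) {
            long[] done = inFlight.poll();
            now = done[0];
            delivered.add((int) done[1]); // returned to the caller right away
            if (next < latency.length) {  // refill the window with the next chunk
                inFlight.add(new long[] {now + latency[next], next});
                next++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Invented latencies: chunk 2 (batch3) is served much faster than chunks 0 and 1.
        long[] latency = {30, 40, 5, 50};
        System.out.println(deliveryOrder(latency, 1)); // [0, 1, 2, 3]
        System.out.println(deliveryOrder(latency, 3)); // [2, 0, 1, 3]
    }
}
```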

--- Problem

For unordered data, the order of batches (or chunks) returned by
ShuffleClient.readPartition() does not matter.

For ordered data, however, the order must be preserved because a mapper sorts its output before sending it to Celeborn in several batches. This requirement is specific to the Tez runtime. (I guess Spark does not depend on the order of batches because its reducer sorts all records.)
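A small illustration of why the order matters, assuming (as described above) that the reducer consumes one mapper's batches as a single sorted run: each batch is sorted, and so is their concatenation in push order, but the reordered sequence is not. The class and method names are made up for this example.

```java
import java.util.List;

/** Illustration only: checks whether batches, read in the given order, form one sorted run. */
class BatchOrderCheck {

    /** True if all keys, read batch by batch in the given order, are non-decreasing. */
    static boolean isSortedRun(List<int[]> batches) {
        long prev = Long.MIN_VALUE;
        for (int[] batch : batches) {
            for (int key : batch) {
                if (key < prev) {
                    return false; // a later key is smaller than an earlier one
                }
                prev = key;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] batch1 = {1, 2, 3};
        int[] batch2 = {4, 5, 6};
        int[] batch3 = {7, 8, 9};
        System.out.println(isSortedRun(List.of(batch1, batch2, batch3))); // true
        System.out.println(isSortedRun(List.of(batch3, batch1, batch2))); // false
    }
}
```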

--- Quick fix

We can set celeborn.shuffle.chunk.size to a large value so that a single
chunk can accommodate all batches. However, this is not a general solution
because the max size of mapper output is unknown.

--- Questions

1. Is it normal behavior of Celeborn that ShuffleClient.readPartition()
may not read chunks in the same order in which they were created?

We are confused because TransportClient.java says:

  * <p>Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are
  * guaranteed to be returned in the same order that they were requested, assuming only a single
  * TransportClient is used to fetch the chunks.

However, the implementation in the class WorkerPartitionReader seems to
suggest otherwise because it does not check or sort chunk indexes. (It
seems that up to celeborn.client.fetch.maxReqsInFlight chunks are
requested at once, and whichever chunk arrives first is returned right away.)

2. If this is normal behavior of ShuffleClient.readPartition(), is there
any way to preserve the order of batches when calling
ShuffleClient.readPartition()?
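By "checking or sorting chunk indexes" we mean something like the following hypothetical helper (ChunkReorderBuffer is our own name, not a Celeborn class). It assumes the reader knows the index of each chunk when it arrives; it buffers chunks that arrive early and releases them only after every lower-indexed chunk has been delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of Celeborn): accepts chunks in arrival order
 * and releases them strictly in chunk-index order.
 */
class ChunkReorderBuffer<T> {
    // Chunks that arrived before some lower-indexed chunk, kept sorted by index.
    private final TreeMap<Integer, T> pending = new TreeMap<>();
    private int nextIndex = 0; // index of the next chunk the caller should see

    /** Records an arrived chunk and returns every chunk that is now deliverable in order. */
    List<T> offer(int chunkIndex, T chunk) {
        if (chunkIndex < nextIndex || pending.containsKey(chunkIndex)) {
            throw new IllegalArgumentException("duplicate or already-delivered chunk " + chunkIndex);
        }
        pending.put(chunkIndex, chunk);
        List<T> ready = new ArrayList<>();
        // Drain the contiguous run starting at nextIndex.
        while (pending.containsKey(nextIndex)) {
            ready.add(pending.remove(nextIndex));
            nextIndex++;
        }
        return ready;
    }

    public static void main(String[] args) {
        ChunkReorderBuffer<String> buf = new ChunkReorderBuffer<>();
        System.out.println(buf.offer(2, "batch3")); // []
        System.out.println(buf.offer(0, "batch1")); // [batch1]
        System.out.println(buf.offer(1, "batch2")); // [batch2, batch3]
        System.out.println(buf.offer(3, "batch4")); // [batch4]
    }
}
```

The trade-off is that chunks arriving early stay in memory until the gap before them is filled.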

Thanks,

--- Sungwoo
