Hi Celeborn team,
We are implementing a Celeborn-MR3 client, and have a question on the
order of chunks returned by ShuffleClient.readPartition().
--- Setup
With shuffleId, mapId, attemptId, partitionId all fixed, suppose that a
mapper with mapIndex M calls ShuffleClient.pushData() several times in the
following order:
ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch1, ...)
ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch2, ...)
ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch3, ...)
...
ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batchN, ...)
Then a reducer calls ShuffleClient.readPartition() as follows:
ShuffleClient.readPartition(shuffleId, partitionId, attemptId, M, M + 1)
--- Expectation and result
ShuffleClient.readPartition() should read batches in the same order that
they are written by calls to pushData(). That is, we expect
ShuffleClient.readPartition() to return:
batch1, batch2, batch3, ..., batchN
However, the same order is not always guaranteed. For example,
ShuffleClient.readPartition() may return:
batch3, batch1, batch2, ..., batchN
We find that each batch is written to a Celeborn chunk in its entirety,
but ShuffleClient.readPartition() does not necessarily read Celeborn
chunks in the same order that they are created.
--- Problem
For unordered data, the order of batches (or chunks) returned by
ShuffleClient.readPartition() does not matter.
For ordered data, however, the same order should be enforced because a
mapper sorts output data before sending it to Celeborn in several batches.
This is a requirement specific to the Tez runtime. (I guess Spark does not
depend on the order of batches because a reducer sorts all records.)
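To illustrate the problem with ordered data, here is a minimal sketch. It does not use any Celeborn API. The class BatchOrderDemo and its methods are names made up for this example, and each batch is modeled as a sorted list of integers cut from one sorted mapper output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Celeborn or MR3.
public class BatchOrderDemo {
    // Concatenates batches in the given arrival order (0-based batch indexes).
    static List<Integer> concat(List<List<Integer>> batches, int[] arrivalOrder) {
        List<Integer> out = new ArrayList<>();
        for (int i : arrivalOrder) {
            out.addAll(batches.get(i));
        }
        return out;
    }

    // Returns true if the list is in non-decreasing order.
    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A sorted mapper output split into batch1, batch2, batch3.
        List<List<Integer>> batches =
            List.of(List.of(1, 2), List.of(3, 4), List.of(5, 6));

        // Read back in push order: the concatenation is still sorted.
        System.out.println(isSorted(concat(batches, new int[] {0, 1, 2})));

        // Read back as batch3, batch1, batch2: the concatenation is not sorted.
        System.out.println(isSorted(concat(batches, new int[] {2, 0, 1})));
    }
}
```

With the push order preserved, the reducer can treat the output of mapper M as a single sorted run. With any other order, it cannot.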
--- Quick fix
We can set celeborn.shuffle.chunk.size to a large value so that a single
chunk can accommodate all batches. However, this is not a general solution
because the max size of mapper output is unknown.
--- Questions
1. Is it normal behavior of Celeborn that ShuffleClient.readPartition()
may not read chunks in the same order that they are created?
We are confused because TransportClient.java says:
* <p>Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are
* guaranteed to be returned in the same order that they were requested, assuming only a single
* TransportClient is used to fetch the chunks.
However, the implementation in the class WorkerPartitionReader seems to
suggest otherwise because it does not check or sort chunk indexes. (It
seems like up to celeborn.client.fetch.maxReqsInFlight chunks are
requested at once and whichever chunk arrives first is returned right
away.)
2. If this is normal behavior of ShuffleClient.readPartition(), is there
some way to preserve the order of batches when calling
ShuffleClient.readPartition()?
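To make question 2 concrete, below is a rough sketch of the kind of reordering we could do on our side if Celeborn itself does not preserve the order. It rests on an assumption: our client would attach its own sequence number (0, 1, 2, ...) to each batch before calling pushData() and recover it after readPartition(). The class ReorderBuffer and its methods are hypothetical names and not part of Celeborn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper, not a Celeborn class.
public class ReorderBuffer<T> {
    // Batches that arrived ahead of their turn, keyed by sequence number.
    private final Map<Integer, T> pending = new HashMap<>();
    // Sequence number of the next batch to deliver.
    private int nextSeq = 0;

    // Accepts a batch with its sequence number and returns every batch
    // that can now be delivered in order (possibly none).
    public List<T> offer(int seq, T batch) {
        pending.put(seq, batch);
        List<T> ready = new ArrayList<>();
        while (pending.containsKey(nextSeq)) {
            ready.add(pending.remove(nextSeq));
            nextSeq++;
        }
        return ready;
    }

    // Number of batches still held back.
    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buffer = new ReorderBuffer<>();
        // Arrival order batch3, batch1, batch2, as in the example above.
        System.out.println(buffer.offer(2, "batch3")); // held back
        System.out.println(buffer.offer(0, "batch1")); // batch1 released
        System.out.println(buffer.offer(1, "batch2")); // batch2 and batch3 released
    }
}
```

The drawback is that the reducer may have to hold many batches in memory while waiting for an early one, so we would prefer a way to get ordered reads from Celeborn directly if one exists.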
Thanks,
--- Sungwoo