Hello Celeborn team,

We are quite close to completing our Celeborn-MR3 client, and I have a
question on speculative execution in the context of using Celeborn.

MR3 supports speculative execution, which allows several task attempts of the
same task to run concurrently. When one task attempt succeeds, all other
concurrent task attempts are interrupted and killed, so that only one task
attempt commits its output.
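
To make the discussion concrete, below is a rough sketch of the push path in
our client. The pushData() and mapperEnd() parameter lists shown here are
assumptions from memory and may not match the exact signatures of the Celeborn
version in use; the point is only that concurrent attempts share (shuffleId,
mapId, partitionId) and differ in attemptId.

  import org.apache.celeborn.client.ShuffleClient;

  // Sketch only: each speculative attempt of the same task pushes the same
  // (shuffleId, mapId, partitionId) with its own attemptId (0 or 1).
  final class SpeculativeAttemptSketch {
    static void runAttempt(ShuffleClient client, int shuffleId, int mapId,
        int attemptId, int partitionId, byte[] data,
        int numMappers, int numPartitions) throws java.io.IOException {
      client.pushData(shuffleId, mapId, attemptId, partitionId,
          data, 0, data.length, numMappers, numPartitions);
      // Blocks until all in-flight batches of this attempt are acknowledged;
      // this is where task attempt #1 gets stuck in the scenario below.
      client.mapperEnd(shuffleId, mapId, attemptId, numMappers);
    }
  }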

When using Celeborn-MR3, speculative execution sometimes seems to corrupt the
data pushed to Celeborn. Below I describe a sequence of events that produces
this error. shuffleId, mapId, and partitionId are all fixed, whereas attemptId
can be either 0 or 1.

1. Task attempt #1 (with attemptId 0) starts, and calls
ShuffleClient.pushData().

2. Task attempt #1 gets stuck in the call to mapperEnd() because
ShuffleClient fails to send data to Celeborn for an unknown reason,
repeatedly producing INFO messages like:

2023-08-19 11:52:16,119 [celeborn-retry-sender-21] INFO org.apache.celeborn.client.ShuffleClientImpl [] - Revive for push data success, new location for shuffle 1005007 map 408 attempt 0 partition 0 batch 1 is location PartitionLocation[ id-epoch:0-4 host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.103-39861-45968-46540-44091
  mode:PRIMARY
  peer:(empty)
storage hint:StorageInfo{type=MEMORY, mountPoint='UNKNOWN_DISK', finalResult=false, filePath=}
  mapIdBitMap:null].

3. As task attempt #1 does not return for a long time, the speculative
execution mechanism of MR3 kicks in and launches another task attempt #2 (with attemptId 1).

4. Task attempt #2 calls pushData() and succeeds. That is, task attempt #2
successfully pushes data to Celeborn.

5. MR3 interrupts and kills task attempt #1. When this occurs, mapperEnd()
gets interrupted and prints a message like the following:

2023-08-19 11:52:16,089 [DAG-1-5-1] WARN RuntimeTask [] - LogicalOutput.close() fails on Reducer 12
org.apache.celeborn.common.exception.CelebornIOException: sleep interrupted
  at org.apache.celeborn.common.write.InFlightRequestTracker.limitZeroInFlight(InFlightRequestTracker.java:155)
  at org.apache.celeborn.common.write.PushState.limitZeroInFlight(PushState.java:85)
  at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:611)
  at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1494)
  at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1478)

6. Now a consumer task attempt tries to read the data pushed by task attempt #2, but it fails with the following error:

java.io.IOException: Premature EOF from inputStream
  at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:212)
  at org.apache.tez.runtime.library.common.shuffle.RssShuffleUtils.shuffleToMemory(RssShuffleUtils.java:47)

Our implementation is quite standard:

  inputStream = rssShuffleClient.readPartition(...);
  org.apache.hadoop.io.IOUtils.readFully(inputStream, ..., dataLength);

We double-checked the parameter dataLength and found that it was correctly
set to the size of data pushed by task attempt #2.
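
For reference, here is a slightly fuller sketch of the reader path. The
readPartition() parameter list is an assumption and may not match the exact
signature of the Celeborn version in use; the readFully() call mirrors the
code above.

  import java.io.IOException;
  import java.io.InputStream;
  import org.apache.celeborn.client.ShuffleClient;
  import org.apache.hadoop.io.IOUtils;

  final class PartitionReaderSketch {
    // Sketch of the consumer side; readPartition() parameters are assumptions.
    static byte[] readWholePartition(ShuffleClient rssShuffleClient,
        int shuffleId, int partitionId, int attemptNumber,
        int numMappers, int dataLength) throws IOException {
      InputStream inputStream = rssShuffleClient.readPartition(
          shuffleId, partitionId, attemptNumber, 0, numMappers);
      try {
        byte[] buffer = new byte[dataLength];
        // Throws "Premature EOF from inputStream" if the stream ends before
        // dataLength bytes have been read -- the failure we observe.
        IOUtils.readFully(inputStream, buffer, 0, dataLength);
        return buffer;
      } finally {
        inputStream.close();
      }
    }
  }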

I have two questions:

1) In the context of using Celeborn, does our implementation violate the
intended usage of the Celeborn API? For example, should we prohibit
speculative execution because Celeborn requires that only one task attempt
call pushData() at any point in time?

2) If speculative execution is not allowed, how can we quickly fail a task
attempt stuck at mapperEnd()? By default, it seems that ShuffleClient waits
for 1200 seconds, not the default value of 120 seconds:

2023-08-19 11:51:32,159 [DAG-1-5-1] ERROR org.apache.celeborn.common.write.InFlightRequestTracker [] - After waiting for 1200000 ms, there are still 1 batches in flight for hostAndPushPort [192.168.10.106:38993], which exceeds the current limit 0.
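
For context, the only client-side workaround we have come up with so far is to
bound the wait on mapperEnd() ourselves, roughly as in the sketch below (again,
the mapperEnd() parameter list is an assumption and may differ across Celeborn
versions). We would prefer a proper configuration knob if one exists.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  import org.apache.celeborn.client.ShuffleClient;

  final class BoundedMapperEndSketch {
    // Fail fast instead of waiting for the 1200-second in-flight timeout.
    static void mapperEndWithTimeout(ShuffleClient client, int shuffleId,
        int mapId, int attemptId, int numMappers, long timeoutSeconds)
        throws Exception {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      try {
        Future<?> done = executor.submit(() -> {
          client.mapperEnd(shuffleId, mapId, attemptId, numMappers);
          return null;
        });
        // Throws TimeoutException if mapperEnd() does not return in time.
        done.get(timeoutSeconds, TimeUnit.SECONDS);
      } finally {
        executor.shutdownNow();
      }
    }
  }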

Thanks a lot,

--- Sungwoo
