Hello Keyong,
We have implemented the previous plan to generate a fresh shuffleId when
failing to fetch data from Celeborn workers. Thanks for your comment.
While testing Celeborn-MR3 with task re-execution (or stage resubmission),
I have noticed that a shuffleId's associated set of Celeborn workers never
changes. Specifically, pushData() for a given shuffleId always fails if any
of its associated Celeborn workers is killed. Our experiment goes as
follows:
1. Execute a long-running query.
2. While the query is running (many mappers have succeeded and some
reducers are still running), kill 2 Celeborn workers out of 12.
3. A task fails after three attempts with this stack trace:
2023-10-08 05:23:43,814 [DAG-2-23-2] ERROR org.apache.celeborn.client.ShuffleClientImpl [] - Exception raised while pushing data for shuffle 25 map 113 attempt 3 partition 0 batch 1 location PartitionLocation[
  id-epoch:0-0
  host-rpcPort-pushPort-fetchPort-replicatePort:192.168.10.113-45109-37140-45752-40163
  mode:PRIMARY
  peer:(empty)
  storage hint:StorageInfo{type=MEMORY, mountPoint='/data3', finalResult=false, filePath=}
  mapIdBitMap:null].
org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to /192.168.10.113:37140
  at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:246) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
  at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:190) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
  at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:116) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
  at org.apache.celeborn.client.ShuffleClientImpl.pushOrMergeData(ShuffleClientImpl.java:1079) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
  at org.apache.celeborn.client.ShuffleClientImpl.pushData(ShuffleClientImpl.java:1162) ~[celeborn-client-mr3-1.8-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
  at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.doFinalMerge(PipelinedSorter.java:979) ~[tez-runtime-library-0.9.1.mr3.1.0.jar:0.9.1.mr3.1.0]
So, does this mean that if pushData() fails with this exception (after
some Celeborn workers are lost):

org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to /192.168.10.113:37140

we should stop using the current shuffleId and switch to a fresh shuffleId
instead? (I guess the same phenomenon would occur for stage resubmission in
Spark.)
Or does generating a fresh shuffleId not help either, because the fresh
shuffleId may still be assigned to those lost Celeborn workers?
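To make the question concrete, the push-side fallback we have in mind would look roughly like the sketch below. This is purely illustrative: Pusher, pushWithFallback, and the one-retry policy are hypothetical stand-ins, not Celeborn APIs, and the open question is exactly the last comment in the code, i.e. whether a fresh shuffleId really gets a freshly assigned set of live workers.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class ShuffleIdFallback {
    /** Functional stand-in for ShuffleClient.pushData(shuffleId, ...); hypothetical. */
    public interface Pusher {
        void push(int shuffleId) throws IOException;
    }

    private final AtomicInteger nextShuffleId;
    private int currentShuffleId;

    public ShuffleIdFallback(int initialShuffleId) {
        this.currentShuffleId = initialShuffleId;
        this.nextShuffleId = new AtomicInteger(initialShuffleId + 1);
    }

    /**
     * Push under the current shuffleId; if the push fails (e.g. with
     * CelebornIOException because an associated worker was killed), switch to
     * a fresh shuffleId and retry once. Returns the shuffleId actually used.
     */
    public int pushWithFallback(Pusher pusher) throws IOException {
        try {
            pusher.push(currentShuffleId);
        } catch (IOException workersLost) {
            currentShuffleId = nextShuffleId.getAndIncrement();
            pusher.push(currentShuffleId); // assumption: fresh id -> freshly assigned workers
        }
        return currentShuffleId;
    }
}
```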
Thanks,
--- Sungwoo
On Sat, 7 Oct 2023, Keyong Zhou wrote:
Hi Sungwoo,
Sorry for the late reply. Reusing a committed shuffleId does not work in
the current architecture, even after calling unregisterShuffle in
LifecycleManager, because the cleanup of metadata is delayed and not
guaranteed. It becomes even more complicated when we consider graceful
restart of workers.
If we want to reuse the shuffleId, we need to redesign the whole picture.
Thanks,
Keyong Zhou
Sungwoo Park <o...@pl.postech.ac.kr> wrote on Mon, 2 Oct 2023 at 13:23:
Hi Keyong,
Instead of picking up a new shuffleId, can we reuse an existing shuffleId
after unregistering it? If the following plan worked, it would further
simplify the implementation:
1. Downstream tasks fail because of read failures.
2. All active downstream tasks are killed, so the shuffleId is not used.
3. An upstream vertex unregisters the shuffleId.
4. The upstream vertex is re-executed normally. This re-execution
automatically registers the same shuffleId.
In summary, we would like to go back in time before the upstream vertex
started by cleaning up the shuffleId. Could you please give a comment on
this plan?
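For reference, steps 1-4 would amount to the following sequence. All names here are illustrative only (unregisterShuffle merely mirrors the LifecycleManager method mentioned in this thread); as the reply above explains, this only works if unregistering synchronously removes all metadata, which the current architecture does not guarantee.

```java
import java.util.HashSet;
import java.util.Set;

public class ReuseShuffleIdPlan {
    private final Set<Integer> registered = new HashSet<>();

    public void registerShuffle(int shuffleId)   { registered.add(shuffleId); }
    public void unregisterShuffle(int shuffleId) { registered.remove(shuffleId); }
    public boolean isRegistered(int shuffleId)   { return registered.contains(shuffleId); }

    /**
     * Steps 2-4 of the plan: kill all downstream readers, unregister the
     * shuffleId, then re-execute the upstream vertex, which re-registers
     * the very same shuffleId.
     */
    public void recoverByReuse(int shuffleId,
                               Runnable killDownstreamTasks,
                               Runnable rerunUpstreamVertex) {
        killDownstreamTasks.run();      // step 2: nobody is using the shuffleId
        unregisterShuffle(shuffleId);   // step 3: clean up its state
        rerunUpstreamVertex.run();      // step 4: upstream re-executes ...
        registerShuffle(shuffleId);     // ... and registers the same id again
    }
}
```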
Thank you,
--- Sungwoo
On Sat, 30 Sep 2023, Keyong Zhou wrote:
Hi Sungwoo,
I think your approach works with the current architecture of Celeborn,
and interpreting IOException during reads as a read failure makes sense.
Currently, LifecycleManager announces data loss only when CommitFiles
fails.
Thanks,
Keyong Zhou
Sungwoo Park <o...@pl.postech.ac.kr> wrote on Fri, 29 Sep 2023 at 22:05:
Since the partition split has a good chance to contain data from almost
all upstream mapper tasks, the cost of re-computing all upstream tasks may
differ little from re-computing only the affected mapper tasks in most
cases. Of course, it's not always true.
Changing a shuffleId from 'complete' to 'incomplete' also requires
refactoring the Worker's logic, which currently assumes that the set of
succeeded attempts does not change after files are finally committed.
a subset of succeeded attempts. In Erik's proposal, the whole upstream
stage will be rerun when data is lost.
Thank you for your response --- things are now much clearer.
From your comments shown above, let me assume that:
1. The whole upstream stage is rerun in the case of read failure.
2. Currently it is not easy to change the state of a shuffleId from
'complete' to 'incomplete'.
Then, for Celeborn-MR3, I would like to experiment with the following
approach:
1. If read failures occur for shuffleId #1, we pick a new shuffleId #2.
2. The upstream stage (or Vertex in the case of MR3) re-executes all of
its tasks, but writes the output to shuffleId #2.
3. Tasks in the downstream stage retry by reading from shuffleId #2.
Do you think this approach makes sense under the current architecture of
Celeborn? If this approach is feasible, MR3 only needs to be notified of
read failures due to lost data by Celeborn ShuffleClient. Or, we could
just interpret IOException from Celeborn ShuffleClient as read failures,
in which case we can implement stage recompute without requiring any
extension of Celeborn. (However, it would be great if Celeborn
ShuffleClient could announce lost data explicitly.)
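The read-side handling described above could be sketched as follows. Fetcher, readPartition, and rerunUpstreamVertex are hypothetical names introduced only for illustration; the real Celeborn/MR3 integration would of course look different.

```java
import java.io.IOException;
import java.util.function.IntConsumer;

public class ReadFailureRecovery {
    /** Functional stand-in for reading one partition of a shuffleId; hypothetical. */
    public interface Fetcher {
        byte[] fetch(int shuffleId, int partition) throws IOException;
    }

    private int shuffleId;
    private final IntConsumer rerunUpstreamVertex; // re-runs all upstream tasks against the given shuffleId

    public ReadFailureRecovery(int initialShuffleId, IntConsumer rerunUpstreamVertex) {
        this.shuffleId = initialShuffleId;
        this.rerunUpstreamVertex = rerunUpstreamVertex;
    }

    /**
     * Read a partition; on IOException (interpreted as lost shuffle data),
     * pick a fresh shuffleId, re-execute the upstream stage against it, and
     * retry the read.
     */
    public byte[] readPartition(Fetcher fetcher, int partition) throws IOException {
        try {
            return fetcher.fetch(shuffleId, partition);    // normal path
        } catch (IOException lostData) {
            shuffleId += 1;                                // step 1: pick a fresh shuffleId
            rerunUpstreamVertex.accept(shuffleId);         // step 2: upstream rewrites its output
            return fetcher.fetch(shuffleId, partition);    // step 3: downstream retries the read
        }
    }
}
```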
An industrial user of Hive-MR3-Celeborn is trying hard to reduce disk
usage on Celeborn workers (which use SSDs of limited capacity), so stage
recompute would be a great new feature for them.
Thank you,
--- Sungwoo