This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 7d828d8f1 [CELEBORN-645][REFACTOR] Refine logic about handle
HeartbeatFromWorkerResponse
7d828d8f1 is described below
commit 7d828d8f1f38f4bcb24d62bf13289fa7ec5d2d8e
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Jun 7 16:34:44 2023 +0800
[CELEBORN-645][REFACTOR] Refine logic about handle
HeartbeatFromWorkerResponse
### What changes were proposed in this pull request?
Refine the logic here to make it easier understand.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1555 from AngersZhuuuu/CELEBORN-645.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
(cherry picked from commit d4cb6dd8abc230b7ad8bd7a7ec163c221deb98d2)
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index c0a819d29..9a3b050ef 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -304,13 +304,10 @@ private[celeborn] class Worker(
activeShuffleKeys,
estimatedAppDiskUsage),
classOf[HeartbeatResponse])
- if (response.registered) {
- response.expiredShuffleKeys.asScala.foreach(shuffleKey =>
workerInfo.releaseSlots(shuffleKey))
- cleanTaskQueue.put(response.expiredShuffleKeys)
- } else {
+ response.expiredShuffleKeys.asScala.foreach(shuffleKey =>
workerInfo.releaseSlots(shuffleKey))
+ cleanTaskQueue.put(response.expiredShuffleKeys)
+ if (!response.registered) {
logError("Worker not registered in master, clean expired shuffle data
and register again.")
- // Clean expired shuffle.
- cleanup(response.expiredShuffleKeys)
try {
registerWithMaster()
} catch {