This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 49ea88103 [MINOR] Remove unnecessary increment index of
Master#timeoutDeadWorkers
49ea88103 is described below
commit 49ea8810376ceb387de0ef855b7e9205101c3958
Author: SteNicholas <[email protected]>
AuthorDate: Mon Oct 23 22:18:39 2023 +0800
[MINOR] Remove unnecessary increment index of Master#timeoutDeadWorkers
### What changes were proposed in this pull request?
Remove unnecessary increment index of `Master#timeoutDeadWorkers`.
### Why are the changes needed?
The local increment index (`ind`) in `Master#timeoutDeadWorkers` is never read, so maintaining it is unnecessary.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2027 from SteNicholas/timeout-dead-workers.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../shuffle/celeborn/CelebornShuffleReader.scala | 2 +-
.../shuffle/celeborn/CelebornShuffleReader.scala | 2 +-
.../org/apache/celeborn/client/CommitManager.scala | 2 +-
.../celeborn/client/ShuffleClientHelper.scala | 4 ++--
.../celeborn/client/WorkerStatusTrackerSuite.scala | 8 +++----
.../common/identity/DefaultIdentityProvider.scala | 2 +-
.../celeborn/service/deploy/master/Master.scala | 14 +++++------
.../service/deploy/worker/PushDataHandler.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 2 +-
.../service/deploy/memory/MemoryManagerSuite.scala | 28 +++++++++++-----------
10 files changed, 32 insertions(+), 34 deletions(-)
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index c26a0280b..7518b1e6b 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -75,7 +75,7 @@ class CelebornShuffleReader[K, C](
streamCreatorPool = ThreadUtils.newDaemonCachedThreadPool(
"celeborn-create-stream-thread",
conf.readStreamCreatorPoolThreads,
- 60);
+ 60)
}
}
}
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 6a0ebedc9..063ad0b6f 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -77,7 +77,7 @@ class CelebornShuffleReader[K, C](
streamCreatorPool = ThreadUtils.newDaemonCachedThreadPool(
"celeborn-create-stream-thread",
conf.readStreamCreatorPoolThreads,
- 60);
+ 60)
}
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0baacd10e..1322f02d8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -191,7 +191,7 @@ class CommitManager(appUniqueId: String, val conf:
CelebornConf, lifecycleManage
new AtomicInteger(),
JavaUtils.newConcurrentHashMap[Int, AtomicInteger]()))
- getCommitHandler(shuffleId).registerShuffle(shuffleId, numMappers);
+ getCommitHandler(shuffleId).registerShuffle(shuffleId, numMappers)
}
def isMapperEnded(shuffleId: Int, mapId: Int): Boolean = {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
index f8a69468c..b94ba37c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
@@ -52,7 +52,7 @@ object ShuffleClientHelper extends Logging {
logInfo(s"Stage ended for $shuffleId")
} else {
logInfo(s"split failed for $respStatus, " +
- s"shuffle file can be larger than expected, try split again");
+ s"shuffle file can be larger than expected, try split again")
}
splittingSet.remove(partitionId)
case Failure(exception) =>
@@ -60,7 +60,7 @@ object ShuffleClientHelper extends Logging {
logWarning(
s"Shuffle file split failed for map ${shuffleId} partitionId
${partitionId}," +
s" try again, detail : {}",
- exception);
+ exception)
}(concurrent.ExecutionContext.fromExecutorService(executors))
}
diff --git
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index d060d7108..5d3355a33 100644
---
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -32,12 +32,12 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
test("handleHeartbeatResponse") {
val celebornConf = new CelebornConf()
- celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L);
+ celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L)
val statusTracker = new WorkerStatusTracker(celebornConf, null)
val registerTime = System.currentTimeMillis()
- statusTracker.excludedWorkers.put(mock("host1"),
(StatusCode.WORKER_UNKNOWN, registerTime));
- statusTracker.excludedWorkers.put(mock("host2"),
(StatusCode.WORKER_SHUTDOWN, registerTime));
+ statusTracker.excludedWorkers.put(mock("host1"),
(StatusCode.WORKER_UNKNOWN, registerTime))
+ statusTracker.excludedWorkers.put(mock("host2"),
(StatusCode.WORKER_SHUTDOWN, registerTime))
// test reserve (only statusCode list in handleHeartbeatResponse)
val empty = buildResponse(Array.empty, Array.empty, Array.empty)
@@ -105,6 +105,6 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
}
private def mock(host: String): WorkerInfo = {
- new WorkerInfo(host, -1, -1, -1, -1);
+ new WorkerInfo(host, -1, -1, -1, -1)
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
index 7da111396..76b89b331 100644
---
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
@@ -21,7 +21,7 @@ import org.apache.celeborn.common.CelebornConf
class DefaultIdentityProvider extends IdentityProvider {
override def provide(): UserIdentifier = {
- val conf = new CelebornConf();
+ val conf = new CelebornConf()
UserIdentifier(
conf.quotaUserSpecificTenant,
conf.quotaUserSpecificUserName)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 7493fbcb9..a94989dd6 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -427,14 +427,13 @@ private[celeborn] class Master(
executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))
}
- private def timeoutDeadWorkers() {
+ private def timeoutDeadWorkers(): Unit = {
val currentTime = System.currentTimeMillis()
// Need increase timeout deadline to avoid long time leader election period
if (HAHelper.getWorkerTimeoutDeadline(statusSystem) > currentTime) {
return
}
- var ind = 0
workersSnapShot.asScala.foreach { worker =>
if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
&& !statusSystem.workerLostEvents.contains(worker)) {
@@ -448,7 +447,6 @@ private[celeborn] class Master(
worker.replicatePort,
MasterClient.genRequestId()))
}
- ind += 1
}
}
@@ -467,7 +465,7 @@ private[celeborn] class Master(
logDebug(s"Remove unavailable info for workers:
$unavailableInfoTimeoutWorkers")
self.send(RemoveWorkersUnavailableInfo(
unavailableInfoTimeoutWorkers,
- MasterClient.genRequestId()));
+ MasterClient.genRequestId()))
}
}
@@ -766,7 +764,7 @@ private[celeborn] class Master(
}
val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
if (hadoopFs.exists(hdfsWorkPath)) {
- if (!expiredDir.isEmpty) {
+ if (expiredDir.nonEmpty) {
val dirToDelete = new Path(hdfsWorkPath, expiredDir)
// delete specific app dir on application lost
CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete,
true)
@@ -812,7 +810,7 @@ private[celeborn] class Master(
private def handleRemoveWorkersUnavailableInfos(
unavailableWorkers: util.List[WorkerInfo],
requestId: String): Unit = {
- statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers,
requestId);
+ statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers,
requestId)
}
private def computeUserResourceConsumption(userIdentifier: UserIdentifier)
@@ -877,7 +875,7 @@ private[celeborn] class Master(
override def getMasterGroupInfo: String = {
val sb = new StringBuilder
sb.append("====================== Master Group INFO
==============================\n")
- sb.append(getMasterGroupInfoInternal())
+ sb.append(getMasterGroupInfoInternal)
sb.toString()
}
@@ -981,7 +979,7 @@ private[celeborn] class Master(
isActive
}
- private def getMasterGroupInfoInternal(): String = {
+ private def getMasterGroupInfoInternal: String = {
if (conf.haEnabled) {
val sb = new StringBuilder
val groupInfo =
statusSystem.asInstanceOf[HAMasterMetaManager].getRatisServer.getGroupInfo
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index a1bf40f57..4cf8456de 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -759,7 +759,7 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
logError(s"Error while handle${message.`type`()} $message", e)
client.getChannel.writeAndFlush(new RpcFailure(
requestId,
- Throwables.getStackTraceAsString(e)));
+ Throwables.getStackTraceAsString(e)))
} finally {
message.body().release()
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 9f370a6bc..79390ee77 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -759,7 +759,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
})
hdfsWriters.forEach(new BiConsumer[String, FileWriter] {
override def accept(t: String, u: FileWriter): Unit = {
- u.flushOnMemoryPressure();
+ u.flushOnMemoryPressure()
}
})
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index bfbb42043..f26962929 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -42,7 +42,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
.set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
val caught =
intercept[IllegalArgumentException] {
- MemoryManager.initialize(conf);
+ MemoryManager.initialize(conf)
}
assert(
caught.getMessage == s"Invalid config,
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " +
@@ -69,17 +69,17 @@ class MemoryManagerSuite extends CelebornFunSuite {
memoryCounter.set(pushThreshold + 1)
assert(ServingState.PUSH_PAUSED == memoryManager.currentServingState())
// reach pause replicate data threshold
- memoryCounter.set(replicateThreshold + 1);
- assert(ServingState.PUSH_AND_REPLICATE_PAUSED ==
memoryManager.currentServingState());
+ memoryCounter.set(replicateThreshold + 1)
+ assert(ServingState.PUSH_AND_REPLICATE_PAUSED ==
memoryManager.currentServingState())
// touch pause push data threshold again
- memoryCounter.set(pushThreshold + 1);
- assert(MemoryManager.ServingState.PUSH_PAUSED ==
memoryManager.currentServingState());
+ memoryCounter.set(pushThreshold + 1)
+ assert(MemoryManager.ServingState.PUSH_PAUSED ==
memoryManager.currentServingState())
// between pause push data threshold and resume data threshold
- memoryCounter.set(resumeThreshold + 2);
- assert(MemoryManager.ServingState.PUSH_PAUSED ==
memoryManager.currentServingState());
+ memoryCounter.set(resumeThreshold + 2)
+ assert(MemoryManager.ServingState.PUSH_PAUSED ==
memoryManager.currentServingState())
// touch resume data threshold
- memoryCounter.set(0);
- assert(MemoryManager.ServingState.NONE_PAUSED ==
memoryManager.currentServingState());
+ memoryCounter.set(0)
+ assert(MemoryManager.ServingState.NONE_PAUSED ==
memoryManager.currentServingState())
} catch {
case e: Exception => throw e
} finally {
@@ -112,21 +112,21 @@ class MemoryManagerSuite extends CelebornFunSuite {
}
// PAUSE PUSH -> PAUSE PUSH AND REPLICATE
- memoryCounter.set(replicateThreshold + 1);
+ memoryCounter.set(replicateThreshold + 1)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(pushListener.isPause)
assert(replicateListener.isPause)
}
// PAUSE PUSH AND REPLICATE -> PAUSE PUSH
- memoryCounter.set(pushThreshold + 1);
+ memoryCounter.set(pushThreshold + 1)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
// PAUSE PUSH -> NONE PAUSED
- memoryCounter.set(0);
+ memoryCounter.set(0)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
@@ -136,14 +136,14 @@ class MemoryManagerSuite extends CelebornFunSuite {
val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
- memoryCounter.set(replicateThreshold + 1);
+ memoryCounter.set(replicateThreshold + 1)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(pushListener.isPause)
assert(replicateListener.isPause)
}
// PAUSE PUSH AND REPLICATE -> NONE PAUSED
- memoryCounter.set(0);
+ memoryCounter.set(0)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)