This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8effb735f [CELEBORN-2066] Release workers only with high workload when 
the number of excluded worker set is too large
8effb735f is described below

commit 8effb735f74c7b8bed9dc66ee1b57e4912034f23
Author: yuanzhen <[email protected]>
AuthorDate: Fri Aug 22 10:14:38 2025 +0800

    [CELEBORN-2066] Release workers only with high workload when the number of 
excluded worker set is too large
    
    ### What changes were proposed in this pull request?
    
    Provide user options to enable release workers only with high workload when 
the number of excluded worker set is too large.
    
    ### Why are the changes needed?
    
    In some cases, a large percentage of workers were excluded, but most of 
them were due to high workload. It's better to release such workers from 
excluded set to ensure the system availability is a priority.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New Configuration Option.
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #3365 from Kalvin2077/exclude-high-stress-workers.
    
    Lead-authored-by: yuanzhen <[email protected]>
    Co-authored-by: Kalvin2077 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  23 ++
 .../apache/celeborn/common/meta/WorkerInfo.scala   |   5 +
 docs/configuration/master.md                       |   2 +
 .../master/clustermeta/AbstractMetaManager.java    |  76 +++++-
 .../clustermeta/SingleMasterMetaManager.java       |   4 +
 .../master/clustermeta/ha/HAMasterMetaManager.java |   3 +
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 206 +++++++++++++--
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 277 ++++++++++++++++++++-
 8 files changed, 566 insertions(+), 30 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 986d77e0f..5cf7bf12e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -812,6 +812,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   }
   def masterExcludeWorkerUnhealthyDiskRatioThreshold: Double =
     get(MASTER_EXCLUDE_WORKER_UNHEALTHY_DISK_RATIO_THRESHOLD)
+  def masterAutoReleaseHighWorkloadWorkerEnabled: Boolean =
+    get(MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED)
+  def masterAutoReleaseHighWorkloadWorkerRatioThreshold: Double =
+    get(MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD)
 
   // //////////////////////////////////////////////////////
   //                      Worker                         //
@@ -6536,6 +6540,25 @@ object CelebornConf extends Logging {
       .checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
       .createWithDefault(1)
 
+  val MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled")
+      .categories("master")
+      .version("0.7.0")
+      .doc("Whether to release workers with high workload in excluded worker 
list.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD: 
ConfigEntry[Double] =
+    
buildConf("celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold")
+      .categories("master")
+      .version("0.7.0")
+      .doc("Whenever the number of worker with high workload exceeds this 
ratio, " +
+        "master will release worker with high workload in excluded worker 
list. " +
+        "If this value is set to 0, such workers will never be excluded. ")
+      .doubleConf
+      .checkValue(v => v >= 0.0 && v < 1.0, "Should be in [0.0, 1).")
+      .createWithDefault(0.3)
+
   val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.cluster.diskBytesWritten")
       .categories("quota")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 26e94e360..10a37cf07 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -47,6 +47,7 @@ class WorkerInfo(
   var nextInterruptionNotice = Long.MaxValue
   var lastHeartbeat: Long = 0
   var workerStatus = WorkerStatus.normalWorkerStatus()
+  var isHighWorkLoad: Boolean = false;
   val diskInfos = {
     if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, 
DiskInfo](_diskInfos)
     else null
@@ -182,6 +183,10 @@ class WorkerInfo(
     this.workerStatus = workerStatus;
   }
 
+  def setWorkLoad(isHighWorkLoad: Boolean): Unit = {
+    this.isHighWorkLoad = isHighWorkLoad;
+  }
+
   def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
     diskInfos.asScala.foreach { case (_, disk) =>
       disk.maxSlots = disk.totalSpace / estimatedPartitionSize
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index fd4af6a8e..843e0c16a 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,6 +43,8 @@ license: |
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore 
partition size smaller than this configuration of partition size for 
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | 
 | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | 
Initial delay time before start updating partition size for estimation. | 0.3.0 
| celeborn.shuffle.estimatedPartitionSize.update.initialDelay | 
 | celeborn.master.estimatedPartitionSize.update.interval | 10min | false | 
Interval of updating partition size for estimation. | 0.3.0 | 
celeborn.shuffle.estimatedPartitionSize.update.interval | 
+| celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled | false | false 
| Whether to release workers with high workload in excluded worker list. | 
0.7.0 |  | 
+| celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold | 0.3 | 
false | Whenever the number of worker with high workload exceeds this ratio, 
master will release worker with high workload in excluded worker list. If this 
value is set to 0, such workers will never be excluded.  | 0.7.0 |  | 
 | celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false | 
Max ratio of unhealthy disks for excluding worker, when unhealthy disk is 
larger than max unhealthy count, master will exclude worker. If this value is 
set to 1, master will exclude worker of which disks are all unhealthy. | 0.6.0 
|  | 
 | celeborn.master.heartbeat.application.timeout | 300s | false | Application 
heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout | 
 | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat 
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 361bcbe08..4ae879743 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -32,6 +32,8 @@ import scala.Option;
 import scala.Tuple2;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.slf4j.Logger;
@@ -89,6 +91,9 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public long initialEstimatedPartitionSize;
   public long estimatedPartitionSize;
   public double unhealthyDiskRatioThreshold;
+  protected boolean autoReleaseHighWorkLoadEnabled;
+  protected double autoReleaseHighWorkLoadRatioThreshold;
+  protected boolean hasRemoteStorage;
   public final LongAdder partitionTotalWritten = new LongAdder();
   public final LongAdder partitionTotalFileCount = new LongAdder();
   public final LongAdder shuffleTotalCount = new LongAdder();
@@ -249,6 +254,34 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
   }
 
+  private boolean hasAvailableStorage(WorkerInfo workerInfo) {
+    Map<String, DiskInfo> disks = workerInfo.diskInfos();
+    Pair<Boolean, Long> exceedCheckResult = 
isExceedingUnhealthyThreshold(disks);
+
+    boolean hasDisk = !disks.isEmpty();
+    boolean isExceeding = exceedCheckResult.getLeft();
+    long unhealthyCount = exceedCheckResult.getRight();
+
+    if (hasDisk) {
+      if (!isExceeding) {
+        return true;
+      } else {
+        LOG.warn(
+            "Worker {} doesn't have enough healthy local disk (unhealthy 
count: {}). Has remote storage: {}",
+            workerInfo,
+            unhealthyCount,
+            hasRemoteStorage);
+      }
+    }
+
+    if (hasRemoteStorage) {
+      return true;
+    } else {
+      LOG.warn("Worker {} has no available storage", workerInfo);
+      return false;
+    }
+  }
+
   public void updateWorkerHeartbeatMeta(
       String host,
       int rpcPort,
@@ -271,6 +304,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
             availableSlots.set(info.totalAvailableSlots());
             info.lastHeartbeat_$eq(time);
             info.setWorkerStatus(workerStatus);
+            info.setWorkLoad(highWorkload);
           });
     }
 
@@ -284,23 +318,34 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
 
     // If using HDFSONLY mode, workers with empty disks should not be put into 
excluded worker list.
-    long unhealthyDiskNum =
-        disks.values().stream().filter(s -> 
!s.status().equals(DiskStatus.HEALTHY)).count();
-    boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= 
unhealthyDiskRatioThreshold;
-    boolean remoteStorageDirsDefined = conf.remoteStorageDirs().isDefined();
-    if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || exceed) && !remoteStorageDirsDefined) || 
highWorkload)) {
-      LOG.warn(
-          "Worker {} (unhealthy disks num: {}, high workload: {}) adds to 
excluded workers",
-          worker,
-          unhealthyDiskNum,
-          highWorkload);
+    if (!excludedWorkers.contains(worker) && (!hasAvailableStorage(worker) || 
highWorkload)) {
+      LOG.warn("Worker {} adds to excluded workers, high workload: {}", 
worker, highWorkload);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0 || remoteStorageDirsDefined) && 
!highWorkload) {
+    } else if ((availableSlots.get() > 0 || hasRemoteStorage) && 
!highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
 
+    // release high work load workers when too many excluded workers
+    if (autoReleaseHighWorkLoadEnabled
+        && excludedWorkers.size()
+            >= Math.floor(workersMap.size() * 
autoReleaseHighWorkLoadRatioThreshold)) {
+      synchronized (workersMap) {
+        List<WorkerInfo> toRemoved =
+            excludedWorkers.stream()
+                .filter(
+                    w -> {
+                      WorkerInfo info = workersMap.get(w.toUniqueId());
+                      return info != null
+                          && info.isHighWorkLoad()
+                          && hasAvailableStorage(w)
+                          && info.totalAvailableSlots() > 0;
+                    })
+                .collect(Collectors.toList());
+        updateExcludedWorkersMeta(new ArrayList<>(), toRemoved);
+      }
+    }
+
     // try to update the available workers if the worker status is Normal
     if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
       updateAvailableWorkers(worker);
@@ -639,4 +684,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       workerInfo.ifPresent(info -> 
info.updateThenGetUserResourceConsumption(resourceConsumptions));
     }
   }
+
+  private Pair<Boolean, Long> isExceedingUnhealthyThreshold(Map<String, 
DiskInfo> diskMap) {
+    long unhealthyCount =
+        diskMap.values().stream().filter(disk -> 
!disk.status().equals(DiskStatus.HEALTHY)).count();
+    return new ImmutablePair<>(
+        unhealthyCount * 1.0 / diskMap.size() >= unhealthyDiskRatioThreshold, 
unhealthyCount);
+  }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 4d4fa3b1a..1a0beeffe 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -47,6 +47,10 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     this.initialEstimatedPartitionSize = conf.initialEstimatedPartitionSize();
     this.estimatedPartitionSize = initialEstimatedPartitionSize;
     this.unhealthyDiskRatioThreshold = 
conf.masterExcludeWorkerUnhealthyDiskRatioThreshold();
+    this.autoReleaseHighWorkLoadEnabled = 
conf.masterAutoReleaseHighWorkloadWorkerEnabled();
+    this.autoReleaseHighWorkLoadRatioThreshold =
+        conf.masterAutoReleaseHighWorkloadWorkerRatioThreshold();
+    this.hasRemoteStorage = conf.remoteStorageDirs().isDefined();
     this.rackResolver = rackResolver;
   }
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index e3319ffc1..72f531d95 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -56,6 +56,9 @@ public class HAMasterMetaManager extends AbstractMetaManager {
     this.initialEstimatedPartitionSize = conf.initialEstimatedPartitionSize();
     this.estimatedPartitionSize = initialEstimatedPartitionSize;
     this.unhealthyDiskRatioThreshold = 
conf.masterExcludeWorkerUnhealthyDiskRatioThreshold();
+    this.autoReleaseHighWorkLoadEnabled = 
conf.masterAutoReleaseHighWorkloadWorkerEnabled();
+    this.autoReleaseHighWorkLoadRatioThreshold =
+        conf.masterAutoReleaseHighWorkloadWorkerRatioThreshold();
     this.rackResolver = rackResolver;
   }
 
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 4f4361ef0..04a984cbc 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -704,8 +704,8 @@ public class DefaultMetaSystemSuiteJ {
         workerStatus,
         getNewReqeustId());
 
-    assertEquals(statusSystem.excludedWorkers.size(), 1);
-    assertEquals(statusSystem.availableWorkers.size(), 2);
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME2,
@@ -720,8 +720,8 @@ public class DefaultMetaSystemSuiteJ {
         workerStatus,
         getNewReqeustId());
 
-    assertEquals(statusSystem.excludedWorkers.size(), 2);
-    assertEquals(statusSystem.availableWorkers.size(), 1);
+    assertEquals(2, statusSystem.excludedWorkers.size());
+    assertEquals(1, statusSystem.availableWorkers.size());
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME3,
@@ -736,8 +736,8 @@ public class DefaultMetaSystemSuiteJ {
         workerStatus,
         getNewReqeustId());
 
-    assertEquals(statusSystem.excludedWorkers.size(), 2);
-    assertEquals(statusSystem.availableWorkers.size(), 1);
+    assertEquals(2, statusSystem.excludedWorkers.size());
+    assertEquals(1, statusSystem.availableWorkers.size());
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME3,
@@ -752,8 +752,184 @@ public class DefaultMetaSystemSuiteJ {
         workerStatus,
         getNewReqeustId());
 
-    assertEquals(statusSystem.excludedWorkers.size(), 3);
-    assertEquals(statusSystem.availableWorkers.size(), 0);
+    assertEquals(3, statusSystem.excludedWorkers.size());
+    assertEquals(0, statusSystem.availableWorkers.size());
+  }
+
+  @Test
+  public void testAutoReleaseHighWorkLoadWorkers() {
+    conf.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), 
true);
+    
conf.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(),
 0.8);
+    statusSystem = new SingleMasterMetaManager(mockRpcEnv, conf);
+
+    statusSystem.handleRegisterWorker(
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        INTERNALPORT1,
+        NETWORK_LOCATION1,
+        disks1,
+        userResourceConsumption1,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        INTERNALPORT2,
+        NETWORK_LOCATION2,
+        disks2,
+        userResourceConsumption2,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        INTERNALPORT3,
+        NETWORK_LOCATION3,
+        disks3,
+        userResourceConsumption3,
+        getNewReqeustId());
+
+    // worker2 and work3 are unhealthy
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        new HashMap<>(),
+        userResourceConsumption2,
+        1,
+        false,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        new HashMap<>(),
+        userResourceConsumption3,
+        1,
+        false,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(2, statusSystem.excludedWorkers.size());
+    assertEquals(1, statusSystem.availableWorkers.size());
+
+    // worker2 and work3 have high workload
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        disks2,
+        userResourceConsumption2,
+        1,
+        false,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        disks3,
+        userResourceConsumption3,
+        1,
+        false,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(0, statusSystem.excludedWorkers.size());
+    assertEquals(3, statusSystem.availableWorkers.size());
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        disks2,
+        userResourceConsumption2,
+        1,
+        true,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        disks3,
+        userResourceConsumption3,
+        1,
+        true,
+        workerStatus,
+        getNewReqeustId());
+    // release 2 workers with high workload
+    assertEquals(0, statusSystem.excludedWorkers.size());
+    assertEquals(3, statusSystem.availableWorkers.size());
+
+    // work2 has high workload and work3 is unhealthy
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        disks2,
+        userResourceConsumption2,
+        1,
+        true,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        new HashMap<>(),
+        userResourceConsumption3,
+        1,
+        false,
+        workerStatus,
+        getNewReqeustId());
+    // release worker2
+    assertEquals(1, statusSystem.excludedWorkers.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
+
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        new HashMap<>(),
+        userResourceConsumption2,
+        1,
+        true,
+        workerStatus,
+        getNewReqeustId());
+    assertEquals(2, statusSystem.excludedWorkers.size());
+    assertEquals(1, statusSystem.availableWorkers.size());
   }
 
   @Test
@@ -837,7 +1013,7 @@ public class DefaultMetaSystemSuiteJ {
 
     // Size between minEstimateSize -> maxEstimateSize
     statusSystem.handleUpdatePartitionSize();
-    Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+    Assert.assertEquals(500000000, statusSystem.estimatedPartitionSize);
 
     statusSystem.handleAppHeartbeat(
         APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, 
getNewReqeustId());
@@ -951,11 +1127,11 @@ public class DefaultMetaSystemSuiteJ {
         dummy,
         getNewReqeustId());
 
-    assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
-    assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
-    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 
3);
-    assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 
2);
-    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 
2);
-    
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 
1);
+    assertEquals(5, statusSystem.shuffleTotalCount.longValue());
+    assertEquals(3, statusSystem.applicationTotalCount.longValue());
+    assertEquals(3, 
statusSystem.shuffleFallbackCounts.get(POLICY1).longValue());
+    assertEquals(2, 
statusSystem.shuffleFallbackCounts.get(POLICY2).longValue());
+    assertEquals(2, 
statusSystem.applicationFallbackCounts.get(POLICY1).longValue());
+    assertEquals(1, 
statusSystem.applicationFallbackCounts.get(POLICY2).longValue());
   }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 7c66cccec..952789a32 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -108,9 +108,9 @@ public class RatisMasterStatusSystemSuiteJ {
 
     while (!serversStarted) {
       try {
-        STATUSSYSTEM1 = new HAMasterMetaManager(mockRpcEnv, new 
CelebornConf());
-        STATUSSYSTEM2 = new HAMasterMetaManager(mockRpcEnv, new 
CelebornConf());
-        STATUSSYSTEM3 = new HAMasterMetaManager(mockRpcEnv, new 
CelebornConf());
+        STATUSSYSTEM1 = new HAMasterMetaManager(mockRpcEnv, conf1);
+        STATUSSYSTEM2 = new HAMasterMetaManager(mockRpcEnv, conf2);
+        STATUSSYSTEM3 = new HAMasterMetaManager(mockRpcEnv, conf3);
 
         MetaHandler handler1 = new MetaHandler(STATUSSYSTEM1);
         MetaHandler handler2 = new MetaHandler(STATUSSYSTEM2);
@@ -1138,6 +1138,277 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
   }
 
+  @Test
+  public void testAutoReleaseHighWorkLoadWorkers() throws 
InterruptedException, IOException {
+    CelebornConf conf1 = new CelebornConf();
+    conf1.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), 
true);
+    
conf1.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(),
 0.8);
+    CelebornConf conf2 = new CelebornConf();
+    conf2.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), 
true);
+    
conf2.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(),
 0.8);
+    CelebornConf conf3 = new CelebornConf();
+    conf3.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), 
true);
+    
conf3.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(),
 0.8);
+
+    try {
+      resetRaftServer(
+          configureServerConf(conf1, 1),
+          configureServerConf(conf2, 2),
+          configureServerConf(conf3, 3),
+          false);
+
+      AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+      Assert.assertNotNull(statusSystem);
+
+      statusSystem.handleRegisterWorker(
+          HOSTNAME1,
+          RPCPORT1,
+          PUSHPORT1,
+          FETCHPORT1,
+          REPLICATEPORT1,
+          INTERNALPORT1,
+          NETWORK_LOCATION1,
+          disks1,
+          userResourceConsumption1,
+          getNewReqeustId());
+      statusSystem.handleRegisterWorker(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          INTERNALPORT2,
+          NETWORK_LOCATION2,
+          disks2,
+          userResourceConsumption2,
+          getNewReqeustId());
+      statusSystem.handleRegisterWorker(
+          HOSTNAME3,
+          RPCPORT3,
+          PUSHPORT3,
+          FETCHPORT3,
+          REPLICATEPORT3,
+          INTERNALPORT3,
+          NETWORK_LOCATION3,
+          disks3,
+          userResourceConsumption3,
+          getNewReqeustId());
+
+      Thread.sleep(3000L);
+
+      // worker2 and work3 are unhealthy
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          new HashMap<>(),
+          userResourceConsumption2,
+          1,
+          false,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(1, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(2, statusSystem.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME3,
+          RPCPORT3,
+          PUSHPORT3,
+          FETCHPORT3,
+          REPLICATEPORT3,
+          new HashMap<>(),
+          userResourceConsumption3,
+          1,
+          false,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(2, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(1, statusSystem.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
+      // worker2 and work3 have high workload
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          disks2,
+          userResourceConsumption2,
+          1,
+          false,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(1, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(2, statusSystem.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME3,
+          RPCPORT3,
+          PUSHPORT3,
+          FETCHPORT3,
+          REPLICATEPORT3,
+          disks3,
+          userResourceConsumption3,
+          1,
+          false,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(0, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(3, statusSystem.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          disks2,
+          userResourceConsumption2,
+          1,
+          true,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(1, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(2, statusSystem.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME3,
+          RPCPORT3,
+          PUSHPORT3,
+          FETCHPORT3,
+          REPLICATEPORT3,
+          disks3,
+          userResourceConsumption3,
+          1,
+          true,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      // release 2 workers with high workload
+      Assert.assertEquals(0, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(3, statusSystem.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
+
+      // work2 has high workload and work3 is unhealthy
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          disks2,
+          userResourceConsumption2,
+          1,
+          true,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(1, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(2, statusSystem.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME3,
+          RPCPORT3,
+          PUSHPORT3,
+          FETCHPORT3,
+          REPLICATEPORT3,
+          new HashMap<>(),
+          userResourceConsumption3,
+          1,
+          false,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      // release worker2
+      Assert.assertEquals(1, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(2, statusSystem.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
+      statusSystem.handleWorkerHeartbeat(
+          HOSTNAME2,
+          RPCPORT2,
+          PUSHPORT2,
+          FETCHPORT2,
+          REPLICATEPORT2,
+          new HashMap<>(),
+          userResourceConsumption2,
+          1,
+          true,
+          workerStatus,
+          getNewReqeustId());
+      Thread.sleep(3000L);
+      Assert.assertEquals(2, statusSystem.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
+      Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
+      Assert.assertEquals(1, statusSystem.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+      Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    } finally {
+      resetRaftServer(
+          configureServerConf(new CelebornConf(), 1),
+          configureServerConf(new CelebornConf(), 2),
+          configureServerConf(new CelebornConf(), 3),
+          false);
+    }
+  }
+
   @Before
   public void resetStatus() {
     STATUSSYSTEM1.registeredAppAndShuffles.clear();


Reply via email to