This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f7962a9b4 [CELEBORN-2145] QuotaManager should respect 
celeborn.quota.interruptShuffle.enabled of client config
f7962a9b4 is described below

commit f7962a9b41f9858faad2d8992b6f8573ff1c55a1
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sat Sep 20 15:01:02 2025 +0800

    [CELEBORN-2145] QuotaManager should respect 
celeborn.quota.interruptShuffle.enabled of client config
    
    ### What changes were proposed in this pull request?
    
    `QuotaManager` should respect the client config `celeborn.quota.interruptShuffle.enabled`, so that it only expires applications that enable interrupt shuffle.
    
    ### Why are the changes needed?
    
    At present, `QuotaManager` does not restrict expiration to applications that enable interrupt shuffle. As a result, it can expire applications that have interrupt shuffle disabled, such as high-priority applications. Therefore, `QuotaManager` should respect the client config `celeborn.quota.interruptShuffle.enabled`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `QuotaManagerSuite`
    
    Closes #3471 from SteNicholas/CELEBORN-2145.
    
    Lead-authored-by: Nicholas Jiang <[email protected]>
    Co-authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  7 +++--
 .../master/clustermeta/AbstractMetaManager.java    |  8 +++++
 .../service/deploy/master/quota/QuotaManager.scala | 13 ++++----
 .../deploy/master/quota/QuotaManagerSuite.scala    | 36 +++++++++++++---------
 4 files changed, 42 insertions(+), 22 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 57349f130..a4f5c5b4c 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -249,11 +249,14 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val messagesHelper: TransportMessagesHelper = new 
TransportMessagesHelper()
 
   private def registerApplicationInfo(): Unit = {
-    Utils.tryLogNonFatalError(
+    Utils.tryLogNonFatalError {
+      val extraInfo = ApplicationInfoProvider.instantiate(conf)
+        .provide() + (CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED.key -> 
conf.quotaInterruptShuffleEnabled.toString)
       masterClient.send(RegisterApplicationInfo(
         appUniqueId,
         userIdentifier,
-        ApplicationInfoProvider.instantiate(conf).provide().asJava)))
+        extraInfo.asJava))
+    }
   }
 
   // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is 
executed, and
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index fc6627fd2..11d73c82a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -696,4 +696,12 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     return new ImmutablePair<>(
         unhealthyCount * 1.0 / diskMap.size() >= unhealthyDiskRatioThreshold, 
unhealthyCount);
   }
+
+  public boolean isAppInterruptShuffleEnabled(String appId) {
+    return Boolean.parseBoolean(
+        Optional.ofNullable(applicationInfos.get(appId))
+            .map(ApplicationInfo::extraInfo)
+            .map(extraInfo -> 
extraInfo.get(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key()))
+            
.orElse(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().defaultValueString()));
+  }
 }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index 8da25f0dd..0d8d7b168 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -339,12 +339,13 @@ class QuotaManager(
     var nonExpired = used
     if (checkConsumptionExceeded(used, threshold)) {
       val sortedConsumption =
-        appMap.sortBy(_._2)(Ordering.by((r: ResourceConsumption) =>
-          (
-            r.diskBytesWritten,
-            r.diskFileCount,
-            r.hdfsBytesWritten,
-            r.hdfsFileCount)).reverse)
+        appMap.filter(app => statusSystem.isAppInterruptShuffleEnabled(app._1))
+          .sortBy(_._2)(Ordering.by((r: ResourceConsumption) =>
+            (
+              r.diskBytesWritten,
+              r.diskFileCount,
+              r.hdfsBytesWritten,
+              r.hdfsFileCount)).reverse)
       for ((appId, consumption) <- sortedConsumption
         if checkConsumptionExceeded(nonExpired, threshold)) {
         val reason = s"$expireReason Used: ${consumption.simpleString}, 
Threshold: $threshold"
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
index ee1d550b9..425a3404b 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
@@ -67,6 +67,8 @@ class QuotaManagerSuite extends CelebornFunSuite
 
   val metricsInstanceLabel = 
s"""instance="${Utils.localHostName(conf)}:${conf.masterHttpPort}""""
 
+  private val extraInfo = Map(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED.key 
-> "true").asJava
+
   override def beforeAll(): Unit = {
     conf.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
     conf.set(
@@ -192,13 +194,15 @@ class QuotaManagerSuite extends CelebornFunSuite
           Utils.byteStringAsBytes("5G"),
           20)).asJava)
 
+    statusSystem.updateApplicationInfo("app1", user, extraInfo)
+    statusSystem.updateApplicationInfo("app2", user, extraInfo)
     addUserConsumption(user, rc)
     conf.set("celeborn.quota.cluster.diskBytesWritten", "60gb")
     configService.refreshCache()
     quotaManager.updateResourceConsumption()
     var res1 = checkUserQuota(user)
-    var res2 = checkApplicationQuota(user, "app1")
-    var res3 = checkApplicationQuota(user, "app2")
+    var res2 = checkApplicationQuota("app1")
+    var res3 = checkApplicationQuota("app2")
 
     val succeed = CheckQuotaResponse(true, "")
     val failed = CheckQuotaResponse(
@@ -230,8 +234,8 @@ class QuotaManagerSuite extends CelebornFunSuite
     configService.refreshCache()
     quotaManager.updateResourceConsumption()
     res1 = checkUserQuota(user)
-    res2 = checkApplicationQuota(user, "app1")
-    res3 = checkApplicationQuota(user, "app2")
+    res2 = checkApplicationQuota("app1")
+    res3 = checkApplicationQuota("app2")
 
     assert(res1 == failed)
     assert(res2 == CheckQuotaResponse(
@@ -282,14 +286,16 @@ class QuotaManagerSuite extends CelebornFunSuite
           Utils.byteStringAsBytes("2G"),
           20)).asJava)
 
+    statusSystem.updateApplicationInfo("app1", user, extraInfo)
+    statusSystem.updateApplicationInfo("app2", user, extraInfo)
     addUserConsumption(user, rc)
     conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb")
     configService.refreshCache()
     quotaManager.updateResourceConsumption()
 
     res1 = checkUserQuota(user)
-    res2 = checkApplicationQuota(user, "app1")
-    res3 = checkApplicationQuota(user, "app2")
+    res2 = checkApplicationQuota("app1")
+    res3 = checkApplicationQuota("app2")
 
     assert(res1 == CheckQuotaResponse(
       false,
@@ -315,7 +321,7 @@ class QuotaManagerSuite extends CelebornFunSuite
   }
 
   test("test handleResourceConsumption time - case1") {
-    // 1000 users 100wapplications, all exceeded
+    // 1000 users 100w applications, all exceeded
     conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb")
     conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb")
     configService.refreshCache()
@@ -332,6 +338,7 @@ class QuotaManagerSuite extends CelebornFunSuite
             MIN + Math.abs(random.nextLong()) % (MAX - MIN),
             MIN + Math.abs(random.nextLong()) % (MAX - MIN),
             MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+          statusSystem.updateApplicationInfo(appId, user, extraInfo)
           (appId, consumption)
       }.toMap
       val userConsumption = subResourceConsumption.values.foldRight(
@@ -340,10 +347,7 @@ class QuotaManagerSuite extends CelebornFunSuite
       addUserConsumption(user, userConsumption)
     }
 
-    val start = System.currentTimeMillis()
     quotaManager.updateResourceConsumption()
-    val duration = System.currentTimeMillis() - start
-    print(s"duration=$duration")
 
     val res = resourceConsumptionSource.getMetrics
     for (i <- 0 until 1000) {
@@ -386,6 +390,7 @@ class QuotaManagerSuite extends CelebornFunSuite
                 MIN + Math.abs(random.nextLong()) % (MAX - MIN),
                 MIN + Math.abs(random.nextLong()) % (MAX - MIN),
                 MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+              statusSystem.updateApplicationInfo(appId, user, extraInfo)
               (appId, consumption)
           }.toMap
         } else {
@@ -393,6 +398,7 @@ class QuotaManagerSuite extends CelebornFunSuite
             index =>
               val appId = s"$user$i case2_app$index"
               val consumption = ResourceConsumption(0, 0, 0, 0)
+              statusSystem.updateApplicationInfo(appId, user, extraInfo)
               (appId, consumption)
           }.toMap
         }
@@ -402,10 +408,7 @@ class QuotaManagerSuite extends CelebornFunSuite
       addUserConsumption(user, userConsumption)
     }
 
-    val start = System.currentTimeMillis()
     quotaManager.updateResourceConsumption()
-    val duration = System.currentTimeMillis() - start
-    print(s"duration=$duration")
 
     val res = resourceConsumptionSource.getMetrics
     for (i <- 0 until 1000) {
@@ -492,6 +495,9 @@ class QuotaManagerSuite extends CelebornFunSuite
           0,
           0)).asJava)
 
+    statusSystem1.updateApplicationInfo("app1", user, extraInfo)
+    statusSystem1.updateApplicationInfo("app2", user, extraInfo)
+    statusSystem1.updateApplicationInfo("app3", user, extraInfo)
     addUserConsumption(user, rc)
     addUserConsumption(user1, rc1)
 
@@ -607,6 +613,9 @@ class QuotaManagerSuite extends CelebornFunSuite
           0,
           0)).asJava)
 
+    statusSystem1.updateApplicationInfo("app1", user1, extraInfo)
+    statusSystem1.updateApplicationInfo("app2", user1, extraInfo)
+    statusSystem1.updateApplicationInfo("app3", user2, extraInfo)
     addUserConsumption(user1, rc1)
     addUserConsumption(user2, rc2)
 
@@ -677,7 +686,6 @@ class QuotaManagerSuite extends CelebornFunSuite
   }
 
   def checkApplicationQuota(
-      userIdentifier: UserIdentifier,
       applicationId: String): CheckQuotaResponse = {
     quotaManager.checkApplicationQuotaStatus(applicationId)
   }

Reply via email to