This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f7962a9b4 [CELEBORN-2145] QuotaManager should respect
celeborn.quota.interruptShuffle.enabled of client config
f7962a9b4 is described below
commit f7962a9b41f9858faad2d8992b6f8573ff1c55a1
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sat Sep 20 15:01:02 2025 +0800
[CELEBORN-2145] QuotaManager should respect
celeborn.quota.interruptShuffle.enabled of client config
### What changes were proposed in this pull request?
`QuotaManager` should respect the client config `celeborn.quota.interruptShuffle.enabled`
and only expire applications that have interrupt shuffle enabled.
### Why are the changes needed?
At present, `QuotaManager` does not filter applications by whether they enable interrupt
shuffle. As a result, it can expire applications that have interrupt shuffle disabled, such
as high-priority applications. Therefore, `QuotaManager` should respect
`celeborn.quota.interruptShuffle.enabled` of the client config.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
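Updated `QuotaManagerSuite`: the tested applications now register application info with
`celeborn.quota.interruptShuffle.enabled=true`, so they remain eligible for expiration.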
Closes #3471 from SteNicholas/CELEBORN-2145.
Lead-authored-by: Nicholas Jiang <[email protected]>
Co-authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 7 +++--
.../master/clustermeta/AbstractMetaManager.java | 8 +++++
.../service/deploy/master/quota/QuotaManager.scala | 13 ++++----
.../deploy/master/quota/QuotaManagerSuite.scala | 36 +++++++++++++---------
4 files changed, 42 insertions(+), 22 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 57349f130..a4f5c5b4c 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -249,11 +249,14 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val messagesHelper: TransportMessagesHelper = new
TransportMessagesHelper()
private def registerApplicationInfo(): Unit = {
- Utils.tryLogNonFatalError(
+ Utils.tryLogNonFatalError {
+ val extraInfo = ApplicationInfoProvider.instantiate(conf)
+ .provide() + (CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED.key ->
conf.quotaInterruptShuffleEnabled.toString)
masterClient.send(RegisterApplicationInfo(
appUniqueId,
userIdentifier,
- ApplicationInfoProvider.instantiate(conf).provide().asJava)))
+ extraInfo.asJava))
+ }
}
// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is
executed, and
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index fc6627fd2..11d73c82a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -696,4 +696,12 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
return new ImmutablePair<>(
unhealthyCount * 1.0 / diskMap.size() >= unhealthyDiskRatioThreshold,
unhealthyCount);
}
+
+ public boolean isAppInterruptShuffleEnabled(String appId) {
+ return Boolean.parseBoolean(
+ Optional.ofNullable(applicationInfos.get(appId))
+ .map(ApplicationInfo::extraInfo)
+ .map(extraInfo ->
extraInfo.get(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key()))
+
.orElse(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().defaultValueString()));
+ }
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index 8da25f0dd..0d8d7b168 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -339,12 +339,13 @@ class QuotaManager(
var nonExpired = used
if (checkConsumptionExceeded(used, threshold)) {
val sortedConsumption =
- appMap.sortBy(_._2)(Ordering.by((r: ResourceConsumption) =>
- (
- r.diskBytesWritten,
- r.diskFileCount,
- r.hdfsBytesWritten,
- r.hdfsFileCount)).reverse)
+ appMap.filter(app => statusSystem.isAppInterruptShuffleEnabled(app._1))
+ .sortBy(_._2)(Ordering.by((r: ResourceConsumption) =>
+ (
+ r.diskBytesWritten,
+ r.diskFileCount,
+ r.hdfsBytesWritten,
+ r.hdfsFileCount)).reverse)
for ((appId, consumption) <- sortedConsumption
if checkConsumptionExceeded(nonExpired, threshold)) {
val reason = s"$expireReason Used: ${consumption.simpleString},
Threshold: $threshold"
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
index ee1d550b9..425a3404b 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala
@@ -67,6 +67,8 @@ class QuotaManagerSuite extends CelebornFunSuite
val metricsInstanceLabel =
s"""instance="${Utils.localHostName(conf)}:${conf.masterHttpPort}""""
+ private val extraInfo = Map(CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED.key
-> "true").asJava
+
override def beforeAll(): Unit = {
conf.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS")
conf.set(
@@ -192,13 +194,15 @@ class QuotaManagerSuite extends CelebornFunSuite
Utils.byteStringAsBytes("5G"),
20)).asJava)
+ statusSystem.updateApplicationInfo("app1", user, extraInfo)
+ statusSystem.updateApplicationInfo("app2", user, extraInfo)
addUserConsumption(user, rc)
conf.set("celeborn.quota.cluster.diskBytesWritten", "60gb")
configService.refreshCache()
quotaManager.updateResourceConsumption()
var res1 = checkUserQuota(user)
- var res2 = checkApplicationQuota(user, "app1")
- var res3 = checkApplicationQuota(user, "app2")
+ var res2 = checkApplicationQuota("app1")
+ var res3 = checkApplicationQuota("app2")
val succeed = CheckQuotaResponse(true, "")
val failed = CheckQuotaResponse(
@@ -230,8 +234,8 @@ class QuotaManagerSuite extends CelebornFunSuite
configService.refreshCache()
quotaManager.updateResourceConsumption()
res1 = checkUserQuota(user)
- res2 = checkApplicationQuota(user, "app1")
- res3 = checkApplicationQuota(user, "app2")
+ res2 = checkApplicationQuota("app1")
+ res3 = checkApplicationQuota("app2")
assert(res1 == failed)
assert(res2 == CheckQuotaResponse(
@@ -282,14 +286,16 @@ class QuotaManagerSuite extends CelebornFunSuite
Utils.byteStringAsBytes("2G"),
20)).asJava)
+ statusSystem.updateApplicationInfo("app1", user, extraInfo)
+ statusSystem.updateApplicationInfo("app2", user, extraInfo)
addUserConsumption(user, rc)
conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb")
configService.refreshCache()
quotaManager.updateResourceConsumption()
res1 = checkUserQuota(user)
- res2 = checkApplicationQuota(user, "app1")
- res3 = checkApplicationQuota(user, "app2")
+ res2 = checkApplicationQuota("app1")
+ res3 = checkApplicationQuota("app2")
assert(res1 == CheckQuotaResponse(
false,
@@ -315,7 +321,7 @@ class QuotaManagerSuite extends CelebornFunSuite
}
test("test handleResourceConsumption time - case1") {
- // 1000 users 100wapplications, all exceeded
+ // 1000 users 100w applications, all exceeded
conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb")
conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb")
configService.refreshCache()
@@ -332,6 +338,7 @@ class QuotaManagerSuite extends CelebornFunSuite
MIN + Math.abs(random.nextLong()) % (MAX - MIN),
MIN + Math.abs(random.nextLong()) % (MAX - MIN),
MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+ statusSystem.updateApplicationInfo(appId, user, extraInfo)
(appId, consumption)
}.toMap
val userConsumption = subResourceConsumption.values.foldRight(
@@ -340,10 +347,7 @@ class QuotaManagerSuite extends CelebornFunSuite
addUserConsumption(user, userConsumption)
}
- val start = System.currentTimeMillis()
quotaManager.updateResourceConsumption()
- val duration = System.currentTimeMillis() - start
- print(s"duration=$duration")
val res = resourceConsumptionSource.getMetrics
for (i <- 0 until 1000) {
@@ -386,6 +390,7 @@ class QuotaManagerSuite extends CelebornFunSuite
MIN + Math.abs(random.nextLong()) % (MAX - MIN),
MIN + Math.abs(random.nextLong()) % (MAX - MIN),
MIN + Math.abs(random.nextLong()) % (MAX - MIN))
+ statusSystem.updateApplicationInfo(appId, user, extraInfo)
(appId, consumption)
}.toMap
} else {
@@ -393,6 +398,7 @@ class QuotaManagerSuite extends CelebornFunSuite
index =>
val appId = s"$user$i case2_app$index"
val consumption = ResourceConsumption(0, 0, 0, 0)
+ statusSystem.updateApplicationInfo(appId, user, extraInfo)
(appId, consumption)
}.toMap
}
@@ -402,10 +408,7 @@ class QuotaManagerSuite extends CelebornFunSuite
addUserConsumption(user, userConsumption)
}
- val start = System.currentTimeMillis()
quotaManager.updateResourceConsumption()
- val duration = System.currentTimeMillis() - start
- print(s"duration=$duration")
val res = resourceConsumptionSource.getMetrics
for (i <- 0 until 1000) {
@@ -492,6 +495,9 @@ class QuotaManagerSuite extends CelebornFunSuite
0,
0)).asJava)
+ statusSystem1.updateApplicationInfo("app1", user, extraInfo)
+ statusSystem1.updateApplicationInfo("app2", user, extraInfo)
+ statusSystem1.updateApplicationInfo("app3", user, extraInfo)
addUserConsumption(user, rc)
addUserConsumption(user1, rc1)
@@ -607,6 +613,9 @@ class QuotaManagerSuite extends CelebornFunSuite
0,
0)).asJava)
+ statusSystem1.updateApplicationInfo("app1", user1, extraInfo)
+ statusSystem1.updateApplicationInfo("app2", user1, extraInfo)
+ statusSystem1.updateApplicationInfo("app3", user2, extraInfo)
addUserConsumption(user1, rc1)
addUserConsumption(user2, rc2)
@@ -677,7 +686,6 @@ class QuotaManagerSuite extends CelebornFunSuite
}
def checkApplicationQuota(
- userIdentifier: UserIdentifier,
applicationId: String): CheckQuotaResponse = {
quotaManager.checkApplicationQuotaStatus(applicationId)
}