This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9a552b42 [ISSUE-376] Fix concurrency problems may occur when the
ApplicationManager register app (#382)
9a552b42 is described below
commit 9a552b42f3b5073600b995d0da38204c2fe65e58
Author: jokercurry <[email protected]>
AuthorDate: Sat Dec 3 21:21:06 2022 +0800
[ISSUE-376] Fix concurrency problems may occur when the ApplicationManager
register app (#382)
### What changes were proposed in this pull request?
Lock when reading and writing the map `appAndTimes`.
### Why are the changes needed?
To resolve #376 .
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed origin uts.
---
.../uniffle/coordinator/AccessQuotaChecker.java | 25 ++++++----------
.../uniffle/coordinator/ApplicationManager.java | 11 ++++----
.../apache/uniffle/coordinator/QuotaManager.java | 27 ++++++++++++++++++
.../uniffle/coordinator/QuotaManagerTest.java | 33 ++++++++++++++++++++++
4 files changed, 73 insertions(+), 23 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
index edaa6031..78923ab8 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
@@ -18,10 +18,8 @@
package org.apache.uniffle.coordinator;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
-import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,21 +50,14 @@ public class AccessQuotaChecker extends
AbstractAccessChecker {
final String uuid = hostIp.hashCode() + "-" + COUNTER.sum();
final String user = accessInfo.getUser();
// low version client user attribute is an empty string
- if (!"".equals(user)) {
- Map<String, Map<String, Long>> currentUserApps =
quotaManager.getCurrentUserAndApp();
- Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
- Integer defaultAppNum =
quotaManager.getDefaultUserApps().getOrDefault(user,
- conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
- int currentAppNum = appAndTimes.size();
- if (currentAppNum >= defaultAppNum) {
- String msg = "Denied by AccessClusterLoadChecker => "
- + "User: " + user + ", current app num is: " + currentAppNum
- + ", default app num is: " + defaultAppNum + ". We will reject
this app[uuid=" + uuid + "].";
- LOG.error(msg);
- CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
- return new AccessCheckResult(false, msg);
- }
- appAndTimes.put(uuid, System.currentTimeMillis());
+ if (!"".equals(user) && quotaManager.checkQuota(user, uuid)) {
+ String msg = "Denied by AccessQuotaChecker => "
+ + "User: " + user + ", current app num is: " +
quotaManager.getCurrentUserAndApp().get(user).size()
+ + ", default app num is: " +
quotaManager.getDefaultUserApps().get(user)
+ + ". We will reject this app[uuid=" + uuid + "].";
+ LOG.error(msg);
+ CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
+ return new AccessCheckResult(false, msg);
}
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 19c88ac2..73d21683 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -106,12 +106,11 @@ public class ApplicationManager {
CoordinatorMetrics.counterTotalAppNum.inc();
LOG.info("New application is registered: {}", appId);
}
- long currentTimeMillis = System.currentTimeMillis();
- String[] appIdAndUuid = appId.split("_");
- String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
- // if appId created successfully, we need to remove the uuid
- appAndTime.remove(uuidFromApp);
- appAndTime.put(appId, currentTimeMillis);
+ if (quotaManager != null) {
+ quotaManager.registerApplicationInfo(appId, appAndTime);
+ } else {
+ appAndTime.put(appId, System.currentTimeMillis());
+ }
}
public void refreshAppId(String appId) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index 266aff40..69deefee 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -49,12 +49,14 @@ public class QuotaManager {
private final Map<String, Map<String, Long>> currentUserAndApp =
Maps.newConcurrentMap();
private final Map<String, String> appIdToUser = Maps.newConcurrentMap();
private final String quotaFilePath;
+ private final Integer quotaAppNum;
private FileSystem hadoopFileSystem;
private final AtomicLong quotaFileLastModify = new AtomicLong(0L);
private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
public QuotaManager(CoordinatorConf conf) {
this.quotaFilePath =
conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);
+ this.quotaAppNum =
conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM);
if (quotaFilePath == null) {
LOG.warn("{} is not configured, each user will use the default quota :
{}",
CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH.key(),
@@ -110,6 +112,31 @@ public class QuotaManager {
}
}
+ public boolean checkQuota(String user, String uuid) {
+ Map<String, Long> appAndTimes = currentUserAndApp.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
+ Integer defaultAppNum = defaultUserApps.getOrDefault(user, quotaAppNum);
+ synchronized (this) {
+ int currentAppNum = appAndTimes.size();
+ if (currentAppNum >= defaultAppNum) {
+ return true;
+ } else {
+ appAndTimes.put(uuid, System.currentTimeMillis());
+ return false;
+ }
+ }
+ }
+
+ public void registerApplicationInfo(String appId, Map<String, Long>
appAndTime) {
+ long currentTimeMillis = System.currentTimeMillis();
+ String[] appIdAndUuid = appId.split("_");
+ String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
+ // if appId created successfully, we need to remove the uuid
+ synchronized (this) {
+ appAndTime.remove(uuidFromApp);
+ appAndTime.put(appId, currentTimeMillis);
+ }
+ }
+
@VisibleForTesting
public Map<String, Integer> getDefaultUserApps() {
return defaultUserApps;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index 8611d4a6..3ead278e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.coordinator;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* QuotaManager is a manager for resource restriction.
@@ -100,4 +104,33 @@ public class QuotaManagerTest {
// it didn't detectUserResource because
`org.apache.unifle.coordinator.AccessQuotaChecker` is not configured
assertNull(applicationManager.getQuotaManager());
}
+
+ @Test
+ public void testCheckQuota() throws Exception {
+ final String quotaFile =
+ new
Path(remotePath.getAbsolutePath()).getFileSystem(hdfsConf).getName() +
"/quotaFile.properties";
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
+ quotaFile);
+ final ApplicationManager applicationManager = new ApplicationManager(conf);
+ final AtomicInteger uuid = new AtomicInteger();
+ Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ final int i1 = uuid.incrementAndGet();
+ uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+ Map<String, Long> appAndTime =
applicationManager.getQuotaManager().getCurrentUserAndApp()
+ .computeIfAbsent("user1", x -> uuidAndTime);
+ // This thread may remove the uuid and put the appId in.
+ final Thread registerThread = new Thread(() ->
+
applicationManager.getQuotaManager().registerApplicationInfo("application_test_"
+ i1, appAndTime));
+ registerThread.start();
+ final boolean icCheck = applicationManager.getQuotaManager()
+ .checkQuota("user1", String.valueOf(i1));
+ registerThread.join();
+ assertTrue(icCheck);
+
assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user1").size(),
5);
+ }
}