This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8ff41a58 [Feature] Support user's app quota level limit (#311)
8ff41a58 is described below
commit 8ff41a581a1f2c1a959f476903c83f51c37bb86f
Author: jokercurry <[email protected]>
AuthorDate: Tue Nov 22 09:07:07 2022 +0800
[Feature] Support user's app quota level limit (#311)
### What changes were proposed in this pull request?
For issue #211 and the design document
[https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing](https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing)
### Why are the changes needed?
Better isolation of resources between different users.
### Does this PR introduce _any_ user-facing change?
Add config `rss.coordinator.quota.default.app.num` to set the default app
number for each user, and `rss.coordinator.quota.default.path` to set a path to
the file that records the number of apps each user is allowed to run.
### How was this patch tested?
Add unit tests.
---
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 1 +
.../hadoop/mapred/SortWriteBufferManagerTest.java | 5 +
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 5 +
.../spark/shuffle/DelegationRssShuffleManager.java | 18 ++-
.../apache/spark/shuffle/RssShuffleManager.java | 10 +-
.../spark/shuffle/DelegationRssShuffleManager.java | 18 ++-
.../apache/spark/shuffle/RssShuffleManager.java | 11 +-
.../uniffle/client/api/ShuffleWriteClient.java | 2 +
.../client/impl/ShuffleWriteClientImpl.java | 36 +++++-
.../org/apache/uniffle/common/util/RssUtils.java | 1 +
.../uniffle/coordinator/AccessCheckResult.java | 12 ++
.../org/apache/uniffle/coordinator/AccessInfo.java | 13 +-
.../apache/uniffle/coordinator/AccessManager.java | 19 ++-
.../uniffle/coordinator/AccessQuotaChecker.java | 78 ++++++++++++
.../uniffle/coordinator/ApplicationManager.java | 68 +++++++++--
.../uniffle/coordinator/CoordinatorConf.java | 18 ++-
.../coordinator/CoordinatorGrpcService.java | 30 ++++-
.../uniffle/coordinator/CoordinatorMetrics.java | 3 +
.../uniffle/coordinator/CoordinatorServer.java | 2 +-
.../apache/uniffle/coordinator/QuotaManager.java | 115 ++++++++++++++++++
.../coordinator/AccessCandidatesCheckerTest.java | 8 +-
.../coordinator/AccessClusterLoadCheckerTest.java | 15 +--
.../uniffle/coordinator/AccessManagerTest.java | 13 +-
.../coordinator/AccessQuotaCheckerTest.java | 135 +++++++++++++++++++++
.../AppBalanceSelectStorageStrategyTest.java | 15 ++-
.../coordinator/ApplicationManagerTest.java | 3 +-
.../coordinator/CoordinatorMetricsTest.java | 2 +-
...owestIOSampleCostSelectStorageStrategyTest.java | 19 +--
.../uniffle/coordinator/QuotaManagerTest.java | 89 ++++++++++++++
.../test/AccessCandidatesCheckerHdfsTest.java | 9 +-
.../org/apache/uniffle/test/AccessClusterTest.java | 29 +++--
.../uniffle/test/CoordinatorGrpcServerTest.java | 17 +--
.../apache/uniffle/test/CoordinatorGrpcTest.java | 21 ++--
.../apache/uniffle/test/FetchClientConfTest.java | 14 ++-
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 27 +++--
.../uniffle/client/api/CoordinatorClient.java | 4 +
.../client/impl/grpc/CoordinatorGrpcClient.java | 34 +++++-
.../client/request/RssAccessClusterRequest.java | 12 +-
.../client/request/RssAppHeartBeatRequest.java | 1 +
...Request.java => RssApplicationInfoRequest.java} | 10 +-
.../client/response/RssAccessClusterResponse.java | 11 ++
...sponse.java => RssApplicationInfoResponse.java} | 6 +-
proto/src/main/proto/Rss.proto | 19 ++-
43 files changed, 846 insertions(+), 132 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index d891452d..02c63527 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -239,6 +239,7 @@ public class RssMRAppMaster extends MRAppMaster {
long heartbeatInterval = conf.getLong(RssMRConfig.RSS_HEARTBEAT_INTERVAL,
RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT,
heartbeatInterval / 2);
+ client.registerApplicationInfo(appId, heartbeatTimeout, "user");
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index f5c834b2..30267515 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -285,6 +285,11 @@ public class SortWriteBufferManagerTest {
}
+ @Override
+ public void registerApplicationInfo(String appId, long timeoutMs, String
user) {
+
+ }
+
@Override
public void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 5f05b560..04e816c2 100644
---
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -381,6 +381,11 @@ public class FetcherTest {
}
+ @Override
+ public void registerApplicationInfo(String appId, long timeoutMs, String
user) {
+
+ }
+
@Override
public void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 6182ecb3..cf9eb8af 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
@@ -48,6 +49,8 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private final List<CoordinatorClient> coordinatorClients;
private final int accessTimeoutMs;
private final SparkConf sparkConf;
+ private String user;
+ private String uuid;
public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver)
throws Exception {
this.sparkConf = sparkConf;
@@ -67,10 +70,20 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private ShuffleManager createShuffleManagerInDriver() throws RssException {
ShuffleManager shuffleManager;
-
+ user = "user";
+ try {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (Exception e) {
+ LOG.error("Error on getting user from ugi." + e);
+ }
boolean canAccess = tryAccessCluster();
+ if (uuid == null || "".equals(uuid)) {
+ uuid = String.valueOf(System.currentTimeMillis());
+ }
if (canAccess) {
try {
+ sparkConf.set("spark.rss.quota.user", user);
+ sparkConf.set("spark.rss.quota.uuid", uuid);
shuffleManager = new RssShuffleManager(sparkConf, true);
sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
sparkConf.set("spark.shuffle.manager",
RssShuffleManager.class.getCanonicalName());
@@ -113,9 +126,10 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
try {
canAccess = RetryUtils.retry(() -> {
RssAccessClusterResponse response =
coordinatorClient.accessCluster(new RssAccessClusterRequest(
- accessId, assignmentTags, accessTimeoutMs, extraProperties));
+ accessId, assignmentTags, accessTimeoutMs, extraProperties,
user));
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
LOG.warn("Success to access cluster {} using {}",
coordinatorClient.getDesc(), accessId);
+ uuid = response.getUuid();
return true;
} else if (response.getStatusCode() ==
ResponseStatusCode.ACCESS_DENIED) {
throw new RssException("Request to access cluster " +
coordinatorClient.getDesc() + " is denied using "
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c045f399..e71ac82b 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -88,6 +88,8 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataCommitPoolSize;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
+ private final String user;
+ private final String uuid;
private ThreadPoolExecutor threadPoolExecutor;
private EventLoop eventLoop = new
EventLoop<AddBlockEvent>("ShuffleDataQueue") {
@@ -142,7 +144,8 @@ public class RssShuffleManager implements ShuffleManager {
throw new IllegalArgumentException("Spark2 doesn't support AQE,
spark.sql.adaptive.enabled should be false.");
}
this.sparkConf = sparkConf;
-
+ this.user = sparkConf.get("spark.rss.quota.user", "user");
+ this.uuid = sparkConf.get("spark.rss.quota.uuid",
Long.toString(System.currentTimeMillis()));
// set & check replica config
this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
this.dataReplicaWrite =
sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
@@ -217,12 +220,12 @@ public class RssShuffleManager implements ShuffleManager {
}
// If yarn enable retry ApplicationMaster, appId will be not unique and
shuffle data will be incorrect,
- // appId + timestamp can avoid such problem,
+ // appId + uuid can avoid such problem,
// can't get appId in construct because SparkEnv is not created yet,
// appId will be initialized only once in this method which
// will be called many times depend on how many shuffle stage
if ("".equals(appId)) {
- appId = SparkEnv.get().conf().getAppId() + "_" +
System.currentTimeMillis();
+ appId = SparkEnv.get().conf().getAppId() + "_" + uuid;
LOG.info("Generate application id used in rss: " + appId);
}
@@ -273,6 +276,7 @@ public class RssShuffleManager implements ShuffleManager {
}
private void startHeartbeat() {
+ shuffleWriteClient.registerApplicationInfo(appId, heartbeatTimeout, user);
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false) &&
!heartbeatStarted) {
heartBeatScheduledExecutorService.scheduleAtFixedRate(
() -> {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 5078c708..79016a6d 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
@@ -48,6 +49,8 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private final List<CoordinatorClient> coordinatorClients;
private final int accessTimeoutMs;
private final SparkConf sparkConf;
+ private String user;
+ private String uuid;
public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver)
throws Exception {
this.sparkConf = sparkConf;
@@ -67,10 +70,20 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private ShuffleManager createShuffleManagerInDriver() throws RssException {
ShuffleManager shuffleManager;
-
+ user = "user";
+ try {
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (Exception e) {
+ LOG.error("Error on getting user from ugi." + e);
+ }
boolean canAccess = tryAccessCluster();
+ if (uuid == null || "".equals(uuid)) {
+ uuid = String.valueOf(System.currentTimeMillis());
+ }
if (canAccess) {
try {
+ sparkConf.set("spark.rss.quota.user", user);
+ sparkConf.set("spark.rss.quota.uuid", uuid);
shuffleManager = new RssShuffleManager(sparkConf, true);
sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
sparkConf.set("spark.shuffle.manager",
RssShuffleManager.class.getCanonicalName());
@@ -113,9 +126,10 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
try {
canAccess = RetryUtils.retry(() -> {
RssAccessClusterResponse response =
coordinatorClient.accessCluster(new RssAccessClusterRequest(
- accessId, assignmentTags, accessTimeoutMs, extraProperties));
+ accessId, assignmentTags, accessTimeoutMs, extraProperties,
user));
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
LOG.warn("Success to access cluster {} using {}",
coordinatorClient.getDesc(), accessId);
+ uuid = response.getUuid();
return true;
} else if (response.getStatusCode() ==
ResponseStatusCode.ACCESS_DENIED) {
throw new RssException("Request to access cluster " +
coordinatorClient.getDesc() + " is denied using "
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 1691dfe4..0ceb53fa 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -96,6 +96,8 @@ public class RssShuffleManager implements ShuffleManager {
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private final ShuffleDataDistributionType dataDistributionType;
+ private String user;
+ private String uuid;
private final EventLoop eventLoop;
private final EventLoop defaultEventLoop = new
EventLoop<AddBlockEvent>("ShuffleDataQueue") {
@@ -144,7 +146,8 @@ public class RssShuffleManager implements ShuffleManager {
public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.sparkConf = conf;
-
+ this.user = sparkConf.get("spark.rss.quota.user", "user");
+ this.uuid = sparkConf.get("spark.rss.quota.uuid",
Long.toString(System.currentTimeMillis()));
// set & check replica config
this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
this.dataReplicaWrite =
sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
@@ -278,7 +281,7 @@ public class RssShuffleManager implements ShuffleManager {
}
if (id.get() == null) {
- id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" +
System.currentTimeMillis());
+ id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
}
LOG.info("Generate application id used in rss: " + id.get());
@@ -674,11 +677,13 @@ public class RssShuffleManager implements ShuffleManager {
}
private synchronized void startHeartbeat() {
+ shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout,
user);
if (!heartbeatStarted) {
heartBeatScheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
- shuffleWriteClient.sendAppHeartbeat(id.get(), heartbeatTimeout);
+ String appId = id.get();
+ shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 9776678c..f2443bdb 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -37,6 +37,8 @@ public interface ShuffleWriteClient {
void sendAppHeartbeat(String appId, long timeoutMs);
+ void registerApplicationInfo(String appId, long timeoutMs, String user);
+
void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
String appId,
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 3bccd467..80096a9c 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -59,6 +60,7 @@ import
org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -549,6 +551,37 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
return blockIdBitmap;
}
+ @Override
+ public void registerApplicationInfo(String appId, long timeoutMs, String
user) {
+ RssApplicationInfoRequest request = new RssApplicationInfoRequest(appId,
timeoutMs, user);
+ List<Callable<Void>> callableList = Lists.newArrayList();
+ coordinatorClients.forEach(coordinatorClient -> {
+ callableList.add(() -> {
+ try {
+ RssApplicationInfoResponse response =
coordinatorClient.registerApplicationInfo(request);
+ if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+ LOG.error("Failed to send applicationInfo to " +
coordinatorClient.getDesc());
+ } else {
+ LOG.info("Successfully send applicationInfo to " +
coordinatorClient.getDesc());
+ }
+ } catch (Exception e) {
+ LOG.warn("Error happened when send applicationInfo to " +
coordinatorClient.getDesc(), e);
+ }
+ return null;
+ });
+ });
+ try {
+ List<Future<Void>> futures =
heartBeatExecutorService.invokeAll(callableList, timeoutMs,
TimeUnit.MILLISECONDS);
+ for (Future<Void> future : futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("register application is interrupted", ie);
+ }
+ }
+
@Override
public void sendAppHeartbeat(String appId, long timeoutMs) {
RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId,
timeoutMs);
@@ -571,7 +604,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
);
- coordinatorClients.stream().forEach(coordinatorClient -> {
+ coordinatorClients.forEach(coordinatorClient -> {
callableList.add(() -> {
try {
RssAppHeartBeatResponse response =
coordinatorClient.sendAppHeartBeat(request);
@@ -683,7 +716,6 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
return
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType,
shuffleServerInfo);
}
- @VisibleForTesting
void addShuffleServer(String appId, int shuffleId, ShuffleServerInfo
serverInfo) {
Map<Integer, Set<ShuffleServerInfo>> appServerMap =
shuffleServerInfoMap.get(appId);
if (appServerMap == null) {
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 16e21da3..5319444e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -194,6 +194,7 @@ public class RssUtils {
List<T> extensions = Lists.newArrayList();
for (String name : classes) {
+ name = name.trim();
try {
Class<?> klass = Class.forName(name);
if (!extClass.isAssignableFrom(klass)) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
index 8aa3f6c1..3e1d8bc8 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
@@ -21,10 +21,18 @@ public class AccessCheckResult {
private final boolean success;
private final String msg;
+ private final String uuid;
+
+ public AccessCheckResult(boolean success, String msg, String uuid) {
+ this.success = success;
+ this.msg = msg;
+ this.uuid = uuid;
+ }
public AccessCheckResult(boolean success, String msg) {
this.success = success;
this.msg = msg;
+ this.uuid = "";
}
public boolean isSuccess() {
@@ -34,4 +42,8 @@ public class AccessCheckResult {
public String getMsg() {
return msg;
}
+
+ public String getUuid() {
+ return uuid;
+ }
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
index 8200fa70..f07bc2e7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
@@ -21,21 +21,25 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
public class AccessInfo {
private final String accessId;
private final Set<String> tags;
private final Map<String, String> extraProperties;
+ private final String user;
- public AccessInfo(String accessId, Set<String> tags, Map<String, String>
extraProperties) {
+ public AccessInfo(String accessId, Set<String> tags, Map<String, String>
extraProperties, String user) {
this.accessId = accessId;
this.tags = tags;
this.extraProperties = extraProperties == null ? Collections.emptyMap() :
extraProperties;
+ this.user = user;
}
+ @VisibleForTesting
public AccessInfo(String accessId) {
- this(accessId, Sets.newHashSet(), Collections.emptyMap());
+ this(accessId, Sets.newHashSet(), Collections.emptyMap(), "");
}
public String getAccessId() {
@@ -50,10 +54,15 @@ public class AccessInfo {
return extraProperties;
}
+ public String getUser() {
+ return user;
+ }
+
@Override
public String toString() {
return "AccessInfo{"
+ "accessId='" + accessId + '\''
+ + ", user= " + user
+ ", tags=" + tags
+ ", extraProperties=" + extraProperties
+ '}';
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index a41cd62e..04fddc36 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -35,15 +35,19 @@ public class AccessManager {
private final CoordinatorConf coordinatorConf;
private final ClusterManager clusterManager;
+ private final ApplicationManager applicationManager;
private final Configuration hadoopConf;
private List<AccessChecker> accessCheckers = Lists.newArrayList();
public AccessManager(
- CoordinatorConf conf, ClusterManager clusterManager,
- Configuration hadoopConf) throws Exception {
+ CoordinatorConf conf,
+ ClusterManager clusterManager,
+ ApplicationManager applicationManager,
+ Configuration hadoopConf) throws Exception {
this.coordinatorConf = conf;
this.clusterManager = clusterManager;
this.hadoopConf = hadoopConf;
+ this.applicationManager = applicationManager;
init();
}
@@ -58,15 +62,20 @@ public class AccessManager {
}
public AccessCheckResult handleAccessRequest(AccessInfo accessInfo) {
+ String uuid = "";
CoordinatorMetrics.counterTotalAccessRequest.inc();
for (AccessChecker checker : accessCheckers) {
AccessCheckResult accessCheckResult = checker.check(accessInfo);
if (!accessCheckResult.isSuccess()) {
return accessCheckResult;
}
+ String resultUuid = accessCheckResult.getUuid();
+ if (!"".equals(resultUuid)) {
+ uuid = resultUuid;
+ }
}
- return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
+ return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
}
public CoordinatorConf getCoordinatorConf() {
@@ -85,6 +94,10 @@ public class AccessManager {
return hadoopConf;
}
+ public ApplicationManager getApplicationManager() {
+ return applicationManager;
+ }
+
public void close() throws IOException {
for (AccessChecker checker : accessCheckers) {
checker.close();
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
new file mode 100644
index 00000000..91b9ceab
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RssUtils;
+
+/**
+ * This checker limits the number of apps that different users can submit.
+ */
+public class AccessQuotaChecker extends AbstractAccessChecker {
+ private static final Logger LOG =
LoggerFactory.getLogger(AccessQuotaChecker.class);
+
+ private final ApplicationManager applicationManager;
+ private final CoordinatorConf conf;
+ private static final LongAdder COUNTER = new LongAdder();
+ private final String hostIp;
+
+ public AccessQuotaChecker(AccessManager accessManager) throws Exception {
+ super(accessManager);
+ conf = accessManager.getCoordinatorConf();
+ applicationManager = accessManager.getApplicationManager();
+ hostIp = RssUtils.getHostIp();
+ }
+
+ @Override
+ public AccessCheckResult check(AccessInfo accessInfo) {
+ COUNTER.increment();
+ final String uuid = hostIp.hashCode() + "-" + COUNTER.sum();
+ final String user = accessInfo.getUser();
+ // low version client user attribute is an empty string
+ if (!"".equals(user)) {
+ Map<String, Map<String, Long>> currentUserApps =
applicationManager.getCurrentUserApps();
+ Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
+ Integer defaultAppNum =
applicationManager.getDefaultUserApps().getOrDefault(user,
+ conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
+ int currentAppNum = appAndTimes.size();
+ if (currentAppNum >= defaultAppNum) {
+ String msg = "Denied by AccessClusterLoadChecker => "
+ + "User: " + user + ", current app num is: " + currentAppNum
+ + ", default app num is: " + defaultAppNum + ". We will reject
this app[uuid=" + uuid + "].";
+ LOG.error(msg);
+ CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
+ return new AccessCheckResult(false, msg);
+ }
+ appAndTimes.put(uuid, System.currentTimeMillis());
+ }
+ return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 71561af9..32c87eba 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -46,7 +46,6 @@ public class ApplicationManager {
public static final List<String> REMOTE_PATH_SCHEMA = Arrays.asList("hdfs");
private final long expired;
private final StrategyName storageStrategy;
- private final Map<String, Long> appIds = Maps.newConcurrentMap();
private final SelectStorageStrategy selectStorageStrategy;
// store appId -> remote path to make sure all shuffle data of the same
application
// will be written to the same remote storage
@@ -55,6 +54,9 @@ public class ApplicationManager {
private final Map<String, RankValue> remoteStoragePathRankValue;
private final Map<String, String> remoteStorageToHost =
Maps.newConcurrentMap();
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+ private final Map<String, Map<String, Long>> currentUserAndApp;
+ private final Map<String, String> appIdToUser;
+ private final Map<String, Integer> defaultUserApps;
// it's only for test case to check if status check has problem
private boolean hasErrorInStatusCheck = false;
@@ -73,6 +75,10 @@ public class ApplicationManager {
throw new UnsupportedOperationException("Unsupported selected storage
strategy.");
}
expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
+ QuotaManager quotaManager = new QuotaManager(conf);
+ this.currentUserAndApp = quotaManager.getCurrentUserAndApp();
+ this.appIdToUser = quotaManager.getAppIdToUser();
+ this.defaultUserApps = quotaManager.getDefaultUserApps();
// the thread for checking application status
ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("ApplicationManager-%d"));
@@ -86,12 +92,32 @@ public class ApplicationManager {
conf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME),
TimeUnit.MILLISECONDS);
}
- public void refreshAppId(String appId) {
- if (!appIds.containsKey(appId)) {
+ public void registerApplicationInfo(String appId, String user) {
+ // using computeIfAbsent is just for MR and spark which is used
RssShuffleManager as implementation class
+ // in such case by default, there is no currentUserAndApp, so a unified
user implementation named "user" is used.
+ Map<String, Long> appAndTime = currentUserAndApp.computeIfAbsent(user, x
-> Maps.newConcurrentMap());
+ appIdToUser.put(appId, user);
+ if (!appAndTime.containsKey(appId)) {
CoordinatorMetrics.counterTotalAppNum.inc();
LOG.info("New application is registered: {}", appId);
}
- appIds.put(appId, System.currentTimeMillis());
+ long currentTimeMillis = System.currentTimeMillis();
+ String[] appIdAndUuid = appId.split("_");
+ String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
+ // if appId created successfully, we need to remove the uuid
+ appAndTime.remove(uuidFromApp);
+ appAndTime.put(appId, currentTimeMillis);
+ }
+
+ public void refreshAppId(String appId) {
+ String user = appIdToUser.get(appId);
+ // compatible with lower version clients
+ if (user == null) {
+ registerApplicationInfo(appId, "");
+ } else {
+ Map<String, Long> appAndTime = currentUserAndApp.get(user);
+ appAndTime.put(appId, System.currentTimeMillis());
+ }
}
public void refreshRemoteStorage(String remoteStoragePath, String
remoteStorageConf) {
@@ -190,7 +216,7 @@ public class ApplicationManager {
}
public Set<String> getAppIds() {
- return appIds.keySet();
+ return appIdToUser.keySet();
}
@VisibleForTesting
@@ -219,16 +245,26 @@ public class ApplicationManager {
}
private void statusCheck() {
+ List<Map<String, Long>> appAndNums =
Lists.newArrayList(currentUserAndApp.values());
+ Map<String, Long> appIds = Maps.newHashMap();
+ // An expiration time is set for uuids because accessCluster can
succeed while the shuffle registration fails,
+ // which leaves no normal heartbeat and no update of the uuid to an
appId.
+ // The expiration time lets such stale uuids be removed automatically.
+ Set<String> expiredAppIds = Sets.newHashSet();
try {
- LOG.info("Start to check status for " + appIds.size() + " applications");
- long current = System.currentTimeMillis();
- Set<String> expiredAppIds = Sets.newHashSet();
- for (Map.Entry<String, Long> entry : appIds.entrySet()) {
- long lastReport = entry.getValue();
- if (current - lastReport > expired) {
- expiredAppIds.add(entry.getKey());
+ for (Map<String, Long> appAndTimes : appAndNums) {
+ for (Map.Entry<String, Long> appAndTime : appAndTimes.entrySet()) {
+ String appId = appAndTime.getKey();
+ long lastReport = appAndTime.getValue();
+ appIds.put(appId, lastReport);
+ if (System.currentTimeMillis() - lastReport > expired) {
+ expiredAppIds.add(appId);
+ appAndTimes.remove(appId);
+ appIdToUser.remove(appId);
+ }
}
}
+ LOG.info("Start to check status for " + appIds.size() + " applications");
for (String appId : expiredAppIds) {
LOG.info("Remove expired application:" + appId);
appIds.remove(appId);
@@ -281,6 +317,14 @@ public class ApplicationManager {
return storageHost;
}
+ public Map<String, Integer> getDefaultUserApps() {
+ return defaultUserApps;
+ }
+
+ public Map<String, Map<String, Long>> getCurrentUserApps() {
+ return currentUserAndApp;
+ }
+
public enum StrategyName {
APP_BALANCE,
IO_SAMPLE
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index ae987656..c696dc83 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -77,7 +77,8 @@ public class CoordinatorConf extends RssBaseConf {
.key("rss.coordinator.access.checkers")
.stringType()
.asList()
- .defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
+ .defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker",
+ "org.apache.uniffle.coordinator.AccessQuotaChecker")
.withDescription("Access checkers");
public static final ConfigOption<Integer>
COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions
.key("rss.coordinator.access.candidates.updateIntervalSec")
@@ -178,6 +179,21 @@ public class CoordinatorConf extends RssBaseConf {
.enumType(AbstractAssignmentStrategy.SelectPartitionStrategyName.class)
.defaultValue(AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND)
.withDescription("Strategy for selecting partitions");
+ public static final ConfigOption<Integer> COORDINATOR_QUOTA_DEFAULT_APP_NUM
= ConfigOptions
+ .key("rss.coordinator.quota.default.app.num")
+ .intType()
+ .defaultValue(5)
+ .withDescription("Default number of apps at user level");
+ public static final ConfigOption<String> COORDINATOR_QUOTA_DEFAULT_PATH =
ConfigOptions
+ .key("rss.coordinator.quota.default.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("A configuration file that specifies the number of
apps each user is allowed to run");
+ public static final ConfigOption<Long> COORDINATOR_QUOTA_UPDATE_INTERVAL =
ConfigOptions
+ .key("rss.coordinator.quota.update.interval")
+ .longType()
+ .defaultValue(60 * 1000L)
+ .withDescription("Update interval for the default number of submitted
apps per user");
public CoordinatorConf() {
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e9cbb79b..75e3dce7 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -38,6 +38,8 @@ import
org.apache.uniffle.proto.RssProtos.AccessClusterRequest;
import org.apache.uniffle.proto.RssProtos.AccessClusterResponse;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
import org.apache.uniffle.proto.RssProtos.CheckServiceAvailableResponse;
import org.apache.uniffle.proto.RssProtos.ClientConfItem;
import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
@@ -218,6 +220,30 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
responseObserver.onCompleted();
}
+ @Override
+ public void registerApplicationInfo(
+ ApplicationInfoRequest request,
+ StreamObserver<ApplicationInfoResponse> responseObserver) {
+ String appId = request.getAppId();
+ String user = request.getUser();
+ coordinatorServer.getApplicationManager().registerApplicationInfo(appId,
user);
+ LOG.debug("Got a registered application info: " + appId);
+ ApplicationInfoResponse response = ApplicationInfoResponse
+ .newBuilder()
+ .setRetMsg("")
+ .setStatus(StatusCode.SUCCESS)
+ .build();
+
+ if (Context.current().isCancelled()) {
+ responseObserver.onError(Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
+ LOG.warn("Cancelled by client {} for after deadline.", appId);
+ return;
+ }
+
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+
@Override
public void accessCluster(AccessClusterRequest request,
StreamObserver<AccessClusterResponse> responseObserver) {
StatusCode statusCode = StatusCode.SUCCESS;
@@ -228,7 +254,8 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
new AccessInfo(
request.getAccessId(),
Sets.newHashSet(request.getTagsList()),
- request.getExtraPropertiesMap()
+ request.getExtraPropertiesMap(),
+ request.getUser()
);
AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
if (!result.isSuccess()) {
@@ -239,6 +266,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
.newBuilder()
.setStatus(statusCode)
.setRetMsg(result.getMsg())
+ .setUuid(result.getUuid())
.build();
if (Context.current().isCancelled()) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
index d7d19b89..10bc8b2f 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
@@ -39,6 +39,7 @@ public class CoordinatorMetrics {
private static final String TOTAL_ACCESS_REQUEST = "total_access_request";
private static final String TOTAL_CANDIDATES_DENIED_REQUEST =
"total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST =
"total_load_denied_request";
+ private static final String TOTAL_QUOTA_DENIED_REQUEST =
"total_quota_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX =
"remote_storage_in_used_";
static Gauge gaugeTotalServerNum;
@@ -48,6 +49,7 @@ public class CoordinatorMetrics {
static Counter counterTotalAppNum;
static Counter counterTotalAccessRequest;
static Counter counterTotalCandidatesDeniedRequest;
+ static Counter counterTotalQuotaDeniedRequest;
static Counter counterTotalLoadDeniedRequest;
static final Map<String, Gauge> gaugeInUsedRemoteStorage =
Maps.newConcurrentMap();
@@ -102,6 +104,7 @@ public class CoordinatorMetrics {
counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
counterTotalAccessRequest =
metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
counterTotalCandidatesDeniedRequest =
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
+ counterTotalQuotaDeniedRequest =
metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
counterTotalLoadDeniedRequest =
metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index cc8c9e84..6a3a0166 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -143,7 +143,7 @@ public class CoordinatorServer {
AssignmentStrategyFactory assignmentStrategyFactory =
new AssignmentStrategyFactory(coordinatorConf, clusterManager);
this.assignmentStrategy =
assignmentStrategyFactory.getAssignmentStrategy();
- this.accessManager = new AccessManager(coordinatorConf, clusterManager,
hadoopConf);
+ this.accessManager = new AccessManager(coordinatorConf, clusterManager,
applicationManager, hadoopConf);
CoordinatorFactory coordinatorFactory = new CoordinatorFactory(this);
server = coordinatorFactory.getServer();
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
new file mode 100644
index 00000000..2d9c9fce
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * QuotaManager is a manager for resource restriction.
+ */
+public class QuotaManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(QuotaManager.class);
+ private final Map<String, Map<String, Long>> currentUserAndApp =
Maps.newConcurrentMap();
+ private final Map<String, String> appIdToUser = Maps.newConcurrentMap();
+ private final String quotaFilePath;
+ private FileSystem hadoopFileSystem;
+ private final AtomicLong quotaFileLastModify = new AtomicLong(0L);
+ private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
+
+ public QuotaManager(CoordinatorConf conf) {
+ this.quotaFilePath =
conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);
+ final Long updateTime =
conf.get(CoordinatorConf.COORDINATOR_QUOTA_UPDATE_INTERVAL);
+ try {
+ hadoopFileSystem = HadoopFilesystemProvider.getFilesystem(new
Path(quotaFilePath), new Configuration());
+ } catch (Exception e) {
+ LOG.error("Cannot init remoteFS on path : " + quotaFilePath, e);
+ }
+ // Threads that update the number of submitted applications
+ ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("UpdateDefaultApp-%d"));
+ scheduledExecutorService.scheduleAtFixedRate(
+ this::detectUserResource, 0, updateTime / 2, TimeUnit.MILLISECONDS);
+ }
+
+ public void detectUserResource() {
+ if (quotaFilePath != null && hadoopFileSystem != null) {
+ try {
+ Path hadoopPath = new Path(quotaFilePath);
+ FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+ if (fileStatus != null && fileStatus.isFile()) {
+ long latestModificationTime = fileStatus.getModificationTime();
+ if (quotaFileLastModify.get() != latestModificationTime) {
+ parseQuotaFile(hadoopFileSystem.open(hadoopPath));
+ quotaFileLastModify.set(latestModificationTime);
+ }
+ }
+ } catch (FileNotFoundException fileNotFoundException) {
+ LOG.error("Can't find this file {}", quotaFilePath);
+ } catch (Exception e) {
+ LOG.warn("Error when updating quotaFile, the quota file path:
{}", quotaFilePath, e);
+ }
+ }
+ }
+
+ public void parseQuotaFile(DataInputStream fsDataInputStream) {
+ String content;
+ try (BufferedReader bufferedReader =
+ new BufferedReader(new InputStreamReader(fsDataInputStream,
StandardCharsets.UTF_8))) {
+ while ((content = bufferedReader.readLine()) != null) {
+ String user = content.split(Constants.EQUAL_SPLIT_CHAR)[0].trim();
+ Integer appNum =
Integer.valueOf(content.split(Constants.EQUAL_SPLIT_CHAR)[1].trim());
+ defaultUserApps.put(user, appNum);
+ }
+ } catch (Exception e) {
+ LOG.error("Error occur when parsing file {}", quotaFilePath, e);
+ }
+ }
+
+ public Map<String, Integer> getDefaultUserApps() {
+ return defaultUserApps;
+ }
+
+ public Map<String, Map<String, Long>> getCurrentUserAndApp() {
+ return currentUserAndApp;
+ }
+
+ public Map<String, String> getAppIdToUser() {
+ return appIdToUser;
+ }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
index 03c7c1a0..b3a6b30a 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
@@ -58,11 +58,11 @@ public class AccessCandidatesCheckerTest {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
tempDir.toURI().toString());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");
-
+ final ApplicationManager applicationManager = new ApplicationManager(conf);
// file load checking at startup
Exception expectedException = null;
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -72,7 +72,7 @@ public class AccessCandidatesCheckerTest {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
cfgFile.toURI().toString());
expectedException = null;
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -88,7 +88,7 @@ public class AccessCandidatesCheckerTest {
printWriter.println("2 ");
printWriter.flush();
printWriter.close();
- AccessManager accessManager = new AccessManager(conf, null, new
Configuration());
+ AccessManager accessManager = new AccessManager(conf, null,
applicationManager, new Configuration());
AccessCandidatesChecker checker = (AccessCandidatesChecker)
accessManager.getAccessCheckers().get(0);
sleep(1200);
assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
index 18542162..f932c8f5 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
@@ -86,8 +86,8 @@ public class AccessClusterLoadCheckerTest {
conf.set(COORDINATOR_ACCESS_CHECKERS,
Arrays.asList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
-
- AccessManager accessManager = new AccessManager(conf, clusterManager, new
Configuration());
+ ApplicationManager applicationManager = new ApplicationManager(conf);
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
applicationManager, new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
(AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
@@ -99,7 +99,7 @@ public class AccessClusterLoadCheckerTest {
*/
Map<String, String> properties = new HashMap<>();
properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
- AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties);
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
/**
@@ -108,7 +108,7 @@ public class AccessClusterLoadCheckerTest {
* the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
*/
properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
- accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
/**
@@ -117,7 +117,7 @@ public class AccessClusterLoadCheckerTest {
* the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
*/
properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
- accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
/**
@@ -126,7 +126,7 @@ public class AccessClusterLoadCheckerTest {
* default shuffle nodes max from coordinator conf.
*/
properties = new HashMap<>();
- accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
}
@@ -150,7 +150,8 @@ public class AccessClusterLoadCheckerTest {
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessClusterLoadChecker");
- AccessManager accessManager = new AccessManager(conf, clusterManager, new
Configuration());
+ ApplicationManager applicationManager = new ApplicationManager(conf);
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
applicationManager, new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
(AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
index bc12ffe3..f519bcf7 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
@@ -48,8 +48,9 @@ public class AccessManagerTest {
// test init
CoordinatorConf conf = new CoordinatorConf();
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
+ ApplicationManager applicationManager = new ApplicationManager(conf);
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "Empty classes";
assertTrue(e.getMessage().startsWith(expectedMessage));
@@ -57,30 +58,30 @@ public class AccessManagerTest {
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
assertTrue(e.getMessage().startsWith(expectedMessage));
}
// test empty checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
- AccessManager accessManager = new AccessManager(conf, null, new
Configuration());
+ AccessManager accessManager = new AccessManager(conf, null,
applicationManager, new Configuration());
assertTrue(accessManager.handleAccessRequest(
new AccessInfo(String.valueOf(new Random().nextInt()),
- Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
Collections.emptyMap()))
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
Collections.emptyMap(), "user"))
.isSuccess());
accessManager.close();
// test mock checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
- accessManager = new AccessManager(conf, null, new Configuration());
+ accessManager = new AccessManager(conf, null, applicationManager, new
Configuration());
assertEquals(1, accessManager.getAccessCheckers().size());
assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock2")).isSuccess());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
+
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
- accessManager = new AccessManager(conf, null, new Configuration());
+ accessManager = new AccessManager(conf, null, applicationManager, new
Configuration());
assertEquals(2, accessManager.getAccessCheckers().size());
assertFalse(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
accessManager.close();
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
new file mode 100644
index 00000000..723360fd
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
+import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AccessQuotaCheckerTest {
+ @BeforeEach
+ public void setUp() {
+ CoordinatorMetrics.register();
+ }
+
+ @AfterEach
+ public void clear() {
+ CoordinatorMetrics.clear();
+ }
+
+ @Test
+ public void testAccessInfoRequiredShuffleServers() throws Exception {
+ List<ServerNode> nodes = Lists.newArrayList();
+ ServerNode node1 = new ServerNode(
+ "1",
+ "1",
+ 0,
+ 50,
+ 20,
+ 1000,
+ 0,
+ null,
+ true);
+ ServerNode node2 = new ServerNode(
+ "1",
+ "1",
+ 0,
+ 50,
+ 20,
+ 1000,
+ 0,
+ null,
+ true);
+ nodes.add(node1);
+ nodes.add(node2);
+
+ ClusterManager clusterManager = mock(SimpleClusterManager.class);
+ when(clusterManager.getServerList(any())).thenReturn(nodes);
+
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(COORDINATOR_ACCESS_CHECKERS,
+
Collections.singletonList("org.apache.uniffle.coordinator.AccessQuotaChecker"));
+ conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
+ ApplicationManager applicationManager = new ApplicationManager(conf);
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
applicationManager, new Configuration());
+
+ AccessQuotaChecker accessQuotaChecker =
+ (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+
+ /**
+ * case1:
+ * when the default app num is set to 3 and a 4th app is submitted,
the current app num exceeds the quota,
+ * so the 4th access request is rejected and check returns false.
+ */
+ Map<String, String> properties = new HashMap<>();
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+
+ /**
+ * case2:
+ * when the default app num is set to 0, every access request
exceeds the quota,
+ * so even the first check returns false.
+ */
+ conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 0);
+ applicationManager = new ApplicationManager(conf);
+ accessManager = new AccessManager(conf, clusterManager,
applicationManager, new Configuration());
+ accessQuotaChecker = (AccessQuotaChecker)
accessManager.getAccessCheckers().get(0);
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+ assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+
+ /**
+ * case3:
+ * when two checkers are configured and the quota (10) is not
exceeded, the quota check passes,
+ * but the required shuffle nodes number (100) exceeds the available
servers, so the cluster load check fails.
+ */
+ conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 10);
+ conf.set(COORDINATOR_ACCESS_CHECKERS,
+ Arrays.asList("org.apache.uniffle.coordinator.AccessQuotaChecker",
+ "org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+ applicationManager = new ApplicationManager(conf);
+ accessManager = new AccessManager(conf, clusterManager,
applicationManager, new Configuration());
+ accessQuotaChecker = (AccessQuotaChecker)
accessManager.getAccessCheckers().get(0);
+ final AccessClusterLoadChecker accessClusterLoadChecker =
+ (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
+ properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+ }
+}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
index 8ea88bcf..eefda99d 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
@@ -72,7 +72,8 @@ public class AppBalanceSelectStorageStrategyTest {
// do inc for remotePath1 to make sure pick storage will be remotePath2 in
next call
applicationManager.incRemoteStorageCounter(remotePath1);
applicationManager.incRemoteStorageCounter(remotePath1);
- String testApp1 = "testApp1";
+ String testApp1 = "application_test_" + 1;
+ applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
assertEquals(remotePath2,
applicationManager.pickRemoteStorage(testApp1).getPath());
assertEquals(remotePath2,
applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
@@ -88,6 +89,7 @@ public class AppBalanceSelectStorageStrategyTest {
// refresh app1, got remotePath2, then remove remotePath2,
// it should be existed in counter until it expired
+ applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
assertEquals(remotePath2,
applicationManager.pickRemoteStorage(testApp1).getPath());
remoteStoragePath = remotePath1;
@@ -115,12 +117,13 @@ public class AppBalanceSelectStorageStrategyTest {
String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR +
remotePath2
+ Constants.COMMA_SPLIT_CHAR + remotePath3;
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
- String appPrefix = "testAppId";
+ String testApp1 = "application_testAppId";
// init detectStorageScheduler
Thread.sleep(2000);
Thread pickThread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
@@ -128,7 +131,8 @@ public class AppBalanceSelectStorageStrategyTest {
Thread pickThread2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
@@ -136,7 +140,8 @@ public class AppBalanceSelectStorageStrategyTest {
Thread pickThread3 = new Thread(() -> {
for (int i = 2000; i < 3000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 791ec1dc..64150d42 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -101,7 +101,8 @@ public class ApplicationManagerTest {
public void clearWithoutRemoteStorageTest() throws Exception {
// test case for storage type without remote storage,
// NPE shouldn't happen when clear the resource
- String testApp = "clearWithoutRemoteStorageTest";
+ String testApp = "application_clearWithoutRemoteStorageTest";
+ applicationManager.registerApplicationInfo(testApp, "user");
applicationManager.refreshAppId(testApp);
// just set a value != 0, it should be reset to 0 if everything goes well
CoordinatorMetrics.gaugeRunningAppNum.set(100.0);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
index 1b4a2345..b1bd1e0e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
@@ -87,7 +87,7 @@ public class CoordinatorMetricsTest {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
- assertEquals(9, actualObj.get("metrics").size());
+ assertEquals(10, actualObj.get("metrics").size());
}
@Test
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
index e0c3dde8..7273318f 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -77,7 +77,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
Thread.sleep(500L);
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
- conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
200);
+ conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
1000);
conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY,
IO_SAMPLE);
applicationManager = new ApplicationManager(conf);
selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy)
applicationManager.getSelectStorageStrategy();
@@ -104,10 +104,11 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
// compare with two remote path
applicationManager.incRemoteStorageCounter(remoteStorage1);
applicationManager.incRemoteStorageCounter(remoteStorage1);
- String testApp1 = "testApp1";
+ String testApp1 = "application_test_" + 1;
+ applicationManager.registerApplicationInfo(testApp1, "user");
+ applicationManager.refreshAppId(testApp1);
Thread.sleep(1000);
final long current = System.currentTimeMillis();
- applicationManager.refreshAppId(testApp1);
fs.create(path);
selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile,
current);
fs.create(path);
@@ -129,6 +130,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
// refresh app1, got remotePath2, then remove remotePath2,
// it should be existed in counter until it expired
+ applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
assertEquals(remoteStorage2,
applicationManager.pickRemoteStorage(testApp1).getPath());
remoteStoragePath = remoteStorage1;
@@ -158,12 +160,13 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR +
remoteStorage2
+ Constants.COMMA_SPLIT_CHAR + remoteStorage3;
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
- String appPrefix = "testAppId";
+ String testApp1 = "application_testAppId";
// init detectStorageScheduler
Thread.sleep(2000);
Thread pickThread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
@@ -171,7 +174,8 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
Thread pickThread2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
@@ -179,7 +183,8 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
Thread pickThread3 = new Thread(() -> {
for (int i = 2000; i < 3000; i++) {
- String appId = appPrefix + i;
+ String appId = testApp1 + i;
+ applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.pickRemoteStorage(appId);
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
new file mode 100644
index 00000000..fe63b6c0
--- /dev/null
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * QuotaManager is a manager for resource restriction.
+ */
+public class QuotaManagerTest {
+ public QuotaManager quotaManager;
+ private static final Configuration hdfsConf = new Configuration();
+ private static MiniDFSCluster cluster;
+ @TempDir
+ private static File remotePath = new File("hdfs://rss");
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath());
+ hdfsConf.set("dfs.nameservices", "rss");
+ hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
remotePath.getAbsolutePath());
+ cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
+ CoordinatorConf conf = new CoordinatorConf();
+ quotaManager = new QuotaManager(conf);
+ }
+
+ @AfterAll
+ public static void clear() {
+ cluster.close();
+ }
+
+ @Test
+ public void testDetectUserResource() throws Exception {
+ final String quotaFile =
+ new
Path(remotePath.getAbsolutePath()).getFileSystem(hdfsConf).getName() +
"/quotaFile.properties";
+ final FSDataOutputStream fsDataOutputStream =
+ new Path(remotePath.toString()).getFileSystem(hdfsConf).create(new
Path(quotaFile));
+ String quota1 = "user1 =10";
+ String quota2 = "user2= 20";
+ String quota3 = "user3 = 30";
+ fsDataOutputStream.write(quota1.getBytes(StandardCharsets.UTF_8));
+ fsDataOutputStream.write("\n".getBytes(StandardCharsets.UTF_8));
+ fsDataOutputStream.write(quota2.getBytes(StandardCharsets.UTF_8));
+ fsDataOutputStream.write("\n".getBytes(StandardCharsets.UTF_8));
+ fsDataOutputStream.write(quota3.getBytes(StandardCharsets.UTF_8));
+ fsDataOutputStream.flush();
+ fsDataOutputStream.close();
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
+ quotaFile);
+ quotaManager = new QuotaManager(conf);
+ quotaManager.detectUserResource();
+
+ Integer user1 = quotaManager.getDefaultUserApps().get("user1");
+ Integer user2 = quotaManager.getDefaultUserApps().get("user2");
+ Integer user3 = quotaManager.getDefaultUserApps().get("user3");
+ assertEquals(user1, 10);
+ assertEquals(user2, 20);
+ assertEquals(user3, 30);
+ }
+}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index c4097d7d..2ab9e2bd 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.coordinator.AccessCandidatesChecker;
import org.apache.uniffle.coordinator.AccessInfo;
import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorMetrics;
import org.apache.uniffle.storage.HdfsTestBase;
@@ -70,11 +71,11 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
clusterPrefix);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessCandidatesChecker");
-
+ ApplicationManager applicationManager = new ApplicationManager(conf);
// file load checking at startup
Exception expectedException = null;
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -84,7 +85,7 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidatesFile);
expectedException = null;
try {
- new AccessManager(conf, null, new Configuration());
+ new AccessManager(conf, null, applicationManager, new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -101,7 +102,7 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
printWriter.println("2 ");
printWriter.flush();
printWriter.close();
- AccessManager accessManager = new AccessManager(conf, null, hadoopConf);
+ AccessManager accessManager = new AccessManager(conf, null,
applicationManager, hadoopConf);
AccessCandidatesChecker checker = (AccessCandidatesChecker)
accessManager.getAccessCheckers().get(0);
// load the config at the beginning
sleep(1200);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index e245c256..4ee5b651 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -83,27 +83,26 @@ public class AccessClusterTest extends CoordinatorTestBase {
createCoordinatorServer(coordinatorConf);
startServers();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
-
// case1: empty map
String accessID = "acessid";
- RssAccessClusterRequest request = new RssAccessClusterRequest(
- accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+ RssAccessClusterRequest request = new RssAccessClusterRequest(accessID,
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
RssAccessClusterResponse response =
coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
// case2: illegal names
Map<String, String> extraProperties = new HashMap<>();
extraProperties.put("key", "illegalName");
- request = new RssAccessClusterRequest(
- accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
extraProperties);
+ request = new RssAccessClusterRequest(accessID,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+ 2000, extraProperties, "user");
response = coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
// case3: legal names
extraProperties.clear();
extraProperties.put("key", "v1");
- request = new RssAccessClusterRequest(
- accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
extraProperties);
+ request = new RssAccessClusterRequest(accessID,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+ 2000, extraProperties, "user");
response = coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
@@ -135,15 +134,15 @@ public class AccessClusterTest extends
CoordinatorTestBase {
startServers();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
String accessId = "111111";
- RssAccessClusterRequest request = new RssAccessClusterRequest(
- accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+ RssAccessClusterRequest request = new RssAccessClusterRequest(accessId,
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
RssAccessClusterResponse response =
coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by
AccessCandidatesChecker"));
accessId = "135";
- request = new RssAccessClusterRequest(
- accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+ request = new RssAccessClusterRequest(accessId,
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by
AccessClusterLoadChecker"));
@@ -156,14 +155,14 @@ public class AccessClusterTest extends
CoordinatorTestBase {
CoordinatorClient client = new CoordinatorClientFactory("GRPC")
.createCoordinatorClient(LOCALHOST, COORDINATOR_PORT_1 + 13);
- request = new RssAccessClusterRequest(
- accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+ request = new RssAccessClusterRequest(accessId,
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = client.accessCluster(request);
assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
assertTrue(response.getMessage().startsWith("UNAVAILABLE: io exception"));
- request = new RssAccessClusterRequest(
- accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+ request = new RssAccessClusterRequest(accessId,
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
response = coordinatorClient.accessCluster(request);
assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
assertTrue(response.getMessage().startsWith("SUCCESS"));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 0c1e7c07..a1858b84 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -22,7 +22,7 @@ import io.prometheus.client.CollectorRegistry;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
-import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.rpc.GrpcServer;
@@ -40,10 +40,10 @@ public class CoordinatorGrpcServerTest {
static class MockedCoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorServerImplBase {
@Override
- public void appHeartbeat(
- RssProtos.AppHeartBeatRequest request,
- StreamObserver<RssProtos.AppHeartBeatResponse> responseObserver) {
- RssProtos.AppHeartBeatResponse response = RssProtos.AppHeartBeatResponse
+ public void registerApplicationInfo(
+ RssProtos.ApplicationInfoRequest request,
+ StreamObserver<RssProtos.ApplicationInfoResponse> responseObserver) {
+ RssProtos.ApplicationInfoResponse response =
RssProtos.ApplicationInfoResponse
.newBuilder()
.setRetMsg("")
.setStatus(RssProtos.StatusCode.SUCCESS)
@@ -69,7 +69,8 @@ public class CoordinatorGrpcServerTest {
assertEquals(0, connSize);
CoordinatorGrpcClient coordinatorGrpcClient = new
CoordinatorGrpcClient("localhost", 20001);
- coordinatorGrpcClient.sendAppHeartBeat(new
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
+ coordinatorGrpcClient.registerApplicationInfo(
+ new RssApplicationInfoRequest("testGrpcConnectionSize", 10000,
"user"));
connSize =
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(1, connSize);
@@ -77,8 +78,8 @@ public class CoordinatorGrpcServerTest {
// case2: test the multiple connections
CoordinatorGrpcClient client1 = new CoordinatorGrpcClient("localhost",
20001);
CoordinatorGrpcClient client2 = new CoordinatorGrpcClient("localhost",
20001);
- client1.sendAppHeartBeat(new
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
- client2.sendAppHeartBeat(new
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
+ client1.registerApplicationInfo(new
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
+ client2.registerApplicationInfo(new
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
connSize =
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(3, connSize);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 3cc79632..35a2ee0d 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -27,10 +27,10 @@ import com.google.common.collect.Sets;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
-import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleRegisterInfo;
@@ -200,22 +200,25 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
@Test
public void appHeartbeatTest() throws Exception {
- RssAppHeartBeatResponse response =
- coordinatorClient.sendAppHeartBeat(new
RssAppHeartBeatRequest("appHeartbeatTest1", 1000));
+ RssApplicationInfoResponse response =
+ coordinatorClient.registerApplicationInfo(
+ new RssApplicationInfoRequest("application_appHeartbeatTest1",
1000, "user"));
assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
- assertEquals(Sets.newHashSet("appHeartbeatTest1"),
+ assertEquals(Sets.newHashSet("application_appHeartbeatTest1"),
coordinators.get(0).getApplicationManager().getAppIds());
- coordinatorClient.sendAppHeartBeat(new
RssAppHeartBeatRequest("appHeartbeatTest2", 1000));
- assertEquals(Sets.newHashSet("appHeartbeatTest1", "appHeartbeatTest2"),
+ coordinatorClient.registerApplicationInfo(
+ new RssApplicationInfoRequest("application_appHeartbeatTest2", 1000,
"user"));
+ assertEquals(Sets.newHashSet("application_appHeartbeatTest1",
"application_appHeartbeatTest2"),
coordinators.get(0).getApplicationManager().getAppIds());
int retry = 0;
while (retry < 5) {
- coordinatorClient.sendAppHeartBeat(new
RssAppHeartBeatRequest("appHeartbeatTest1", 1000));
+ coordinatorClient.registerApplicationInfo(
+ new RssApplicationInfoRequest("application_appHeartbeatTest1", 1000,
"user"));
retry++;
Thread.sleep(1000);
}
// appHeartbeatTest2 was removed because of expired
- assertEquals(Sets.newHashSet("appHeartbeatTest1"),
+ assertEquals(Sets.newHashSet("application_appHeartbeatTest1"),
coordinators.get(0).getApplicationManager().getAppIds());
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index 86faac4d..76949fd4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -110,13 +110,13 @@ public class FetchClientConfTest extends
CoordinatorTestBase {
coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED,
true);
coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH,
cfgFile.toURI().toString());
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
3);
-
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
200);
+
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
1000);
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
1);
createCoordinatorServer(coordinatorConf);
startServers();
waitForUpdate(Sets.newHashSet(remotePath1),
coordinators.get(0).getApplicationManager());
- String appId = "testFetchRemoteStorageApp";
+ String appId = "application_testFetchRemoteStorageApp_" + 1;
RssFetchRemoteStorageRequest request = new
RssFetchRemoteStorageRequest(appId);
RssFetchRemoteStorageResponse response =
coordinatorClient.fetchRemoteStorage(request);
RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
@@ -136,7 +136,8 @@ public class FetchClientConfTest extends
CoordinatorTestBase {
assertEquals(remotePath1, remoteStorageInfo.getPath());
assertTrue(remoteStorageInfo.getConfItems().isEmpty());
- request = new RssFetchRemoteStorageRequest(appId + "another");
+ String newAppId = "application_testFetchRemoteStorageApp_" + 2;
+ request = new RssFetchRemoteStorageRequest(newAppId);
response = coordinatorClient.fetchRemoteStorage(request);
// got the remotePath2 for new appId
remoteStorageInfo = response.getRemoteStorageInfo();
@@ -161,14 +162,14 @@ public class FetchClientConfTest extends
CoordinatorTestBase {
coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED,
true);
coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH,
cfgFile.toURI().toString());
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
2);
-
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
100);
+
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
1000);
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
1);
coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY,
IO_SAMPLE);
createCoordinatorServer(coordinatorConf);
startServers();
waitForUpdate(Sets.newHashSet(remotePath1),
coordinators.get(0).getApplicationManager());
- String appId = "testFetchRemoteStorageApp";
+ String appId = "application_testFetchRemoteStorageApp_" + 1;
RssFetchRemoteStorageRequest request = new
RssFetchRemoteStorageRequest(appId);
RssFetchRemoteStorageResponse response =
coordinatorClient.fetchRemoteStorage(request);
RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
@@ -189,7 +190,8 @@ public class FetchClientConfTest extends
CoordinatorTestBase {
// ensure sizeList can be updated
Thread.sleep(2000);
- request = new RssFetchRemoteStorageRequest(appId + "another");
+ String newAppId = "application_testFetchRemoteStorageApp_" + 2;
+ request = new RssFetchRemoteStorageRequest(newAppId);
response = coordinatorClient.fetchRemoteStorage(request);
// got the remotePath2 for new appId
remoteStorageInfo = response.getRemoteStorageInfo();
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 9b3d8488..3d54f790 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -75,8 +75,8 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleServerGrpcTest extends IntegrationTestBase {
private ShuffleServerGrpcClient shuffleServerClient;
- private AtomicInteger atomicInteger = new AtomicInteger(0);
- private static Long EVENT_THRESHOLD_SIZE = 2048L;
+ private final AtomicInteger atomicInteger = new AtomicInteger(0);
+ private static final Long EVENT_THRESHOLD_SIZE = 2048L;
private static final int GB = 1024 * 1024 * 1024;
@BeforeAll
@@ -111,30 +111,31 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
shuffleWriteClient.registerShuffle(
new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
- "clearResourceTest1",
+ "application_clearResourceTest1",
0,
Lists.newArrayList(new PartitionRange(0, 1)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL
);
+
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest1",
500L, "user");
+ shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1",
500L);
+
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest2",
500L, "user");
+ shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2",
500L);
- shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
- shuffleWriteClient.sendAppHeartbeat("clearResourceTest2", 1000L);
-
- RssRegisterShuffleRequest rrsr = new
RssRegisterShuffleRequest("clearResourceTest1", 0,
+ RssRegisterShuffleRequest rrsr = new
RssRegisterShuffleRequest("application_clearResourceTest1", 0,
Lists.newArrayList(new PartitionRange(0, 1)), "");
shuffleServerClient.registerShuffle(rrsr);
- rrsr = new RssRegisterShuffleRequest("clearResourceTest2", 0,
+ rrsr = new RssRegisterShuffleRequest("application_clearResourceTest2", 0,
Lists.newArrayList(new PartitionRange(0, 1)), "");
shuffleServerClient.registerShuffle(rrsr);
- assertEquals(Sets.newHashSet("clearResourceTest1", "clearResourceTest2"),
+ assertEquals(Sets.newHashSet("application_clearResourceTest1",
"application_clearResourceTest2"),
shuffleServers.get(0).getShuffleTaskManager().getAppIds());
// Thread will keep refresh clearResourceTest1 in coordinator
Thread t = new Thread(() -> {
int i = 0;
while (i < 20) {
- shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
+ shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1",
500L);
i++;
try {
Thread.sleep(1000);
@@ -147,13 +148,13 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
// Heartbeat is sent to coordinator too]
Thread.sleep(3000);
- shuffleServerClient.registerShuffle(new
RssRegisterShuffleRequest("clearResourceTest1", 0,
+ shuffleServerClient.registerShuffle(new
RssRegisterShuffleRequest("application_clearResourceTest1", 0,
Lists.newArrayList(new PartitionRange(0, 1)), ""));
- assertEquals(Sets.newHashSet("clearResourceTest1"),
+ assertEquals(Sets.newHashSet("application_clearResourceTest1"),
coordinators.get(0).getApplicationManager().getAppIds());
// clearResourceTest2 will be removed because of
rss.server.app.expired.withoutHeartbeat
Thread.sleep(2000);
- assertEquals(Sets.newHashSet("clearResourceTest1"),
+ assertEquals(Sets.newHashSet("application_clearResourceTest1"),
shuffleServers.get(0).getShuffleTaskManager().getAppIds());
// clearResourceTest1 will be removed because of
rss.server.app.expired.withoutHeartbeat
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
index 3bd7ca55..0ce1ca4e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
@@ -19,12 +19,14 @@ package org.apache.uniffle.client.api;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
@@ -34,6 +36,8 @@ public interface CoordinatorClient {
RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request);
+ RssApplicationInfoResponse registerApplicationInfo(RssApplicationInfoRequest
request);
+
RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request);
RssGetShuffleAssignmentsResponse
getShuffleAssignments(RssGetShuffleAssignmentsRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index be44c60c..4f85bdae 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
@@ -43,6 +44,7 @@ import
org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
@@ -56,8 +58,8 @@ import
org.apache.uniffle.proto.CoordinatorServerGrpc.CoordinatorServerBlockingS
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AccessClusterRequest;
import org.apache.uniffle.proto.RssProtos.AccessClusterResponse;
-import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
-import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
import org.apache.uniffle.proto.RssProtos.ClientConfItem;
import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageRequest;
@@ -207,8 +209,9 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
@Override
public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest
request) {
- AppHeartBeatRequest rpcRequest =
AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build();
- AppHeartBeatResponse rpcResponse = blockingStub
+ RssProtos.AppHeartBeatRequest rpcRequest =
+
RssProtos.AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build();
+ RssProtos.AppHeartBeatResponse rpcResponse = blockingStub
.withDeadlineAfter(request.getTimeoutMs(),
TimeUnit.MILLISECONDS).appHeartbeat(rpcRequest);
RssAppHeartBeatResponse response;
StatusCode statusCode = rpcResponse.getStatus();
@@ -222,6 +225,24 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
return response;
}
+ @Override
+ public RssApplicationInfoResponse
registerApplicationInfo(RssApplicationInfoRequest request) {
+ ApplicationInfoRequest rpcRequest =
+
ApplicationInfoRequest.newBuilder().setAppId(request.getAppId()).setUser(request.getUser()).build();
+ ApplicationInfoResponse rpcResponse = blockingStub
+ .withDeadlineAfter(request.getTimeoutMs(),
TimeUnit.MILLISECONDS).registerApplicationInfo(rpcRequest);
+ RssApplicationInfoResponse response;
+ StatusCode statusCode = rpcResponse.getStatus();
+ switch (statusCode) {
+ case SUCCESS:
+ response = new RssApplicationInfoResponse(ResponseStatusCode.SUCCESS);
+ break;
+ default:
+ response = new
RssApplicationInfoResponse(ResponseStatusCode.INTERNAL_ERROR);
+ }
+ return response;
+ }
+
@Override
public RssGetShuffleAssignmentsResponse
getShuffleAssignments(RssGetShuffleAssignmentsRequest request) {
RssProtos.GetShuffleAssignmentsResponse rpcResponse =
doGetShuffleAssignments(
@@ -260,6 +281,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
AccessClusterRequest rpcRequest = AccessClusterRequest
.newBuilder()
.setAccessId(request.getAccessId())
+ .setUser(request.getUser())
.addAllTags(request.getTags())
.putAllExtraProperties(request.getExtraProperties())
.build();
@@ -277,7 +299,9 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
case SUCCESS:
response = new RssAccessClusterResponse(
ResponseStatusCode.SUCCESS,
- rpcResponse.getRetMsg());
+ rpcResponse.getRetMsg(),
+ rpcResponse.getUuid()
+ );
break;
default:
response = new
RssAccessClusterResponse(ResponseStatusCode.ACCESS_DENIED,
rpcResponse.getRetMsg());
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
index ac5523ac..781a6b4f 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
@@ -26,28 +26,32 @@ public class RssAccessClusterRequest {
private final String accessId;
private final Set<String> tags;
private final int timeoutMs;
+ private final String user;
/**
* The map is to pass the extra data to the coordinator and to
* extend more pluggable {@code AccessCheckers} easily.
*/
private final Map<String, String> extraProperties;
- public RssAccessClusterRequest(String accessId, Set<String> tags, int
timeoutMs) {
+ public RssAccessClusterRequest(String accessId, Set<String> tags, int
timeoutMs, String user) {
this.accessId = accessId;
this.tags = tags;
this.timeoutMs = timeoutMs;
this.extraProperties = Collections.emptyMap();
+ this.user = user;
}
public RssAccessClusterRequest(
String accessId,
Set<String> tags,
int timeoutMs,
- Map<String, String> extraProperties) {
+ Map<String, String> extraProperties,
+ String user) {
this.accessId = accessId;
this.tags = tags;
this.timeoutMs = timeoutMs;
this.extraProperties = extraProperties;
+ this.user = user;
}
public String getAccessId() {
@@ -65,4 +69,8 @@ public class RssAccessClusterRequest {
public Map<String, String> getExtraProperties() {
return extraProperties;
}
+
+ public String getUser() {
+ return user;
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
index 6a7c3e90..475fd484 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
@@ -34,4 +34,5 @@ public class RssAppHeartBeatRequest {
public long getTimeoutMs() {
return timeoutMs;
}
+
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
similarity index 82%
copy from
internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
index 6a7c3e90..475ddf28 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
@@ -17,14 +17,16 @@
package org.apache.uniffle.client.request;
-public class RssAppHeartBeatRequest {
+public class RssApplicationInfoRequest {
private final String appId;
private final long timeoutMs;
+ private final String user;
- public RssAppHeartBeatRequest(String appId, long timeoutMs) {
+ public RssApplicationInfoRequest(String appId, long timeoutMs, String user) {
this.appId = appId;
this.timeoutMs = timeoutMs;
+ this.user = user;
}
public String getAppId() {
@@ -34,4 +36,8 @@ public class RssAppHeartBeatRequest {
public long getTimeoutMs() {
return timeoutMs;
}
+
+ public String getUser() {
+ return user;
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
index 04ab5ac4..583e7daf 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
@@ -19,7 +19,18 @@ package org.apache.uniffle.client.response;
public class RssAccessClusterResponse extends ClientResponse {
+ private String uuid;
+
public RssAccessClusterResponse(ResponseStatusCode statusCode, String
messge) {
super(statusCode, messge);
}
+
+ public RssAccessClusterResponse(ResponseStatusCode statusCode, String
messge, String uuid) {
+ super(statusCode, messge);
+ this.uuid = uuid;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
similarity index 82%
copy from
internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
index 04ab5ac4..210041d7 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
@@ -17,9 +17,9 @@
package org.apache.uniffle.client.response;
-public class RssAccessClusterResponse extends ClientResponse {
+public class RssApplicationInfoResponse extends ClientResponse {
- public RssAccessClusterResponse(ResponseStatusCode statusCode, String
messge) {
- super(statusCode, messge);
+ public RssApplicationInfoResponse(ResponseStatusCode statusCode) {
+ super(statusCode);
}
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 5c7e80b8..5789a952 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -287,11 +287,14 @@ service CoordinatorServer {
rpc getShuffleDataStorageInfo(google.protobuf.Empty) returns
(GetShuffleDataStorageInfoResponse);
rpc checkServiceAvailable(google.protobuf.Empty) returns
(CheckServiceAvailableResponse);
+ // Heartbeat between Shuffle Application and Coordinator Server
+ rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
+
// Report a client operation's result to coordinator server
rpc reportClientOperation(ReportShuffleClientOpRequest) returns
(ReportShuffleClientOpResponse);
- // Heartbeat between Shuffle Application and Coordinator Server
- rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
+ // Report an application's info to the Coordinator Server
+ rpc registerApplicationInfo(ApplicationInfoRequest) returns
(ApplicationInfoResponse);
// Access to the remote shuffle service cluster
rpc accessCluster(AccessClusterRequest) returns (AccessClusterResponse);
@@ -312,6 +315,16 @@ message AppHeartBeatResponse {
string retMsg = 2;
}
+message ApplicationInfoRequest {
+ string appId = 1;
+ string user = 2;
+}
+
+message ApplicationInfoResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
message GetShuffleServerListResponse {
repeated ShuffleServerId servers = 1;
}
@@ -374,11 +387,13 @@ message AccessClusterRequest {
string accessId = 1;
repeated string tags = 2;
map<string, string> extraProperties = 3;
+ string user=4;
}
message AccessClusterResponse {
StatusCode status = 1;
string retMsg = 2;
+ string uuid = 3;
}
message FetchClientConfResponse {