This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff41a58 [Feature] Support user's app quota level limit (#311)
8ff41a58 is described below

commit 8ff41a581a1f2c1a959f476903c83f51c37bb86f
Author: jokercurry <[email protected]>
AuthorDate: Tue Nov 22 09:07:07 2022 +0800

    [Feature] Support user's app quota level limit (#311)
    
    ### What changes were proposed in this pull request?
    For issue #211 and the design document 
[https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing](https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing)
    
    ### Why are the changes needed?
    Better isolation of resources between different users.
    
    ### Does this PR introduce _any_ user-facing change?
    Add config `rss.coordinator.quota.default.app.num` to set the default 
number of apps for each user, and `rss.coordinator.quota.default.path` to set 
a path that records the number of apps that each user can run.
    
    ### How was this patch tested?
    
    Added unit tests (UTs).
---
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |   1 +
 .../hadoop/mapred/SortWriteBufferManagerTest.java  |   5 +
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |   5 +
 .../spark/shuffle/DelegationRssShuffleManager.java |  18 ++-
 .../apache/spark/shuffle/RssShuffleManager.java    |  10 +-
 .../spark/shuffle/DelegationRssShuffleManager.java |  18 ++-
 .../apache/spark/shuffle/RssShuffleManager.java    |  11 +-
 .../uniffle/client/api/ShuffleWriteClient.java     |   2 +
 .../client/impl/ShuffleWriteClientImpl.java        |  36 +++++-
 .../org/apache/uniffle/common/util/RssUtils.java   |   1 +
 .../uniffle/coordinator/AccessCheckResult.java     |  12 ++
 .../org/apache/uniffle/coordinator/AccessInfo.java |  13 +-
 .../apache/uniffle/coordinator/AccessManager.java  |  19 ++-
 .../uniffle/coordinator/AccessQuotaChecker.java    |  78 ++++++++++++
 .../uniffle/coordinator/ApplicationManager.java    |  68 +++++++++--
 .../uniffle/coordinator/CoordinatorConf.java       |  18 ++-
 .../coordinator/CoordinatorGrpcService.java        |  30 ++++-
 .../uniffle/coordinator/CoordinatorMetrics.java    |   3 +
 .../uniffle/coordinator/CoordinatorServer.java     |   2 +-
 .../apache/uniffle/coordinator/QuotaManager.java   | 115 ++++++++++++++++++
 .../coordinator/AccessCandidatesCheckerTest.java   |   8 +-
 .../coordinator/AccessClusterLoadCheckerTest.java  |  15 +--
 .../uniffle/coordinator/AccessManagerTest.java     |  13 +-
 .../coordinator/AccessQuotaCheckerTest.java        | 135 +++++++++++++++++++++
 .../AppBalanceSelectStorageStrategyTest.java       |  15 ++-
 .../coordinator/ApplicationManagerTest.java        |   3 +-
 .../coordinator/CoordinatorMetricsTest.java        |   2 +-
 ...owestIOSampleCostSelectStorageStrategyTest.java |  19 +--
 .../uniffle/coordinator/QuotaManagerTest.java      |  89 ++++++++++++++
 .../test/AccessCandidatesCheckerHdfsTest.java      |   9 +-
 .../org/apache/uniffle/test/AccessClusterTest.java |  29 +++--
 .../uniffle/test/CoordinatorGrpcServerTest.java    |  17 +--
 .../apache/uniffle/test/CoordinatorGrpcTest.java   |  21 ++--
 .../apache/uniffle/test/FetchClientConfTest.java   |  14 ++-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |  27 +++--
 .../uniffle/client/api/CoordinatorClient.java      |   4 +
 .../client/impl/grpc/CoordinatorGrpcClient.java    |  34 +++++-
 .../client/request/RssAccessClusterRequest.java    |  12 +-
 .../client/request/RssAppHeartBeatRequest.java     |   1 +
 ...Request.java => RssApplicationInfoRequest.java} |  10 +-
 .../client/response/RssAccessClusterResponse.java  |  11 ++
 ...sponse.java => RssApplicationInfoResponse.java} |   6 +-
 proto/src/main/proto/Rss.proto                     |  19 ++-
 43 files changed, 846 insertions(+), 132 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index d891452d..02c63527 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -239,6 +239,7 @@ public class RssMRAppMaster extends MRAppMaster {
       long heartbeatInterval = conf.getLong(RssMRConfig.RSS_HEARTBEAT_INTERVAL,
           RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
       long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, 
heartbeatInterval / 2);
+      client.registerApplicationInfo(appId, heartbeatTimeout, "user");
       scheduledExecutorService.scheduleAtFixedRate(
           () -> {
             try {
diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index f5c834b2..30267515 100644
--- 
a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -285,6 +285,11 @@ public class SortWriteBufferManagerTest {
 
     }
 
+    @Override
+    public void registerApplicationInfo(String appId, long timeoutMs, String 
user) {
+
+    }
+
     @Override
     public void registerShuffle(
         ShuffleServerInfo shuffleServerInfo,
diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 5f05b560..04e816c2 100644
--- 
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -381,6 +381,11 @@ public class FetcherTest {
 
     }
 
+    @Override
+    public void registerApplicationInfo(String appId, long timeoutMs, String 
user) {
+
+    }
+
     @Override
     public void registerShuffle(
         ShuffleServerInfo shuffleServerInfo,
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 6182ecb3..cf9eb8af 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
@@ -48,6 +49,8 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   private final List<CoordinatorClient> coordinatorClients;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
+  private String user;
+  private String uuid;
 
   public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) 
throws Exception {
     this.sparkConf = sparkConf;
@@ -67,10 +70,20 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
 
   private ShuffleManager createShuffleManagerInDriver() throws RssException {
     ShuffleManager shuffleManager;
-
+    user = "user";
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (Exception e) {
+      LOG.error("Error on getting user from ugi." + e);
+    }
     boolean canAccess = tryAccessCluster();
+    if (uuid == null || "".equals(uuid)) {
+      uuid = String.valueOf(System.currentTimeMillis());
+    }
     if (canAccess) {
       try {
+        sparkConf.set("spark.rss.quota.user", user);
+        sparkConf.set("spark.rss.quota.uuid", uuid);
         shuffleManager = new RssShuffleManager(sparkConf, true);
         sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
         sparkConf.set("spark.shuffle.manager", 
RssShuffleManager.class.getCanonicalName());
@@ -113,9 +126,10 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
       try {
         canAccess = RetryUtils.retry(() -> {
           RssAccessClusterResponse response = 
coordinatorClient.accessCluster(new RssAccessClusterRequest(
-              accessId, assignmentTags, accessTimeoutMs, extraProperties));
+              accessId, assignmentTags, accessTimeoutMs, extraProperties, 
user));
           if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
             LOG.warn("Success to access cluster {} using {}", 
coordinatorClient.getDesc(), accessId);
+            uuid = response.getUuid();
             return true;
           } else if (response.getStatusCode() == 
ResponseStatusCode.ACCESS_DENIED) {
             throw new RssException("Request to access cluster " + 
coordinatorClient.getDesc() + " is denied using "
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c045f399..e71ac82b 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -88,6 +88,8 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataCommitPoolSize;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
+  private final String user;
+  private final String uuid;
   private ThreadPoolExecutor threadPoolExecutor;
   private EventLoop eventLoop = new 
EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
@@ -142,7 +144,8 @@ public class RssShuffleManager implements ShuffleManager {
       throw new IllegalArgumentException("Spark2 doesn't support AQE, 
spark.sql.adaptive.enabled should be false.");
     }
     this.sparkConf = sparkConf;
-
+    this.user = sparkConf.get("spark.rss.quota.user", "user");
+    this.uuid = sparkConf.get("spark.rss.quota.uuid",  
Long.toString(System.currentTimeMillis()));
     // set & check replica config
     this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
     this.dataReplicaWrite = 
sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
@@ -217,12 +220,12 @@ public class RssShuffleManager implements ShuffleManager {
     }
 
     // If yarn enable retry ApplicationMaster, appId will be not unique and 
shuffle data will be incorrect,
-    // appId + timestamp can avoid such problem,
+    // appId + uuid can avoid such problem,
     // can't get appId in construct because SparkEnv is not created yet,
     // appId will be initialized only once in this method which
     // will be called many times depend on how many shuffle stage
     if ("".equals(appId)) {
-      appId = SparkEnv.get().conf().getAppId() + "_" + 
System.currentTimeMillis();
+      appId = SparkEnv.get().conf().getAppId() + "_" + uuid;
       LOG.info("Generate application id used in rss: " + appId);
     }
 
@@ -273,6 +276,7 @@ public class RssShuffleManager implements ShuffleManager {
   }
 
   private void startHeartbeat() {
+    shuffleWriteClient.registerApplicationInfo(appId, heartbeatTimeout, user);
     if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false) && 
!heartbeatStarted) {
       heartBeatScheduledExecutorService.scheduleAtFixedRate(
           () -> {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 5078c708..79016a6d 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
@@ -48,6 +49,8 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   private final List<CoordinatorClient> coordinatorClients;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
+  private String user;
+  private String uuid;
 
   public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) 
throws Exception {
     this.sparkConf = sparkConf;
@@ -67,10 +70,20 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
 
   private ShuffleManager createShuffleManagerInDriver() throws RssException {
     ShuffleManager shuffleManager;
-
+    user = "user";
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (Exception e) {
+      LOG.error("Error on getting user from ugi." + e);
+    }
     boolean canAccess = tryAccessCluster();
+    if (uuid == null || "".equals(uuid)) {
+      uuid = String.valueOf(System.currentTimeMillis());
+    }
     if (canAccess) {
       try {
+        sparkConf.set("spark.rss.quota.user", user);
+        sparkConf.set("spark.rss.quota.uuid", uuid);
         shuffleManager = new RssShuffleManager(sparkConf, true);
         sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
         sparkConf.set("spark.shuffle.manager", 
RssShuffleManager.class.getCanonicalName());
@@ -113,9 +126,10 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
       try {
         canAccess = RetryUtils.retry(() -> {
           RssAccessClusterResponse response = 
coordinatorClient.accessCluster(new RssAccessClusterRequest(
-              accessId, assignmentTags, accessTimeoutMs, extraProperties));
+              accessId, assignmentTags, accessTimeoutMs, extraProperties, 
user));
           if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
             LOG.warn("Success to access cluster {} using {}", 
coordinatorClient.getDesc(), accessId);
+            uuid = response.getUuid();
             return true;
           } else if (response.getStatusCode() == 
ResponseStatusCode.ACCESS_DENIED) {
             throw new RssException("Request to access cluster " + 
coordinatorClient.getDesc() + " is denied using "
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 1691dfe4..0ceb53fa 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -96,6 +96,8 @@ public class RssShuffleManager implements ShuffleManager {
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
   private final ShuffleDataDistributionType dataDistributionType;
+  private String user;
+  private String uuid;
   private final EventLoop eventLoop;
   private final EventLoop defaultEventLoop = new 
EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
@@ -144,7 +146,8 @@ public class RssShuffleManager implements ShuffleManager {
 
   public RssShuffleManager(SparkConf conf, boolean isDriver) {
     this.sparkConf = conf;
-
+    this.user = sparkConf.get("spark.rss.quota.user", "user");
+    this.uuid = sparkConf.get("spark.rss.quota.uuid",  
Long.toString(System.currentTimeMillis()));
     // set & check replica config
     this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
     this.dataReplicaWrite =  
sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
@@ -278,7 +281,7 @@ public class RssShuffleManager implements ShuffleManager {
     }
 
     if (id.get() == null) {
-      id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + 
System.currentTimeMillis());
+      id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
     }
     LOG.info("Generate application id used in rss: " + id.get());
 
@@ -674,11 +677,13 @@ public class RssShuffleManager implements ShuffleManager {
   }
 
   private synchronized void startHeartbeat() {
+    shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, 
user);
     if (!heartbeatStarted) {
       heartBeatScheduledExecutorService.scheduleAtFixedRate(
           () -> {
             try {
-              shuffleWriteClient.sendAppHeartbeat(id.get(), heartbeatTimeout);
+              String appId = id.get();
+              shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout);
               LOG.info("Finish send heartbeat to coordinator and servers");
             } catch (Exception e) {
               LOG.warn("Fail to send heartbeat to coordinator and servers", e);
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 9776678c..f2443bdb 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -37,6 +37,8 @@ public interface ShuffleWriteClient {
 
   void sendAppHeartbeat(String appId, long timeoutMs);
 
+  void registerApplicationInfo(String appId, long timeoutMs, String user);
+
   void registerShuffle(
       ShuffleServerInfo shuffleServerInfo,
       String appId,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 3bccd467..80096a9c 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -45,6 +45,7 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -59,6 +60,7 @@ import 
org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.ClientResponse;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
 import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -549,6 +551,37 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     return blockIdBitmap;
   }
 
+  @Override
+  public void registerApplicationInfo(String appId, long timeoutMs, String 
user) {
+    RssApplicationInfoRequest request = new RssApplicationInfoRequest(appId, 
timeoutMs, user);
+    List<Callable<Void>> callableList = Lists.newArrayList();
+    coordinatorClients.forEach(coordinatorClient -> {
+      callableList.add(() -> {
+        try {
+          RssApplicationInfoResponse response = 
coordinatorClient.registerApplicationInfo(request);
+          if (response.getStatusCode() != ResponseStatusCode.SUCCESS) {
+            LOG.error("Failed to send applicationInfo to " + 
coordinatorClient.getDesc());
+          } else {
+            LOG.info("Successfully send applicationInfo to " + 
coordinatorClient.getDesc());
+          }
+        } catch (Exception e) {
+          LOG.warn("Error happened when send applicationInfo to " + 
coordinatorClient.getDesc(), e);
+        }
+        return null;
+      });
+    });
+    try {
+      List<Future<Void>> futures = 
heartBeatExecutorService.invokeAll(callableList, timeoutMs, 
TimeUnit.MILLISECONDS);
+      for (Future<Void> future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.warn("register application is interrupted", ie);
+    }
+  }
+
   @Override
   public void sendAppHeartbeat(String appId, long timeoutMs) {
     RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, 
timeoutMs);
@@ -571,7 +604,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
         }
     );
 
-    coordinatorClients.stream().forEach(coordinatorClient -> {
+    coordinatorClients.forEach(coordinatorClient -> {
       callableList.add(() -> {
         try {
           RssAppHeartBeatResponse response = 
coordinatorClient.sendAppHeartBeat(request);
@@ -683,7 +716,6 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     return 
ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, 
shuffleServerInfo);
   }
 
-  @VisibleForTesting
   void addShuffleServer(String appId, int shuffleId, ShuffleServerInfo 
serverInfo) {
     Map<Integer, Set<ShuffleServerInfo>> appServerMap = 
shuffleServerInfoMap.get(appId);
     if (appServerMap == null) {
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 16e21da3..5319444e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -194,6 +194,7 @@ public class RssUtils {
 
     List<T> extensions = Lists.newArrayList();
     for (String name : classes) {
+      name = name.trim();
       try {
         Class<?> klass = Class.forName(name);
         if (!extClass.isAssignableFrom(klass)) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
index 8aa3f6c1..3e1d8bc8 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
@@ -21,10 +21,18 @@ public class AccessCheckResult {
 
   private final boolean success;
   private final String msg;
+  private final String uuid;
+
+  public AccessCheckResult(boolean success, String msg, String uuid) {
+    this.success = success;
+    this.msg = msg;
+    this.uuid = uuid;
+  }
 
   public AccessCheckResult(boolean success, String msg) {
     this.success = success;
     this.msg = msg;
+    this.uuid = "";
   }
 
   public boolean isSuccess() {
@@ -34,4 +42,8 @@ public class AccessCheckResult {
   public String getMsg() {
     return msg;
   }
+
+  public String getUuid() {
+    return uuid;
+  }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
index 8200fa70..f07bc2e7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
@@ -21,21 +21,25 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 public class AccessInfo {
   private final String accessId;
   private final Set<String> tags;
   private final Map<String, String> extraProperties;
+  private final String user;
 
-  public AccessInfo(String accessId, Set<String> tags, Map<String, String> 
extraProperties) {
+  public AccessInfo(String accessId, Set<String> tags, Map<String, String> 
extraProperties, String user) {
     this.accessId = accessId;
     this.tags = tags;
     this.extraProperties = extraProperties == null ? Collections.emptyMap() : 
extraProperties;
+    this.user = user;
   }
 
+  @VisibleForTesting
   public AccessInfo(String accessId) {
-    this(accessId, Sets.newHashSet(), Collections.emptyMap());
+    this(accessId, Sets.newHashSet(), Collections.emptyMap(), "");
   }
 
   public String getAccessId() {
@@ -50,10 +54,15 @@ public class AccessInfo {
     return extraProperties;
   }
 
+  public String getUser() {
+    return user;
+  }
+
   @Override
   public String toString() {
     return "AccessInfo{"
             + "accessId='" + accessId + '\''
+            + ", user= " + user
             + ", tags=" + tags
             + ", extraProperties=" + extraProperties
             + '}';
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index a41cd62e..04fddc36 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -35,15 +35,19 @@ public class AccessManager {
 
   private final CoordinatorConf coordinatorConf;
   private final ClusterManager clusterManager;
+  private final ApplicationManager applicationManager;
   private final Configuration hadoopConf;
   private List<AccessChecker> accessCheckers = Lists.newArrayList();
 
   public AccessManager(
-          CoordinatorConf conf, ClusterManager clusterManager,
-          Configuration hadoopConf) throws Exception {
+      CoordinatorConf conf,
+      ClusterManager clusterManager,
+      ApplicationManager applicationManager,
+      Configuration hadoopConf) throws Exception {
     this.coordinatorConf = conf;
     this.clusterManager = clusterManager;
     this.hadoopConf = hadoopConf;
+    this.applicationManager = applicationManager;
     init();
   }
 
@@ -58,15 +62,20 @@ public class AccessManager {
   }
 
   public AccessCheckResult handleAccessRequest(AccessInfo accessInfo) {
+    String uuid = "";
     CoordinatorMetrics.counterTotalAccessRequest.inc();
     for (AccessChecker checker : accessCheckers) {
       AccessCheckResult accessCheckResult = checker.check(accessInfo);
       if (!accessCheckResult.isSuccess()) {
         return accessCheckResult;
       }
+      String resultUuid = accessCheckResult.getUuid();
+      if (!"".equals(resultUuid)) {
+        uuid = resultUuid;
+      }
     }
 
-    return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
+    return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
   }
 
   public CoordinatorConf getCoordinatorConf() {
@@ -85,6 +94,10 @@ public class AccessManager {
     return hadoopConf;
   }
 
+  public ApplicationManager getApplicationManager() {
+    return applicationManager;
+  }
+
   public void close() throws IOException {
     for (AccessChecker checker : accessCheckers) {
       checker.close();
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
new file mode 100644
index 00000000..91b9ceab
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RssUtils;
+
+/**
+ * This checker limits the number of apps that different users can submit.
+ */
+public class AccessQuotaChecker extends AbstractAccessChecker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AccessQuotaChecker.class);
+
+  private final ApplicationManager applicationManager;
+  private final CoordinatorConf conf;
+  private static final LongAdder COUNTER = new LongAdder();
+  private final String hostIp;
+
+  public AccessQuotaChecker(AccessManager accessManager) throws Exception {
+    super(accessManager);
+    conf = accessManager.getCoordinatorConf();
+    applicationManager = accessManager.getApplicationManager();
+    hostIp = RssUtils.getHostIp();
+  }
+
+  @Override
+  public AccessCheckResult check(AccessInfo accessInfo) {
+    COUNTER.increment();
+    final String uuid = hostIp.hashCode() + "-" + COUNTER.sum();
+    final String user = accessInfo.getUser();
+    // low version client user attribute is an empty string
+    if (!"".equals(user)) {
+      Map<String, Map<String, Long>> currentUserApps = 
applicationManager.getCurrentUserApps();
+      Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x 
-> Maps.newConcurrentMap());
+      Integer defaultAppNum = 
applicationManager.getDefaultUserApps().getOrDefault(user,
+          conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
+      int currentAppNum = appAndTimes.size();
+      if (currentAppNum >= defaultAppNum) {
+        String msg = "Denied by AccessClusterLoadChecker => "
+            + "User: " + user + ", current app num is: " + currentAppNum
+            + ", default app num is: " + defaultAppNum + ". We will reject 
this app[uuid=" + uuid + "].";
+        LOG.error(msg);
+        CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
+        return new AccessCheckResult(false, msg);
+      }
+      appAndTimes.put(uuid, System.currentTimeMillis());
+    }
+    return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 71561af9..32c87eba 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -46,7 +46,6 @@ public class ApplicationManager {
   public static final List<String> REMOTE_PATH_SCHEMA = Arrays.asList("hdfs");
   private final long expired;
   private final StrategyName storageStrategy;
-  private final Map<String, Long> appIds = Maps.newConcurrentMap();
   private final SelectStorageStrategy selectStorageStrategy;
   // store appId -> remote path to make sure all shuffle data of the same 
application
   // will be written to the same remote storage
@@ -55,6 +54,9 @@ public class ApplicationManager {
   private final Map<String, RankValue> remoteStoragePathRankValue;
   private final Map<String, String> remoteStorageToHost = 
Maps.newConcurrentMap();
   private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private final Map<String, Map<String, Long>> currentUserAndApp;
+  private final Map<String, String> appIdToUser;
+  private final Map<String, Integer> defaultUserApps;
   // it's only for test case to check if status check has problem
   private boolean hasErrorInStatusCheck = false;
 
@@ -73,6 +75,10 @@ public class ApplicationManager {
       throw new UnsupportedOperationException("Unsupported selected storage 
strategy.");
     }
     expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
+    QuotaManager quotaManager = new QuotaManager(conf);
+    this.currentUserAndApp = quotaManager.getCurrentUserAndApp();
+    this.appIdToUser = quotaManager.getAppIdToUser();
+    this.defaultUserApps = quotaManager.getDefaultUserApps();
     // the thread for checking application status
     ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.getThreadFactory("ApplicationManager-%d"));
@@ -86,12 +92,32 @@ public class ApplicationManager {
         
conf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), 
TimeUnit.MILLISECONDS);
   }
 
-  public void refreshAppId(String appId) {
-    if (!appIds.containsKey(appId)) {
+  public void registerApplicationInfo(String appId, String user) {
+    // computeIfAbsent covers MR, and Spark when RssShuffleManager is the implementation class directly:
+    // by default there is no entry in currentUserAndApp in that case, so a unified user named "user" is used.
+    Map<String, Long> appAndTime = currentUserAndApp.computeIfAbsent(user, x 
-> Maps.newConcurrentMap());
+    appIdToUser.put(appId, user);
+    if (!appAndTime.containsKey(appId)) {
       CoordinatorMetrics.counterTotalAppNum.inc();
       LOG.info("New application is registered: {}", appId);
     }
-    appIds.put(appId, System.currentTimeMillis());
+    long currentTimeMillis = System.currentTimeMillis();
+    String[] appIdAndUuid = appId.split("_");
+    String uuidFromApp = appIdAndUuid[appIdAndUuid.length - 1];
+    // if appId created successfully, we need to remove the uuid
+    appAndTime.remove(uuidFromApp);
+    appAndTime.put(appId, currentTimeMillis);
+  }
+
+  public void refreshAppId(String appId) {
+    String user = appIdToUser.get(appId);
+    // compatible with lower version clients
+    if (user == null) {
+      registerApplicationInfo(appId, "");
+    } else {
+      Map<String, Long> appAndTime = currentUserAndApp.get(user);
+      appAndTime.put(appId, System.currentTimeMillis());
+    }
   }
 
   public void refreshRemoteStorage(String remoteStoragePath, String 
remoteStorageConf) {
@@ -190,7 +216,7 @@ public class ApplicationManager {
   }
 
   public Set<String> getAppIds() {
-    return appIds.keySet();
+    return appIdToUser.keySet();
   }
 
   @VisibleForTesting
@@ -219,16 +245,26 @@ public class ApplicationManager {
   }
 
   private void statusCheck() {
+    List<Map<String, Long>> appAndNums = 
Lists.newArrayList(currentUserAndApp.values());
+    Map<String, Long> appIds = Maps.newHashMap();
+    // A uuid entry is recorded when accessCluster succeeds. If the subsequent shuffle registration fails,
+    // the app never sends a normal heartbeat and its uuid is never replaced by an appId.
+    // Therefore such entries are given an expiration time and expired uuids are removed here automatically.
+    Set<String> expiredAppIds = Sets.newHashSet();
     try {
-      LOG.info("Start to check status for " + appIds.size() + " applications");
-      long current = System.currentTimeMillis();
-      Set<String> expiredAppIds = Sets.newHashSet();
-      for (Map.Entry<String, Long> entry : appIds.entrySet()) {
-        long lastReport = entry.getValue();
-        if (current - lastReport > expired) {
-          expiredAppIds.add(entry.getKey());
+      for (Map<String, Long> appAndTimes : appAndNums) {
+        for (Map.Entry<String, Long> appAndTime : appAndTimes.entrySet()) {
+          String appId = appAndTime.getKey();
+          long lastReport = appAndTime.getValue();
+          appIds.put(appId, lastReport);
+          if (System.currentTimeMillis() - lastReport > expired) {
+            expiredAppIds.add(appId);
+            appAndTimes.remove(appId);
+            appIdToUser.remove(appId);
+          }
         }
       }
+      LOG.info("Start to check status for " + appIds.size() + " applications");
       for (String appId : expiredAppIds) {
         LOG.info("Remove expired application:" + appId);
         appIds.remove(appId);
@@ -281,6 +317,14 @@ public class ApplicationManager {
     return storageHost;
   }
 
+  public Map<String, Integer> getDefaultUserApps() {
+    return defaultUserApps;
+  }
+
+  public Map<String, Map<String, Long>> getCurrentUserApps() {
+    return currentUserAndApp;
+  }
+
   public enum StrategyName {
     APP_BALANCE,
     IO_SAMPLE
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index ae987656..c696dc83 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -77,7 +77,8 @@ public class CoordinatorConf extends RssBaseConf {
       .key("rss.coordinator.access.checkers")
       .stringType()
       .asList()
-      .defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker")
+      .defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker",
+          "org.apache.uniffle.coordinator.AccessQuotaChecker")
       .withDescription("Access checkers");
   public static final ConfigOption<Integer> 
COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions
       .key("rss.coordinator.access.candidates.updateIntervalSec")
@@ -178,6 +179,21 @@ public class CoordinatorConf extends RssBaseConf {
           
.enumType(AbstractAssignmentStrategy.SelectPartitionStrategyName.class)
           
.defaultValue(AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND)
           .withDescription("Strategy for selecting partitions");
+  public static final ConfigOption<Integer> COORDINATOR_QUOTA_DEFAULT_APP_NUM 
= ConfigOptions
+      .key("rss.coordinator.quota.default.app.num")
+      .intType()
+      .defaultValue(5)
+      .withDescription("Default number of apps at user level");
+  public static final ConfigOption<String> COORDINATOR_QUOTA_DEFAULT_PATH = 
ConfigOptions
+      .key("rss.coordinator.quota.default.path")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("A configuration file for the number of apps for a 
user-defined user");
+  public static final ConfigOption<Long> COORDINATOR_QUOTA_UPDATE_INTERVAL = 
ConfigOptions
+      .key("rss.coordinator.quota.update.interval")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("Update interval for the default number of submitted 
apps per user");
 
   public CoordinatorConf() {
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e9cbb79b..75e3dce7 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -38,6 +38,8 @@ import 
org.apache.uniffle.proto.RssProtos.AccessClusterRequest;
 import org.apache.uniffle.proto.RssProtos.AccessClusterResponse;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
 import org.apache.uniffle.proto.RssProtos.CheckServiceAvailableResponse;
 import org.apache.uniffle.proto.RssProtos.ClientConfItem;
 import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
@@ -218,6 +220,30 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     responseObserver.onCompleted();
   }
 
+  @Override
+  public void registerApplicationInfo(
+      ApplicationInfoRequest request,
+      StreamObserver<ApplicationInfoResponse> responseObserver) {
+    String appId = request.getAppId();
+    String user = request.getUser();
+    coordinatorServer.getApplicationManager().registerApplicationInfo(appId, 
user);
+    LOG.debug("Got a registered application info: " + appId);
+    ApplicationInfoResponse response = ApplicationInfoResponse
+        .newBuilder()
+        .setRetMsg("")
+        .setStatus(StatusCode.SUCCESS)
+        .build();
+
+    if (Context.current().isCancelled()) {
+      responseObserver.onError(Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
+      LOG.warn("Cancelled by client {} for after deadline.", appId);
+      return;
+    }
+
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+
   @Override
   public void accessCluster(AccessClusterRequest request, 
StreamObserver<AccessClusterResponse> responseObserver) {
     StatusCode statusCode = StatusCode.SUCCESS;
@@ -228,7 +254,8 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
             new AccessInfo(
                 request.getAccessId(),
                 Sets.newHashSet(request.getTagsList()),
-                request.getExtraPropertiesMap()
+                request.getExtraPropertiesMap(),
+                request.getUser()
             );
     AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
     if (!result.isSuccess()) {
@@ -239,6 +266,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         .newBuilder()
         .setStatus(statusCode)
         .setRetMsg(result.getMsg())
+        .setUuid(result.getUuid())
         .build();
 
     if (Context.current().isCancelled()) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
index d7d19b89..10bc8b2f 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
@@ -39,6 +39,7 @@ public class CoordinatorMetrics {
   private static final String TOTAL_ACCESS_REQUEST = "total_access_request";
   private static final String TOTAL_CANDIDATES_DENIED_REQUEST = 
"total_candidates_denied_request";
   private static final String TOTAL_LOAD_DENIED_REQUEST = 
"total_load_denied_request";
+  private static final String TOTAL_QUOTA_DENIED_REQUEST = 
"total_quota_denied_request";
   public static final String REMOTE_STORAGE_IN_USED_PREFIX = 
"remote_storage_in_used_";
 
   static Gauge gaugeTotalServerNum;
@@ -48,6 +49,7 @@ public class CoordinatorMetrics {
   static Counter counterTotalAppNum;
   static Counter counterTotalAccessRequest;
   static Counter counterTotalCandidatesDeniedRequest;
+  static Counter counterTotalQuotaDeniedRequest;
   static Counter counterTotalLoadDeniedRequest;
   static final Map<String, Gauge> gaugeInUsedRemoteStorage = 
Maps.newConcurrentMap();
 
@@ -102,6 +104,7 @@ public class CoordinatorMetrics {
     counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
     counterTotalAccessRequest = 
metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
     counterTotalCandidatesDeniedRequest = 
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
+    counterTotalQuotaDeniedRequest = 
metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
     counterTotalLoadDeniedRequest = 
metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index cc8c9e84..6a3a0166 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -143,7 +143,7 @@ public class CoordinatorServer {
     AssignmentStrategyFactory assignmentStrategyFactory =
         new AssignmentStrategyFactory(coordinatorConf, clusterManager);
     this.assignmentStrategy = 
assignmentStrategyFactory.getAssignmentStrategy();
-    this.accessManager = new AccessManager(coordinatorConf, clusterManager, 
hadoopConf);
+    this.accessManager = new AccessManager(coordinatorConf, clusterManager, 
applicationManager, hadoopConf);
 
     CoordinatorFactory coordinatorFactory = new CoordinatorFactory(this);
     server = coordinatorFactory.getServer();
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
new file mode 100644
index 00000000..2d9c9fce
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * QuotaManager is a manager for resource restriction.
+ */
+public class QuotaManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(QuotaManager.class);
+  private final Map<String, Map<String, Long>> currentUserAndApp = 
Maps.newConcurrentMap();
+  private final Map<String, String> appIdToUser = Maps.newConcurrentMap();
+  private final String quotaFilePath;
+  private FileSystem hadoopFileSystem;
+  private final AtomicLong quotaFileLastModify = new AtomicLong(0L);
+  private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
+
+  public QuotaManager(CoordinatorConf conf) {
+    this.quotaFilePath = 
conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);;
+    final Long updateTime = 
conf.get(CoordinatorConf.COORDINATOR_QUOTA_UPDATE_INTERVAL);
+    try {
+      hadoopFileSystem = HadoopFilesystemProvider.getFilesystem(new 
Path(quotaFilePath), new Configuration());
+    } catch (Exception e) {
+      LOG.error("Cannot init remoteFS on path : " + quotaFilePath, e);
+    }
+    // Background thread that periodically reloads the per-user app quotas from the quota file
+    ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("UpdateDefaultApp-%d"));
+    scheduledExecutorService.scheduleAtFixedRate(
+        this::detectUserResource, 0, updateTime / 2, TimeUnit.MILLISECONDS);
+  }
+
+  public void detectUserResource() {
+    if (quotaFilePath != null && hadoopFileSystem != null) {
+      try {
+        Path hadoopPath = new Path(quotaFilePath);
+        FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+        if (fileStatus != null && fileStatus.isFile()) {
+          long latestModificationTime = fileStatus.getModificationTime();
+          if (quotaFileLastModify.get() != latestModificationTime) {
+            parseQuotaFile(hadoopFileSystem.open(hadoopPath));
+            quotaFileLastModify.set(latestModificationTime);
+          }
+        }
+      } catch (FileNotFoundException fileNotFoundException) {
+        LOG.error("Can't find this file {}", quotaFilePath);
+      } catch (Exception e) {
+        LOG.warn("Error when updating quotaFile, the exclude nodes file path: 
{}", quotaFilePath);
+      }
+    }
+  }
+
+  public void parseQuotaFile(DataInputStream fsDataInputStream) {
+    String content;
+    try (BufferedReader bufferedReader =
+             new BufferedReader(new InputStreamReader(fsDataInputStream, 
StandardCharsets.UTF_8))) {
+      while ((content = bufferedReader.readLine()) != null) {
+        String user = content.split(Constants.EQUAL_SPLIT_CHAR)[0].trim();
+        Integer appNum = 
Integer.valueOf(content.split(Constants.EQUAL_SPLIT_CHAR)[1].trim());
+        defaultUserApps.put(user, appNum);
+      }
+    } catch (Exception e) {
+      LOG.error("Error occur when parsing file {}", quotaFilePath, e);
+    }
+  }
+
+  public Map<String, Integer> getDefaultUserApps() {
+    return defaultUserApps;
+  }
+
+  public Map<String, Map<String, Long>> getCurrentUserAndApp() {
+    return currentUserAndApp;
+  }
+
+  public Map<String, String> getAppIdToUser() {
+    return appIdToUser;
+  }
+}
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
index 03c7c1a0..b3a6b30a 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
@@ -58,11 +58,11 @@ public class AccessCandidatesCheckerTest {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
tempDir.toURI().toString());
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.AccessCandidatesChecker");
-
+    final ApplicationManager applicationManager = new ApplicationManager(conf);
     // file load checking at startup
     Exception expectedException = null;
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
@@ -72,7 +72,7 @@ public class AccessCandidatesCheckerTest {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
cfgFile.toURI().toString());
     expectedException = null;
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
@@ -88,7 +88,7 @@ public class AccessCandidatesCheckerTest {
     printWriter.println("2 ");
     printWriter.flush();
     printWriter.close();
-    AccessManager accessManager = new AccessManager(conf, null, new 
Configuration());
+    AccessManager accessManager = new AccessManager(conf, null, 
applicationManager, new Configuration());
     AccessCandidatesChecker checker = (AccessCandidatesChecker) 
accessManager.getAccessCheckers().get(0);
     sleep(1200);
     assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
index 18542162..f932c8f5 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
@@ -86,8 +86,8 @@ public class AccessClusterLoadCheckerTest {
     conf.set(COORDINATOR_ACCESS_CHECKERS, 
Arrays.asList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
     conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
     conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
-
-    AccessManager accessManager = new AccessManager(conf, clusterManager, new 
Configuration());
+    ApplicationManager applicationManager = new ApplicationManager(conf);
+    AccessManager accessManager = new AccessManager(conf, clusterManager, 
applicationManager, new Configuration());
 
     AccessClusterLoadChecker accessClusterLoadChecker =
         (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
@@ -99,7 +99,7 @@ public class AccessClusterLoadCheckerTest {
      */
     Map<String, String> properties = new HashMap<>();
     properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
-    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties);
+    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
     assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
 
     /**
@@ -108,7 +108,7 @@ public class AccessClusterLoadCheckerTest {
      * the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
      */
     properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
     assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
 
     /**
@@ -117,7 +117,7 @@ public class AccessClusterLoadCheckerTest {
      * the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
      */
     properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
     assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
 
     /**
@@ -126,7 +126,7 @@ public class AccessClusterLoadCheckerTest {
      * default shuffle nodes max from coordinator conf.
      */
     properties = new HashMap<>();
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
     assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
   }
 
@@ -150,7 +150,8 @@ public class AccessClusterLoadCheckerTest {
     CoordinatorConf conf = new CoordinatorConf(filePath);
     conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
-    AccessManager accessManager = new AccessManager(conf, clusterManager, new 
Configuration());
+    ApplicationManager applicationManager = new ApplicationManager(conf);
+    AccessManager accessManager = new AccessManager(conf, clusterManager, 
applicationManager, new Configuration());
     AccessClusterLoadChecker accessClusterLoadChecker =
         (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
     when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
index bc12ffe3..f519bcf7 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
@@ -48,8 +48,9 @@ public class AccessManagerTest {
     // test init
     CoordinatorConf conf = new CoordinatorConf();
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
+    ApplicationManager applicationManager = new ApplicationManager(conf);
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       String expectedMessage = "Empty classes";
       assertTrue(e.getMessage().startsWith(expectedMessage));
@@ -57,30 +58,30 @@ public class AccessManagerTest {
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
       assertTrue(e.getMessage().startsWith(expectedMessage));
     }
     // test empty checkers
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
-    AccessManager accessManager = new AccessManager(conf, null, new 
Configuration());
+    AccessManager accessManager = new AccessManager(conf, null, 
applicationManager, new Configuration());
     assertTrue(accessManager.handleAccessRequest(
             new AccessInfo(String.valueOf(new Random().nextInt()),
-                Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 
Collections.emptyMap()))
+                Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 
Collections.emptyMap(), "user"))
         .isSuccess());
     accessManager.close();
     // test mock checkers
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
-    accessManager = new AccessManager(conf, null, new Configuration());
+    accessManager = new AccessManager(conf, null, applicationManager, new 
Configuration());
     assertEquals(1, accessManager.getAccessCheckers().size());
     assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
     assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock2")).isSuccess());
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
             + 
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
-    accessManager = new AccessManager(conf, null, new Configuration());
+    accessManager = new AccessManager(conf, null, applicationManager, new 
Configuration());
     assertEquals(2, accessManager.getAccessCheckers().size());
     assertFalse(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
     accessManager.close();
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
new file mode 100644
index 00000000..723360fd
--- /dev/null
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
+import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AccessQuotaCheckerTest {
+  @BeforeEach
+  public void setUp() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterEach
+  public void clear() {
+    CoordinatorMetrics.clear();
+  }
+
+  @Test
+  public void testAccessInfoRequiredShuffleServers() throws Exception {
+    List<ServerNode> nodes = Lists.newArrayList();
+    ServerNode node1 = new ServerNode(
+        "1",
+        "1",
+        0,
+        50,
+        20,
+        1000,
+        0,
+        null,
+        true);
+    ServerNode node2 = new ServerNode(
+        "1",
+        "1",
+        0,
+        50,
+        20,
+        1000,
+        0,
+        null,
+        true);
+    nodes.add(node1);
+    nodes.add(node2);
+
+    ClusterManager clusterManager = mock(SimpleClusterManager.class);
+    when(clusterManager.getServerList(any())).thenReturn(nodes);
+
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(COORDINATOR_ACCESS_CHECKERS,
+        
Collections.singletonList("org.apache.uniffle.coordinator.AccessQuotaChecker"));
+    conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
+    ApplicationManager applicationManager = new ApplicationManager(conf);
+    AccessManager accessManager = new AccessManager(conf, clusterManager, 
applicationManager, new Configuration());
+
+    AccessQuotaChecker accessQuotaChecker =
+        (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+
+    /**
+     * case1:
+     * when the default app num is 3 and the same user submits 4 apps, the first 3 are accepted
+     * and the 4th, which would exceed the quota, is rejected (check returns false).
+     */
+    Map<String, String> properties = new HashMap<>();
+    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
+    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+    assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+
+    /**
+     * case2:
+     * when the default app num is set to 0, no app can be admitted for the user,
+     * so the very first request is rejected (check returns false)
+     */
+    conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 0);
+    applicationManager = new ApplicationManager(conf);
+    accessManager = new AccessManager(conf, clusterManager, 
applicationManager, new Configuration());
+    accessQuotaChecker = (AccessQuotaChecker) 
accessManager.getAccessCheckers().get(0);
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+    assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+
+    /**
+     * case3:
+     * when both AccessQuotaChecker and AccessClusterLoadChecker are configured, the default app num is 10
+     * and 100 shuffle nodes are required: the quota check passes but the cluster load check fails
+     */
+    conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 10);
+    conf.set(COORDINATOR_ACCESS_CHECKERS,
+        Arrays.asList("org.apache.uniffle.coordinator.AccessQuotaChecker",
+            "org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+    applicationManager = new ApplicationManager(conf);
+    accessManager = new AccessManager(conf, clusterManager, 
applicationManager, new Configuration());
+    accessQuotaChecker = (AccessQuotaChecker) 
accessManager.getAccessCheckers().get(0);
+    final AccessClusterLoadChecker accessClusterLoadChecker =
+        (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
+    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+  }
+}
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
index 8ea88bcf..eefda99d 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
@@ -72,7 +72,8 @@ public class AppBalanceSelectStorageStrategyTest {
     // do inc for remotePath1 to make sure pick storage will be remotePath2 in 
next call
     applicationManager.incRemoteStorageCounter(remotePath1);
     applicationManager.incRemoteStorageCounter(remotePath1);
-    String testApp1 = "testApp1";
+    String testApp1 = "application_test_" + 1;
+    applicationManager.registerApplicationInfo(testApp1, "user");
     applicationManager.refreshAppId(testApp1);
     assertEquals(remotePath2, 
applicationManager.pickRemoteStorage(testApp1).getPath());
     assertEquals(remotePath2, 
applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
@@ -88,6 +89,7 @@ public class AppBalanceSelectStorageStrategyTest {
 
     // refresh app1, got remotePath2, then remove remotePath2,
     // it should be existed in counter until it expired
+    applicationManager.registerApplicationInfo(testApp1, "user");
     applicationManager.refreshAppId(testApp1);
     assertEquals(remotePath2, 
applicationManager.pickRemoteStorage(testApp1).getPath());
     remoteStoragePath = remotePath1;
@@ -115,12 +117,13 @@ public class AppBalanceSelectStorageStrategyTest {
     String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + 
remotePath2
         + Constants.COMMA_SPLIT_CHAR + remotePath3;
     applicationManager.refreshRemoteStorage(remoteStoragePath, "");
-    String appPrefix = "testAppId";
+    String testApp1 = "application_testAppId";
     // init detectStorageScheduler
     Thread.sleep(2000);
     Thread pickThread1 = new Thread(() -> {
       for (int i = 0; i < 1000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
@@ -128,7 +131,8 @@ public class AppBalanceSelectStorageStrategyTest {
 
     Thread pickThread2 = new Thread(() -> {
       for (int i = 1000; i < 2000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
@@ -136,7 +140,8 @@ public class AppBalanceSelectStorageStrategyTest {
 
     Thread pickThread3 = new Thread(() -> {
       for (int i = 2000; i < 3000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 791ec1dc..64150d42 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -101,7 +101,8 @@ public class ApplicationManagerTest {
   public void clearWithoutRemoteStorageTest() throws Exception {
     // test case for storage type without remote storage,
     // NPE shouldn't happen when clear the resource
-    String testApp = "clearWithoutRemoteStorageTest";
+    String testApp = "application_clearWithoutRemoteStorageTest";
+    applicationManager.registerApplicationInfo(testApp, "user");
     applicationManager.refreshAppId(testApp);
     // just set a value != 0, it should be reset to 0 if everything goes well
     CoordinatorMetrics.gaugeRunningAppNum.set(100.0);
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
index 1b4a2345..b1bd1e0e 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
@@ -87,7 +87,7 @@ public class CoordinatorMetricsTest {
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
-    assertEquals(9, actualObj.get("metrics").size());
+    assertEquals(10, actualObj.get("metrics").size());
   }
 
   @Test
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
index e0c3dde8..7273318f 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -77,7 +77,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     Thread.sleep(500L);
     CoordinatorConf conf = new CoordinatorConf();
     conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
-    conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 
200);
+    conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 
1000);
     conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, 
IO_SAMPLE);
     applicationManager = new ApplicationManager(conf);
     selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) 
applicationManager.getSelectStorageStrategy();
@@ -104,10 +104,11 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     // compare with two remote path
     applicationManager.incRemoteStorageCounter(remoteStorage1);
     applicationManager.incRemoteStorageCounter(remoteStorage1);
-    String testApp1 = "testApp1";
+    String testApp1 = "application_test_" + 1;
+    applicationManager.registerApplicationInfo(testApp1, "user");
+    applicationManager.refreshAppId(testApp1);
     Thread.sleep(1000);
     final long current = System.currentTimeMillis();
-    applicationManager.refreshAppId(testApp1);
     fs.create(path);
     selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, 
current);
     fs.create(path);
@@ -129,6 +130,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
 
     // refresh app1, got remotePath2, then remove remotePath2,
     // it should be existed in counter until it expired
+    applicationManager.registerApplicationInfo(testApp1, "user");
     applicationManager.refreshAppId(testApp1);
     assertEquals(remoteStorage2, 
applicationManager.pickRemoteStorage(testApp1).getPath());
     remoteStoragePath = remoteStorage1;
@@ -158,12 +160,13 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + 
remoteStorage2
         + Constants.COMMA_SPLIT_CHAR + remoteStorage3;
     applicationManager.refreshRemoteStorage(remoteStoragePath, "");
-    String appPrefix = "testAppId";
+    String testApp1 = "application_testAppId";
     // init detectStorageScheduler
     Thread.sleep(2000);
     Thread pickThread1 = new Thread(() -> {
       for (int i = 0; i < 1000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
@@ -171,7 +174,8 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
 
     Thread pickThread2 = new Thread(() -> {
       for (int i = 1000; i < 2000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
@@ -179,7 +183,8 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
 
     Thread pickThread3 = new Thread(() -> {
       for (int i = 2000; i < 3000; i++) {
-        String appId = appPrefix + i;
+        String appId = testApp1 + i;
+        applicationManager.registerApplicationInfo(appId, "user");
         applicationManager.refreshAppId(appId);
         applicationManager.pickRemoteStorage(appId);
       }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
new file mode 100644
index 00000000..fe63b6c0
--- /dev/null
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link QuotaManager}, the manager for resource restriction.
+ */
+public class QuotaManagerTest {
+  public QuotaManager quotaManager;
+  private static final Configuration hdfsConf = new Configuration();
+  private static MiniDFSCluster cluster;
+  @TempDir
+  private static File remotePath = new File("hdfs://rss");
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath());
+    hdfsConf.set("dfs.nameservices", "rss");
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
remotePath.getAbsolutePath());
+    cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
+    CoordinatorConf conf = new CoordinatorConf();
+    quotaManager = new QuotaManager(conf);
+  }
+
+  @AfterAll
+  public static void clear() {
+    cluster.close();
+  }
+
+  @Test
+  public void testDetectUserResource() throws Exception {
+    final String quotaFile =
+        new 
Path(remotePath.getAbsolutePath()).getFileSystem(hdfsConf).getName() + 
"/quotaFile.properties";
+    final FSDataOutputStream fsDataOutputStream =
+        new Path(remotePath.toString()).getFileSystem(hdfsConf).create(new 
Path(quotaFile));
+    String quota1 = "user1 =10";
+    String quota2 = "user2= 20";
+    String quota3 = "user3 = 30";
+    fsDataOutputStream.write(quota1.getBytes(StandardCharsets.UTF_8));
+    fsDataOutputStream.write("\n".getBytes(StandardCharsets.UTF_8));
+    fsDataOutputStream.write(quota2.getBytes(StandardCharsets.UTF_8));
+    fsDataOutputStream.write("\n".getBytes(StandardCharsets.UTF_8));
+    fsDataOutputStream.write(quota3.getBytes(StandardCharsets.UTF_8));
+    fsDataOutputStream.flush();
+    fsDataOutputStream.close();
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
+        quotaFile);
+    quotaManager = new QuotaManager(conf);
+    quotaManager.detectUserResource();
+
+    Integer user1 = quotaManager.getDefaultUserApps().get("user1");
+    Integer user2 = quotaManager.getDefaultUserApps().get("user2");
+    Integer user3 = quotaManager.getDefaultUserApps().get("user3");
+    assertEquals(user1, 10);
+    assertEquals(user2, 20);
+    assertEquals(user3, 30);
+  }
+}
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index c4097d7d..2ab9e2bd 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import org.apache.uniffle.coordinator.AccessCandidatesChecker;
 import org.apache.uniffle.coordinator.AccessInfo;
 import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorMetrics;
 import org.apache.uniffle.storage.HdfsTestBase;
@@ -70,11 +71,11 @@ public class AccessCandidatesCheckerHdfsTest extends 
HdfsTestBase {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
clusterPrefix);
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.AccessCandidatesChecker");
-
+    ApplicationManager applicationManager = new ApplicationManager(conf);
     // file load checking at startup
     Exception expectedException = null;
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
@@ -84,7 +85,7 @@ public class AccessCandidatesCheckerHdfsTest extends 
HdfsTestBase {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
candidatesFile);
     expectedException = null;
     try {
-      new AccessManager(conf, null, new Configuration());
+      new AccessManager(conf, null, applicationManager, new Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
     }
@@ -101,7 +102,7 @@ public class AccessCandidatesCheckerHdfsTest extends 
HdfsTestBase {
     printWriter.println("2 ");
     printWriter.flush();
     printWriter.close();
-    AccessManager accessManager = new AccessManager(conf, null, hadoopConf);
+    AccessManager accessManager = new AccessManager(conf, null, 
applicationManager, hadoopConf);
     AccessCandidatesChecker checker = (AccessCandidatesChecker) 
accessManager.getAccessCheckers().get(0);
     // load the config at the beginning
     sleep(1200);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index e245c256..4ee5b651 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -83,27 +83,26 @@ public class AccessClusterTest extends CoordinatorTestBase {
     createCoordinatorServer(coordinatorConf);
     startServers();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
-
     // case1: empty map
     String accessID = "acessid";
-    RssAccessClusterRequest request = new RssAccessClusterRequest(
-            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    RssAccessClusterRequest request = new RssAccessClusterRequest(accessID,
+        Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     RssAccessClusterResponse response = 
coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
 
     // case2: illegal names
     Map<String, String> extraProperties = new HashMap<>();
     extraProperties.put("key", "illegalName");
-    request = new RssAccessClusterRequest(
-            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, 
extraProperties);
+    request = new RssAccessClusterRequest(accessID, 
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+        2000, extraProperties, "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
 
     // case3: legal names
     extraProperties.clear();
     extraProperties.put("key", "v1");
-    request = new RssAccessClusterRequest(
-            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, 
extraProperties);
+    request = new RssAccessClusterRequest(accessID, 
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+        2000, extraProperties, "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
 
@@ -135,15 +134,15 @@ public class AccessClusterTest extends 
CoordinatorTestBase {
     startServers();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
     String accessId = "111111";
-    RssAccessClusterRequest request = new RssAccessClusterRequest(
-        accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    RssAccessClusterRequest request = new RssAccessClusterRequest(accessId,
+        Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     RssAccessClusterResponse response = 
coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("Denied by 
AccessCandidatesChecker"));
 
     accessId = "135";
-    request = new RssAccessClusterRequest(
-        accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    request = new RssAccessClusterRequest(accessId,
+        Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("Denied by 
AccessClusterLoadChecker"));
@@ -156,14 +155,14 @@ public class AccessClusterTest extends 
CoordinatorTestBase {
 
     CoordinatorClient client = new CoordinatorClientFactory("GRPC")
         .createCoordinatorClient(LOCALHOST, COORDINATOR_PORT_1 + 13);
-    request = new RssAccessClusterRequest(
-        accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    request = new RssAccessClusterRequest(accessId,
+        Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     response = client.accessCluster(request);
     assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("UNAVAILABLE: io exception"));
 
-    request = new RssAccessClusterRequest(
-        accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    request = new RssAccessClusterRequest(accessId,
+        Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("SUCCESS"));
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 0c1e7c07..a1858b84 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -22,7 +22,7 @@ import io.prometheus.client.CollectorRegistry;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
-import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.rpc.GrpcServer;
@@ -40,10 +40,10 @@ public class CoordinatorGrpcServerTest {
 
   static class MockedCoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorServerImplBase {
     @Override
-    public void appHeartbeat(
-        RssProtos.AppHeartBeatRequest request,
-        StreamObserver<RssProtos.AppHeartBeatResponse> responseObserver) {
-      RssProtos.AppHeartBeatResponse response = RssProtos.AppHeartBeatResponse
+    public void registerApplicationInfo(
+        RssProtos.ApplicationInfoRequest request,
+        StreamObserver<RssProtos.ApplicationInfoResponse> responseObserver) {
+      RssProtos.ApplicationInfoResponse response = 
RssProtos.ApplicationInfoResponse
           .newBuilder()
           .setRetMsg("")
           .setStatus(RssProtos.StatusCode.SUCCESS)
@@ -69,7 +69,8 @@ public class CoordinatorGrpcServerTest {
     assertEquals(0, connSize);
 
     CoordinatorGrpcClient coordinatorGrpcClient = new 
CoordinatorGrpcClient("localhost", 20001);
-    coordinatorGrpcClient.sendAppHeartBeat(new 
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
+    coordinatorGrpcClient.registerApplicationInfo(
+        new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, 
"user"));
 
     connSize = 
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
     assertEquals(1, connSize);
@@ -77,8 +78,8 @@ public class CoordinatorGrpcServerTest {
     // case2: test the multiple connections
     CoordinatorGrpcClient client1 = new CoordinatorGrpcClient("localhost", 
20001);
     CoordinatorGrpcClient client2 = new CoordinatorGrpcClient("localhost", 
20001);
-    client1.sendAppHeartBeat(new 
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
-    client2.sendAppHeartBeat(new 
RssAppHeartBeatRequest("testGrpcConnectionSize", 10000));
+    client1.registerApplicationInfo(new 
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
+    client2.registerApplicationInfo(new 
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
 
     connSize = 
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
     assertEquals(3, connSize);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 3cc79632..35a2ee0d 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -27,10 +27,10 @@ import com.google.common.collect.Sets;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
-import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleRegisterInfo;
@@ -200,22 +200,25 @@ public class CoordinatorGrpcTest extends 
CoordinatorTestBase {
 
   @Test
   public void appHeartbeatTest() throws Exception {
-    RssAppHeartBeatResponse response =
-        coordinatorClient.sendAppHeartBeat(new 
RssAppHeartBeatRequest("appHeartbeatTest1", 1000));
+    RssApplicationInfoResponse response =
+        coordinatorClient.registerApplicationInfo(
+            new RssApplicationInfoRequest("application_appHeartbeatTest1", 
1000, "user"));
     assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
-    assertEquals(Sets.newHashSet("appHeartbeatTest1"),
+    assertEquals(Sets.newHashSet("application_appHeartbeatTest1"),
         coordinators.get(0).getApplicationManager().getAppIds());
-    coordinatorClient.sendAppHeartBeat(new 
RssAppHeartBeatRequest("appHeartbeatTest2", 1000));
-    assertEquals(Sets.newHashSet("appHeartbeatTest1", "appHeartbeatTest2"),
+    coordinatorClient.registerApplicationInfo(
+        new RssApplicationInfoRequest("application_appHeartbeatTest2", 1000, 
"user"));
+    assertEquals(Sets.newHashSet("application_appHeartbeatTest1", 
"application_appHeartbeatTest2"),
         coordinators.get(0).getApplicationManager().getAppIds());
     int retry = 0;
     while (retry < 5) {
-      coordinatorClient.sendAppHeartBeat(new 
RssAppHeartBeatRequest("appHeartbeatTest1", 1000));
+      coordinatorClient.registerApplicationInfo(
+          new RssApplicationInfoRequest("application_appHeartbeatTest1", 1000, 
"user"));
       retry++;
       Thread.sleep(1000);
     }
     // appHeartbeatTest2 was removed because of expired
-    assertEquals(Sets.newHashSet("appHeartbeatTest1"),
+    assertEquals(Sets.newHashSet("application_appHeartbeatTest1"),
         coordinators.get(0).getApplicationManager().getAppIds());
   }
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index 86faac4d..76949fd4 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -110,13 +110,13 @@ public class FetchClientConfTest extends 
CoordinatorTestBase {
     
coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED,
 true);
     
coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
     
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
 3);
-    
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
 200);
+    
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
 1000);
     
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
 1);
     createCoordinatorServer(coordinatorConf);
     startServers();
 
     waitForUpdate(Sets.newHashSet(remotePath1), 
coordinators.get(0).getApplicationManager());
-    String appId = "testFetchRemoteStorageApp";
+    String appId = "application_testFetchRemoteStorageApp_" + 1;
     RssFetchRemoteStorageRequest request = new 
RssFetchRemoteStorageRequest(appId);
     RssFetchRemoteStorageResponse response = 
coordinatorClient.fetchRemoteStorage(request);
     RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
@@ -136,7 +136,8 @@ public class FetchClientConfTest extends 
CoordinatorTestBase {
     assertEquals(remotePath1, remoteStorageInfo.getPath());
     assertTrue(remoteStorageInfo.getConfItems().isEmpty());
 
-    request = new RssFetchRemoteStorageRequest(appId + "another");
+    String newAppId = "application_testFetchRemoteStorageApp_" + 2;
+    request = new RssFetchRemoteStorageRequest(newAppId);
     response = coordinatorClient.fetchRemoteStorage(request);
     // got the remotePath2 for new appId
     remoteStorageInfo = response.getRemoteStorageInfo();
@@ -161,14 +162,14 @@ public class FetchClientConfTest extends 
CoordinatorTestBase {
     
coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED,
 true);
     
coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, 
cfgFile.toURI().toString());
     
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
 2);
-    
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
 100);
+    
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME,
 1000);
     
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES,
 1);
     
coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, 
IO_SAMPLE);
     createCoordinatorServer(coordinatorConf);
     startServers();
 
     waitForUpdate(Sets.newHashSet(remotePath1), 
coordinators.get(0).getApplicationManager());
-    String appId = "testFetchRemoteStorageApp";
+    String appId = "application_testFetchRemoteStorageApp_" + 1;
     RssFetchRemoteStorageRequest request = new 
RssFetchRemoteStorageRequest(appId);
     RssFetchRemoteStorageResponse response = 
coordinatorClient.fetchRemoteStorage(request);
     RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
@@ -189,7 +190,8 @@ public class FetchClientConfTest extends 
CoordinatorTestBase {
 
     // ensure sizeList can be updated
     Thread.sleep(2000);
-    request = new RssFetchRemoteStorageRequest(appId + "another");
+    String newAppId = "application_testFetchRemoteStorageApp_" + 2;
+    request = new RssFetchRemoteStorageRequest(newAppId);
     response = coordinatorClient.fetchRemoteStorage(request);
     // got the remotePath2 for new appId
     remoteStorageInfo = response.getRemoteStorageInfo();
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 9b3d8488..3d54f790 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -75,8 +75,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class ShuffleServerGrpcTest extends IntegrationTestBase {
 
   private ShuffleServerGrpcClient shuffleServerClient;
-  private AtomicInteger atomicInteger = new AtomicInteger(0);
-  private static Long EVENT_THRESHOLD_SIZE = 2048L;
+  private final AtomicInteger atomicInteger = new AtomicInteger(0);
+  private static final Long EVENT_THRESHOLD_SIZE = 2048L;
   private static final int GB = 1024 * 1024 * 1024;
 
   @BeforeAll
@@ -111,30 +111,31 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
     shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
     shuffleWriteClient.registerShuffle(
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
-        "clearResourceTest1",
+        "application_clearResourceTest1",
         0,
         Lists.newArrayList(new PartitionRange(0, 1)),
         new RemoteStorageInfo(""),
         ShuffleDataDistributionType.NORMAL
     );
+    
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest1", 
500L, "user");
+    shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 
500L);
+    
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest2", 
500L, "user");
+    shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2", 
500L);
 
-    shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
-    shuffleWriteClient.sendAppHeartbeat("clearResourceTest2", 1000L);
-
-    RssRegisterShuffleRequest rrsr = new 
RssRegisterShuffleRequest("clearResourceTest1", 0,
+    RssRegisterShuffleRequest rrsr = new 
RssRegisterShuffleRequest("application_clearResourceTest1", 0,
         Lists.newArrayList(new PartitionRange(0, 1)), "");
     shuffleServerClient.registerShuffle(rrsr);
-    rrsr = new RssRegisterShuffleRequest("clearResourceTest2", 0,
+    rrsr = new RssRegisterShuffleRequest("application_clearResourceTest2", 0,
         Lists.newArrayList(new PartitionRange(0, 1)), "");
     shuffleServerClient.registerShuffle(rrsr);
-    assertEquals(Sets.newHashSet("clearResourceTest1", "clearResourceTest2"),
+    assertEquals(Sets.newHashSet("application_clearResourceTest1", 
"application_clearResourceTest2"),
         shuffleServers.get(0).getShuffleTaskManager().getAppIds());
 
     // Thread will keep refresh clearResourceTest1 in coordinator
     Thread t = new Thread(() -> {
       int i = 0;
       while (i < 20) {
-        shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
+        shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 
500L);
         i++;
         try {
           Thread.sleep(1000);
@@ -147,13 +148,13 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
 
     // Heartbeat is sent to coordinator too
     Thread.sleep(3000);
-    shuffleServerClient.registerShuffle(new 
RssRegisterShuffleRequest("clearResourceTest1", 0,
+    shuffleServerClient.registerShuffle(new 
RssRegisterShuffleRequest("application_clearResourceTest1", 0,
         Lists.newArrayList(new PartitionRange(0, 1)), ""));
-    assertEquals(Sets.newHashSet("clearResourceTest1"),
+    assertEquals(Sets.newHashSet("application_clearResourceTest1"),
         coordinators.get(0).getApplicationManager().getAppIds());
     // clearResourceTest2 will be removed because of 
rss.server.app.expired.withoutHeartbeat
     Thread.sleep(2000);
-    assertEquals(Sets.newHashSet("clearResourceTest1"),
+    assertEquals(Sets.newHashSet("application_clearResourceTest1"),
         shuffleServers.get(0).getShuffleTaskManager().getAppIds());
 
     // clearResourceTest1 will be removed because of 
rss.server.app.expired.withoutHeartbeat
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
index 3bd7ca55..0ce1ca4e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
@@ -19,12 +19,14 @@ package org.apache.uniffle.client.api;
 
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
 import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
 import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
 import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
@@ -34,6 +36,8 @@ public interface CoordinatorClient {
 
   RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request);
 
+  RssApplicationInfoResponse registerApplicationInfo(RssApplicationInfoRequest 
request);
+
   RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request);
 
   RssGetShuffleAssignmentsResponse 
getShuffleAssignments(RssGetShuffleAssignmentsRequest request);
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index be44c60c..4f85bdae 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
+import org.apache.uniffle.client.request.RssApplicationInfoRequest;
 import org.apache.uniffle.client.request.RssFetchClientConfRequest;
 import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
 import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
@@ -43,6 +44,7 @@ import 
org.apache.uniffle.client.request.RssSendHeartBeatRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
+import org.apache.uniffle.client.response.RssApplicationInfoResponse;
 import org.apache.uniffle.client.response.RssFetchClientConfResponse;
 import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
@@ -56,8 +58,8 @@ import 
org.apache.uniffle.proto.CoordinatorServerGrpc.CoordinatorServerBlockingS
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.AccessClusterRequest;
 import org.apache.uniffle.proto.RssProtos.AccessClusterResponse;
-import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
-import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoRequest;
+import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
 import org.apache.uniffle.proto.RssProtos.ClientConfItem;
 import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
 import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageRequest;
@@ -207,8 +209,9 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
 
   @Override
   public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest 
request) {
-    AppHeartBeatRequest rpcRequest = 
AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build();
-    AppHeartBeatResponse rpcResponse = blockingStub
+    RssProtos.AppHeartBeatRequest rpcRequest =
+        
RssProtos.AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build();
+    RssProtos.AppHeartBeatResponse rpcResponse = blockingStub
         .withDeadlineAfter(request.getTimeoutMs(), 
TimeUnit.MILLISECONDS).appHeartbeat(rpcRequest);
     RssAppHeartBeatResponse response;
     StatusCode statusCode = rpcResponse.getStatus();
@@ -222,6 +225,24 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
     return response;
   }
 
+  @Override
+  public RssApplicationInfoResponse 
registerApplicationInfo(RssApplicationInfoRequest request) {
+    ApplicationInfoRequest rpcRequest =
+        
ApplicationInfoRequest.newBuilder().setAppId(request.getAppId()).setUser(request.getUser()).build();
+    ApplicationInfoResponse rpcResponse = blockingStub
+        .withDeadlineAfter(request.getTimeoutMs(), 
TimeUnit.MILLISECONDS).registerApplicationInfo(rpcRequest);
+    RssApplicationInfoResponse response;
+    StatusCode statusCode = rpcResponse.getStatus();
+    switch (statusCode) {
+      case SUCCESS:
+        response = new RssApplicationInfoResponse(ResponseStatusCode.SUCCESS);
+        break;
+      default:
+        response = new 
RssApplicationInfoResponse(ResponseStatusCode.INTERNAL_ERROR);
+    }
+    return response;
+  }
+
   @Override
   public RssGetShuffleAssignmentsResponse 
getShuffleAssignments(RssGetShuffleAssignmentsRequest request) {
     RssProtos.GetShuffleAssignmentsResponse rpcResponse = 
doGetShuffleAssignments(
@@ -260,6 +281,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
     AccessClusterRequest rpcRequest = AccessClusterRequest
         .newBuilder()
         .setAccessId(request.getAccessId())
+        .setUser(request.getUser())
         .addAllTags(request.getTags())
         .putAllExtraProperties(request.getExtraProperties())
         .build();
@@ -277,7 +299,9 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       case SUCCESS:
         response = new RssAccessClusterResponse(
             ResponseStatusCode.SUCCESS,
-            rpcResponse.getRetMsg());
+            rpcResponse.getRetMsg(),
+            rpcResponse.getUuid()
+        );
         break;
       default:
         response = new 
RssAccessClusterResponse(ResponseStatusCode.ACCESS_DENIED, 
rpcResponse.getRetMsg());
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
index ac5523ac..781a6b4f 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
@@ -26,28 +26,32 @@ public class RssAccessClusterRequest {
   private final String accessId;
   private final Set<String> tags;
   private final int timeoutMs;
+  private final String user;
   /**
    * The map is to pass the extra data to the coordinator and to
    * extend more pluggable {@code AccessCheckers} easily.
    */
   private final Map<String, String> extraProperties;
 
-  public RssAccessClusterRequest(String accessId, Set<String> tags, int 
timeoutMs) {
+  public RssAccessClusterRequest(String accessId, Set<String> tags, int 
timeoutMs, String user) {
     this.accessId = accessId;
     this.tags = tags;
     this.timeoutMs = timeoutMs;
     this.extraProperties = Collections.emptyMap();
+    this.user = user;
   }
 
   public RssAccessClusterRequest(
       String accessId,
       Set<String> tags,
       int timeoutMs,
-      Map<String, String> extraProperties) {
+      Map<String, String> extraProperties,
+      String user) {
     this.accessId = accessId;
     this.tags = tags;
     this.timeoutMs = timeoutMs;
     this.extraProperties = extraProperties;
+    this.user = user;
   }
 
   public String getAccessId() {
@@ -65,4 +69,8 @@ public class RssAccessClusterRequest {
   public Map<String, String> getExtraProperties() {
     return extraProperties;
   }
+
+  public String getUser() {
+    return user;
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
index 6a7c3e90..475fd484 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
@@ -34,4 +34,5 @@ public class RssAppHeartBeatRequest {
   public long getTimeoutMs() {
     return timeoutMs;
   }
+
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
similarity index 82%
copy from 
internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
index 6a7c3e90..475ddf28 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssApplicationInfoRequest.java
@@ -17,14 +17,16 @@
 
 package org.apache.uniffle.client.request;
 
-public class RssAppHeartBeatRequest {
+public class RssApplicationInfoRequest {
 
   private final String appId;
   private final long timeoutMs;
+  private final String user;
 
-  public RssAppHeartBeatRequest(String appId, long timeoutMs) {
+  public RssApplicationInfoRequest(String appId, long timeoutMs, String user) {
     this.appId = appId;
     this.timeoutMs = timeoutMs;
+    this.user = user;
   }
 
   public String getAppId() {
@@ -34,4 +36,8 @@ public class RssAppHeartBeatRequest {
   public long getTimeoutMs() {
     return timeoutMs;
   }
+
+  public String getUser() {
+    return user;
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
index 04ab5ac4..583e7daf 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
@@ -19,7 +19,18 @@ package org.apache.uniffle.client.response;
 
 public class RssAccessClusterResponse extends ClientResponse {
 
+  private String uuid;
+
   public RssAccessClusterResponse(ResponseStatusCode statusCode, String 
messge) {
     super(statusCode, messge);
   }
+
+  public RssAccessClusterResponse(ResponseStatusCode statusCode, String 
messge, String uuid) {
+    super(statusCode, messge);
+    this.uuid = uuid;
+  }
+
+  public String getUuid() {
+    return uuid;
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
similarity index 82%
copy from 
internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
index 04ab5ac4..210041d7 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssAccessClusterResponse.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssApplicationInfoResponse.java
@@ -17,9 +17,9 @@
 
 package org.apache.uniffle.client.response;
 
-public class RssAccessClusterResponse extends ClientResponse {
+public class RssApplicationInfoResponse extends ClientResponse {
 
-  public RssAccessClusterResponse(ResponseStatusCode statusCode, String 
messge) {
-    super(statusCode, messge);
+  public RssApplicationInfoResponse(ResponseStatusCode statusCode) {
+    super(statusCode);
   }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 5c7e80b8..5789a952 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -287,11 +287,14 @@ service CoordinatorServer {
   rpc getShuffleDataStorageInfo(google.protobuf.Empty) returns 
(GetShuffleDataStorageInfoResponse);
   rpc checkServiceAvailable(google.protobuf.Empty) returns 
(CheckServiceAvailableResponse);
 
+  // Heartbeat between Shuffle Application and Coordinator Server
+  rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
+
   // Report a client operation's result to coordinator server
   rpc reportClientOperation(ReportShuffleClientOpRequest) returns 
(ReportShuffleClientOpResponse);
 
-  // Heartbeat between Shuffle Application and Coordinator Server
-  rpc appHeartbeat(AppHeartBeatRequest) returns (AppHeartBeatResponse);
+  // Report an application's info to Coordinator Server
+  rpc registerApplicationInfo(ApplicationInfoRequest) returns 
(ApplicationInfoResponse);
 
   // Access to the remote shuffle service cluster
   rpc accessCluster(AccessClusterRequest) returns (AccessClusterResponse);
@@ -312,6 +315,16 @@ message AppHeartBeatResponse {
   string retMsg = 2;
 }
 
+message ApplicationInfoRequest {
+  string appId = 1;
+  string user = 2;
+}
+
+message ApplicationInfoResponse {
+  StatusCode status = 1;
+  string retMsg = 2;
+}
+
 message GetShuffleServerListResponse {
   repeated ShuffleServerId servers = 1;
 }
@@ -374,11 +387,13 @@ message AccessClusterRequest {
   string accessId = 1;
   repeated string tags = 2;
   map<string, string> extraProperties = 3;
+  string user=4;
 }
 
 message AccessClusterResponse {
   StatusCode status = 1;
   string retMsg = 2;
+  string uuid = 3;
 }
 
 message FetchClientConfResponse {

Reply via email to