This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 831b96516e [IOTDB-5091] add space quota (#9506)
831b96516e is described below

commit 831b96516ef692933d1efb4af5792cc03ab323d7
Author: 任宇华 <[email protected]>
AuthorDate: Tue Apr 4 13:01:15 2023 +0800

    [IOTDB-5091] add space quota (#9506)
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   2 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  13 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   8 +
 .../confignode/client/DataNodeRequestType.java     |   5 +-
 .../client/async/AsyncDataNodeClientPool.java      |   7 +
 .../heartbeat/DataNodeHeartbeatHandler.java        |  20 +-
 .../consensus/request/ConfigPhysicalPlan.java      |   4 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   5 +-
 .../request/write/quota/SetSpaceQuotaPlan.java     | 101 ++++++++++
 .../confignode/manager/ClusterQuotaManager.java    | 222 +++++++++++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |  38 +++-
 .../apache/iotdb/confignode/manager/IManager.java  |  11 +
 .../iotdb/confignode/manager/node/NodeManager.java |  16 +-
 .../manager/partition/PartitionManager.java        |   9 +
 .../persistence/executor/ConfigPlanExecutor.java   |  12 +-
 .../partition/DatabasePartitionTable.java          |  20 ++
 .../persistence/partition/PartitionInfo.java       |  17 ++
 .../confignode/persistence/quota/QuotaInfo.java    | 162 +++++++++++++++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  17 ++
 .../request/ConfigPhysicalPlanSerDeTest.java       |  16 ++
 .../confignode/persistence/QuotaInfoTest.java      |  82 ++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  11 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   3 +
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   4 +
 .../apache/iotdb/commons/enums/SpaceQuotaType.java |  26 +++
 .../commons/utils/BasicStructureSerDeUtil.java     |  16 ++
 .../schemaregion/rocksdb/RSchemaRegion.java        |  10 +
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  10 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  50 +++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  10 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  45 +++++
 .../db/exception/quota/ExceedQuotaException.java   |  29 +++
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  18 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   5 +
 .../db/metadata/schemaregion/SchemaEngine.java     |  30 +++
 .../schemaregion/SchemaRegionMemoryImpl.java       |  45 +++++
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |  45 +++++
 .../db/mpp/common/header/ColumnHeaderConstant.java |  12 ++
 .../db/mpp/common/header/DatasetHeaderFactory.java |   4 +
 .../plan/execution/config/ConfigTaskVisitor.java   |  16 ++
 .../config/executor/ClusterConfigTaskExecutor.java |  71 ++++++-
 .../config/executor/IConfigTaskExecutor.java       |   9 +
 .../config/sys/quota/SetSpaceQuotaTask.java        |  42 ++++
 .../config/sys/quota/ShowSpaceQuotaTask.java       | 130 ++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  93 +++++++++
 .../iotdb/db/mpp/plan/statement/StatementType.java |   3 +
 .../db/mpp/plan/statement/StatementVisitor.java    |  10 +
 .../sys/quota/SetSpaceQuotaStatement.java          | 100 ++++++++++
 .../sys/quota/ShowSpaceQuotaStatement.java         |  62 ++++++
 .../apache/iotdb/db/quotas/DataNodeSizeStore.java  |  60 ++++++
 .../iotdb/db/quotas/DataNodeSpaceQuotaManager.java | 153 ++++++++++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  20 ++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   5 +-
 thrift-commons/src/main/thrift/common.thrift       |  12 ++
 .../src/main/thrift/confignode.thrift              |  19 ++
 thrift/src/main/thrift/datanode.thrift             |  12 +-
 58 files changed, 1979 insertions(+), 12 deletions(-)

diff --git 
a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index f3f0325324..652415cf54 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -148,6 +148,7 @@ keyWords
     | QUERIES
     | QUERY
     | QUERYID
+    | QUOTA
     | RANGE
     | READONLY
     | REGEXP
@@ -172,6 +173,7 @@ keyWords
     | SHOW
     | SLIMIT
     | SOFFSET
+    | SPACE
     | STORAGE
     | START
     | STARTTIME
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 212099ba97..83cedcc380 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -58,6 +58,8 @@ ddlStatement
     | getRegionId | getTimeSlotList | getSeriesSlotList | migrateRegion
     // ML Model
     | createModel | dropModel | showModels | showTrails
+    // Quota
+    | setSpaceQuota | showSpaceQuota
     ;
 
 dmlStatement
@@ -314,6 +316,16 @@ showFunctions
     : SHOW FUNCTIONS
     ;
 
+// Quota 
=========================================================================================
+// Show Space Quota
+showSpaceQuota
+    : SHOW SPACE QUOTA (prefixPath (COMMA prefixPath)*)?
+    ;
+
+// Set Space Quota
+setSpaceQuota
+    : SET SPACE QUOTA attributePair (COMMA attributePair)* ON prefixPath 
(COMMA prefixPath)*
+    ;
 
 // Trigger 
=========================================================================================
 // ---- Create Trigger
@@ -447,7 +459,6 @@ migrateRegion
     : MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO 
toId=INTEGER_LITERAL
     ;
 
-
 // Pipe Plugin 
=========================================================================================
 // Create Pipe Plugin
 createPipePlugin
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 8716e22c61..3f99561539 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -526,6 +526,10 @@ QUERYID
     : Q U E R Y I D
     ;
 
+QUOTA
+    : Q U O T A
+    ;
+
 RANGE
     : R A N G E
     ;
@@ -630,6 +634,10 @@ SOFFSET
     : S O F F S E T
     ;
 
+SPACE
+    : S P A C E
+    ;
+
 STORAGE
     : S T O R A G E
     ;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 4ca8a532c8..55e8a11c35 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -95,5 +95,8 @@ public enum DataNodeRequestType {
   KILL_QUERY_INSTANCE,
 
   /** ML Model */
-  DELETE_MODEL_METRICS
+  DELETE_MODEL_METRICS,
+
+  /** Quota */
+  SET_SPACE_QUOTA,
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 79496235f5..2bd0cd43e9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -353,6 +354,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
           break;
+        case SET_SPACE_QUOTA:
+          client.setSpaceQuota(
+              (TSetSpaceQuotaReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
+          break;
         default:
           LOGGER.error(
               "Unexpected DataNode Request Type: {} when 
sendAsyncRequestToDataNode",
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 28e1d26254..fe1af63540 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -40,16 +40,25 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<THeartbeatR
   private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
   private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
   private final RouteBalancer routeBalancer;
+  private final Map<Integer, Long> deviceNum;
+  private final Map<Integer, Long> timeSeriesNum;
+  private final Map<Integer, Long> regionDisk;
 
   public DataNodeHeartbeatHandler(
       TDataNodeLocation dataNodeLocation,
       DataNodeHeartbeatCache dataNodeHeartbeatCache,
       Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
-      RouteBalancer routeBalancer) {
+      RouteBalancer routeBalancer,
+      Map<Integer, Long> deviceNum,
+      Map<Integer, Long> timeSeriesNum,
+      Map<Integer, Long> regionDisk) {
     this.dataNodeLocation = dataNodeLocation;
     this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
     this.regionGroupCacheMap = regionGroupCacheMap;
     this.routeBalancer = routeBalancer;
+    this.deviceNum = deviceNum;
+    this.timeSeriesNum = timeSeriesNum;
+    this.regionDisk = regionDisk;
   }
 
   @Override
@@ -82,6 +91,15 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<THeartbeatR
                         heartbeatResp.getHeartbeatTimestamp(), 
dataNodeLocation.getDataNodeId()));
               }
             });
+    if (heartbeatResp.getDeviceNum() != null) {
+      deviceNum.putAll(heartbeatResp.getDeviceNum());
+    }
+    if (heartbeatResp.getTimeSeriesNum() != null) {
+      timeSeriesNum.putAll(heartbeatResp.getTimeSeriesNum());
+    }
+    if (heartbeatResp.getRegionDisk() != null) {
+      regionDisk.putAll(heartbeatResp.getRegionDisk());
+    }
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index bb3dc7d4e3..aff108b0ce 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -79,6 +79,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
@@ -430,6 +431,9 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
         case GetPipePluginJar:
           plan = new GetPipePluginJarPlan();
           break;
+        case setSpaceQuota:
+          plan = new SetSpaceQuotaPlan();
+          break;
         default:
           throw new IOException("unknown PhysicalPlan configPhysicalPlanType: 
" + planType);
       }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 950cfce41d..9d450259ec 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -154,7 +154,10 @@ public enum ConfigPhysicalPlanType {
   CreatePipePlugin((short) 1300),
   DropPipePlugin((short) 1301),
   GetPipePluginTable((short) 1302),
-  GetPipePluginJar((short) 1303);
+  GetPipePluginJar((short) 1303),
+
+  /** Quota */
+  setSpaceQuota((short) 1400);
 
   private final short planType;
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
new file mode 100644
index 0000000000..c7b46fbd63
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+public class SetSpaceQuotaPlan extends ConfigPhysicalPlan {
+
+  private List<String> prefixPathList;
+  private TSpaceQuota spaceLimit;
+
+  public SetSpaceQuotaPlan() {
+    super(ConfigPhysicalPlanType.setSpaceQuota);
+  }
+
+  public SetSpaceQuotaPlan(List<String> prefixPathList, TSpaceQuota 
spaceLimit) {
+    super(ConfigPhysicalPlanType.setSpaceQuota);
+    this.prefixPathList = prefixPathList;
+    this.spaceLimit = spaceLimit;
+  }
+
+  public List<String> getPrefixPathList() {
+    return prefixPathList;
+  }
+
+  public void setPrefixPathList(List<String> prefixPathList) {
+    this.prefixPathList = prefixPathList;
+  }
+
+  public TSpaceQuota getSpaceLimit() {
+    return spaceLimit;
+  }
+
+  public void setSpaceLimit(TSpaceQuota spaceLimit) {
+    this.spaceLimit = spaceLimit;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    BasicStructureSerDeUtil.write(prefixPathList, stream);
+    BasicStructureSerDeUtil.write(spaceLimit.getDeviceNum(), stream);
+    BasicStructureSerDeUtil.write(spaceLimit.getTimeserieNum(), stream);
+    BasicStructureSerDeUtil.write(spaceLimit.getDiskSize(), stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    List<String> prefixPathList = 
BasicStructureSerDeUtil.readStringList(buffer);
+    long deviceNum = BasicStructureSerDeUtil.readLong(buffer);
+    long timeserieNum = BasicStructureSerDeUtil.readLong(buffer);
+    long disk = BasicStructureSerDeUtil.readLong(buffer);
+    this.prefixPathList = prefixPathList;
+    TSpaceQuota spaceLimit = new TSpaceQuota();
+    spaceLimit.setDeviceNum(deviceNum);
+    spaceLimit.setTimeserieNum(timeserieNum);
+    spaceLimit.setDiskSize(disk);
+    this.spaceLimit = spaceLimit;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    SetSpaceQuotaPlan that = (SetSpaceQuotaPlan) o;
+    return Objects.equals(prefixPathList, that.prefixPathList)
+        && Objects.equals(spaceLimit, that.spaceLimit);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), prefixPathList, spaceLimit);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
new file mode 100644
index 0000000000..de1a546282
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ClusterQuotaManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterQuotaManager.class);
+
+  private final IManager configManager;
+  private final QuotaInfo quotaInfo;
+  private final Map<Integer, Long> deviceNum;
+  private final Map<Integer, Long> timeSeriesNum;
+  private final Map<String, List<Integer>> schemaRegionIdMap;
+  private final Map<String, List<Integer>> dataRegionIdMap;
+  private final Map<Integer, Long> regionDisk;
+
+  public ClusterQuotaManager(IManager configManager, QuotaInfo quotaInfo) {
+    this.configManager = configManager;
+    this.quotaInfo = quotaInfo;
+    deviceNum = new ConcurrentHashMap<>();
+    timeSeriesNum = new ConcurrentHashMap<>();
+    schemaRegionIdMap = new HashMap<>();
+    dataRegionIdMap = new HashMap<>();
+    regionDisk = new ConcurrentHashMap<>();
+  }
+
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+    if (!checkSpaceQuota(req)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
+          "The used quota exceeds the preset quota. Please set a larger 
value.");
+    }
+    ConsensusWriteResponse response =
+        configManager
+            .getConsensusManager()
+            .write(new SetSpaceQuotaPlan(req.getDatabase(), 
req.getSpaceLimit()));
+    if (response.getStatus() != null) {
+      if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+            configManager.getNodeManager().getRegisteredDataNodeLocations();
+        AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler =
+            new AsyncClientHandler<>(DataNodeRequestType.SET_SPACE_QUOTA, req, 
dataNodeLocationMap);
+        
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+        return 
RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
+      }
+      return response.getStatus();
+    } else {
+      LOGGER.warn(
+          "Unexpected error happened while setting space quota on {}: ",
+          req.getDatabase().toString(),
+          response.getException());
+      // consensus layer related errors
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(response.getErrorMessage());
+      return res;
+    }
+  }
+
+  /** If the new quota is smaller than the quota already used, the setting 
fails. */
+  private boolean checkSpaceQuota(TSetSpaceQuotaReq req) {
+    for (String database : req.getDatabase()) {
+      if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
+        TSpaceQuota spaceQuota = quotaInfo.getSpaceQuotaUsage().get(database);
+        if (spaceQuota.getDeviceNum() > req.getSpaceLimit().getDeviceNum()
+            || spaceQuota.getTimeserieNum() > 
req.getSpaceLimit().getTimeserieNum()
+            || spaceQuota.getDiskSize() > req.getSpaceLimit().getDiskSize()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
+    TSpaceQuotaResp showSpaceQuotaResp = new TSpaceQuotaResp();
+    if (databases.isEmpty()) {
+      showSpaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
+      showSpaceQuotaResp.setSpaceQuotaUsage(quotaInfo.getSpaceQuotaUsage());
+    } else if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
+      Map<String, TSpaceQuota> spaceQuotaMap = new HashMap<>();
+      Map<String, TSpaceQuota> spaceQuotaUsageMap = new HashMap<>();
+      for (String database : databases) {
+        if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
+          spaceQuotaMap.put(database, 
quotaInfo.getSpaceQuotaLimit().get(database));
+          spaceQuotaUsageMap.put(database, 
quotaInfo.getSpaceQuotaUsage().get(database));
+        }
+      }
+      showSpaceQuotaResp.setSpaceQuota(spaceQuotaMap);
+      showSpaceQuotaResp.setSpaceQuotaUsage(spaceQuotaUsageMap);
+    }
+    
showSpaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    return showSpaceQuotaResp;
+  }
+
+  public TSpaceQuotaResp getSpaceQuota() {
+    TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
+    if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
+      spaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
+    }
+    spaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    return spaceQuotaResp;
+  }
+
+  public boolean hasSpaceQuotaLimit() {
+    return quotaInfo.getSpaceQuotaLimit().keySet().isEmpty();
+  }
+
+  public List<Integer> getSchemaRegionIds() {
+    List<Integer> schemaRegionIds = new ArrayList<>();
+    getPartitionManager()
+        .getSchemaRegionIds(
+            new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()), 
schemaRegionIdMap);
+    schemaRegionIdMap.values().forEach(schemaRegionIds::addAll);
+    return schemaRegionIds;
+  }
+
+  public List<Integer> getDataRegionIds() {
+    List<Integer> dataRegionIds = new ArrayList<>();
+    getPartitionManager()
+        .getDataRegionIds(
+            new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()), 
dataRegionIdMap);
+    dataRegionIdMap.values().forEach(dataRegionIds::addAll);
+    return dataRegionIds;
+  }
+
+  public Map<String, TSpaceQuota> getSpaceQuotaUsage() {
+    return quotaInfo.getSpaceQuotaUsage();
+  }
+
+  public Map<Integer, Long> getDeviceNum() {
+    return deviceNum;
+  }
+
+  public Map<Integer, Long> getTimeSeriesNum() {
+    return timeSeriesNum;
+  }
+
+  public Map<Integer, Long> getRegionDisk() {
+    return regionDisk;
+  }
+
+  public void updateSpaceQuotaUsage() {
+    AtomicLong deviceCount = new AtomicLong();
+    AtomicLong timeSeriesCount = new AtomicLong();
+    for (Map.Entry<String, List<Integer>> entry : 
schemaRegionIdMap.entrySet()) {
+      deviceCount.set(0);
+      timeSeriesCount.set(0);
+      entry
+          .getValue()
+          .forEach(
+              schemaRegionId -> {
+                if (deviceNum.containsKey(schemaRegionId)) {
+                  deviceCount.addAndGet(deviceCount.get() + 
deviceNum.get(schemaRegionId));
+                }
+                if (timeSeriesNum.containsKey(schemaRegionId)) {
+                  timeSeriesCount.addAndGet(
+                      timeSeriesCount.get() + 
timeSeriesNum.get(schemaRegionId));
+                }
+              });
+      
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDeviceNum(deviceCount.get());
+      
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setTimeserieNum(timeSeriesCount.get());
+    }
+    AtomicLong regionDiskCount = new AtomicLong();
+    for (Map.Entry<String, List<Integer>> entry : dataRegionIdMap.entrySet()) {
+      regionDiskCount.set(0);
+      entry
+          .getValue()
+          .forEach(
+              dataRegionId -> {
+                if (regionDisk.containsKey(dataRegionId)) {
+                  regionDiskCount.addAndGet(regionDisk.get(dataRegionId));
+                }
+              });
+      
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDiskSize(regionDiskCount.get());
+    }
+  }
+
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 924cf9ef1e..41c755b65a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.NodeType;
@@ -100,6 +101,7 @@ import 
org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
@@ -161,6 +163,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -236,6 +239,9 @@ public class ConfigManager implements IManager {
   /** Pipe */
   private final PipeManager pipeManager;
 
+  /** Manage quotas */
+  private final ClusterQuotaManager clusterQuotaManager;
+
   private final ConfigRegionStateMachine stateMachine;
 
   private final RetryFailedTasksThread retryFailedTasksThread;
@@ -255,6 +261,7 @@ public class ConfigManager implements IManager {
     CQInfo cqInfo = new CQInfo();
     ModelInfo modelInfo = new ModelInfo();
     PipeInfo pipeInfo = new PipeInfo();
+    QuotaInfo quotaInfo = new QuotaInfo();
 
     // Build state machine and executor
     ConfigPlanExecutor executor =
@@ -269,7 +276,8 @@ public class ConfigManager implements IManager {
             syncInfo,
             cqInfo,
             modelInfo,
-            pipeInfo);
+            pipeInfo,
+            quotaInfo);
     this.stateMachine = new ConfigRegionStateMachine(this, executor);
 
     // Build the manager module
@@ -287,6 +295,7 @@ public class ConfigManager implements IManager {
     this.pipeManager = new PipeManager(this, pipeInfo);
 
     this.retryFailedTasksThread = new RetryFailedTasksThread(this);
+    this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
   }
 
   public void initConsensusManager() throws IOException {
@@ -1399,6 +1408,11 @@ public class ConfigManager implements IManager {
     return cqManager;
   }
 
+  @Override
+  public ClusterQuotaManager getClusterQuotaManager() {
+    return clusterQuotaManager;
+  }
+
   @Override
   public RetryFailedTasksThread getRetryFailedTasksThread() {
     return retryFailedTasksThread;
@@ -1868,4 +1882,26 @@ public class ConfigManager implements IManager {
         ? modelManager.updateModelState(req)
         : status;
   }
+
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? clusterQuotaManager.setSpaceQuota(req)
+        : status;
+  }
+
+  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? clusterQuotaManager.showSpaceQuota(databases)
+        : new TSpaceQuotaResp(status);
+  }
+
+  public TSpaceQuotaResp getSpaceQuota() {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? clusterQuotaManager.getSpaceQuota()
+        : new TSpaceQuotaResp(status);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 75aaaf9a5b..3d6490db57 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -195,6 +196,13 @@ public interface IManager {
    */
   PipeManager getPipeManager();
 
+  /**
+   * Get ClusterQuotaManager
+   *
+   * @return ClusterQuotaManager instance
+   */
+  ClusterQuotaManager getClusterQuotaManager();
+
   /**
    * Get RetryFailedTasksThread
    *
@@ -654,4 +662,7 @@ public interface IManager {
 
   /** Update the model state */
   TSStatus updateModelState(TUpdateModelStateReq req);
+
+  /** Set space quota */
+  TSStatus setSpaceQuota(TSetSpaceQuotaReq req);
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index b502a25e3a..97125db05a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -52,6 +52,7 @@ import 
org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import 
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.IManager;
@@ -730,6 +731,11 @@ public class NodeManager {
 
     /* Update heartbeat counter */
     heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+    if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
+      
heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
+      
heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
+      
heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
+    }
     return heartbeatReq;
   }
 
@@ -750,7 +756,11 @@ public class NodeManager {
                       dataNodeInfo.getLocation().getDataNodeId(),
                       empty -> new DataNodeHeartbeatCache()),
               getPartitionManager().getRegionGroupCacheMap(),
-              getLoadManager().getRouteBalancer());
+              getLoadManager().getRouteBalancer(),
+              getClusterQuotaManager().getDeviceNum(),
+              getClusterQuotaManager().getTimeSeriesNum(),
+              getClusterQuotaManager().getRegionDisk());
+      getClusterQuotaManager().updateSpaceQuotaUsage();
       AsyncDataNodeHeartbeatClientPool.getInstance()
           .getDataNodeHeartBeat(
               dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, 
handler);
@@ -990,4 +1000,8 @@ public class NodeManager {
   private UDFManager getUDFManager() {
     return configManager.getUDFManager();
   }
+
+  private ClusterQuotaManager getClusterQuotaManager() {
+    return configManager.getClusterQuotaManager();
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5e2ad788aa..223bee15e4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -1168,6 +1168,15 @@ public class PartitionManager {
                     new RegionGroupCache(regionReplicaSet.getRegionId())));
   }
 
+  public void getSchemaRegionIds(
+      List<String> databases, Map<String, List<Integer>> schemaRegionIds) {
+    partitionInfo.getSchemaRegionIds(databases, schemaRegionIds);
+  }
+
+  public void getDataRegionIds(List<String> databases, Map<String, 
List<Integer>> dataRegionIds) {
+    partitionInfo.getDataRegionIds(databases, dataRegionIds);
+  }
+
   public ScheduledExecutorService getRegionMaintainer() {
     return regionMaintainer;
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 86910b222a..72ee52fde1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -78,6 +78,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
@@ -111,6 +112,7 @@ import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -162,6 +164,8 @@ public class ConfigPlanExecutor {
 
   private final PipeInfo pipeInfo;
 
+  private final QuotaInfo quotaInfo;
+
   public ConfigPlanExecutor(
       NodeInfo nodeInfo,
       ClusterSchemaInfo clusterSchemaInfo,
@@ -173,7 +177,8 @@ public class ConfigPlanExecutor {
       ClusterSyncInfo syncInfo,
       CQInfo cqInfo,
       ModelInfo modelInfo,
-      PipeInfo pipeInfo) {
+      PipeInfo pipeInfo,
+      QuotaInfo quotaInfo) {
 
     this.snapshotProcessorList = new ArrayList<>();
 
@@ -208,6 +213,9 @@ public class ConfigPlanExecutor {
     this.snapshotProcessorList.add(pipeInfo);
 
     this.procedureInfo = procedureInfo;
+
+    this.quotaInfo = quotaInfo;
+    this.snapshotProcessorList.add(quotaInfo);
   }
 
   public DataSet executeQueryPlan(ConfigPhysicalPlan req)
@@ -417,6 +425,8 @@ public class ConfigPlanExecutor {
         return 
pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan) 
physicalPlan);
       case DropPipePlugin:
         return 
pipeInfo.getPipePluginInfo().dropPipePlugin((DropPipePluginPlan) physicalPlan);
+      case setSpaceQuota:
+        return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
       default:
         throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
     }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 882cee0b14..b27a4b529f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -504,6 +504,26 @@ public class DatabasePartitionTable {
     return databaseName;
   }
 
+  public List<Integer> getSchemaRegionIds() {
+    List<Integer> schemaRegionIds = new ArrayList<>();
+    for (TConsensusGroupId consensusGroupId : regionGroupMap.keySet()) {
+      if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion)) 
{
+        schemaRegionIds.add(consensusGroupId.getId());
+      }
+    }
+    return schemaRegionIds;
+  }
+
+  public List<Integer> getDataRegionIds() {
+    List<Integer> dataRegionIds = new ArrayList<>();
+    for (TConsensusGroupId consensusGroupId : regionGroupMap.keySet()) {
+      if (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+        dataRegionIds.add(consensusGroupId.getId());
+      }
+    }
+    return dataRegionIds;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 70b0679e3c..b19ac834b3 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -823,6 +823,23 @@ public class PartitionInfo implements SnapshotProcessor {
         sgPartitionTable.getSeriesSlotList(plan.getPartitionType()));
   }
 
+  public void getSchemaRegionIds(
+      List<String> databases, Map<String, List<Integer>> schemaRegionIds) {
+    for (String database : databases) {
+      if (databasePartitionTables.containsKey(database)) {
+        schemaRegionIds.put(database, 
databasePartitionTables.get(database).getSchemaRegionIds());
+      }
+    }
+  }
+
+  public void getDataRegionIds(List<String> databases, Map<String, 
List<Integer>> dataRegionIds) {
+    for (String database : databases) {
+      if (databasePartitionTables.containsKey(database)) {
+        dataRegionIds.put(database, 
databasePartitionTables.get(database).getDataRegionIds());
+      }
+    }
+  }
+
   public void clear() {
     nextRegionGroupId.set(-1);
     databasePartitionTables.clear();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
new file mode 100644
index 0000000000..c4ad43aca2
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QuotaInfo implements SnapshotProcessor {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(QuotaInfo.class);
+
+  private final ReentrantReadWriteLock spaceQuotaReadWriteLock;
+  private final Map<String, TSpaceQuota> spaceQuotaLimit;
+  private final Map<String, TSpaceQuota> spaceQuotaUsage;
+
+  private final String snapshotFileName = "quota_info.bin";
+
+  public QuotaInfo() {
+    spaceQuotaReadWriteLock = new ReentrantReadWriteLock();
+    spaceQuotaLimit = new HashMap<>();
+    spaceQuotaUsage = new HashMap<>();
+  }
+
+  public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) {
+    for (String database : setSpaceQuotaPlan.getPrefixPathList()) {
+      TSpaceQuota spaceQuota = setSpaceQuotaPlan.getSpaceLimit();
+      // “0” means that the user has not reset the value of the space quota 
type
+      // So the old values are still used
+      if (spaceQuotaLimit.containsKey(database)) {
+        if (spaceQuota.getDeviceNum() == 0) {
+          
spaceQuota.setDeviceNum(spaceQuotaLimit.get(database).getDeviceNum());
+        }
+        if (spaceQuota.getTimeserieNum() == 0) {
+          
spaceQuota.setTimeserieNum(spaceQuotaLimit.get(database).getTimeserieNum());
+        }
+        if (spaceQuota.getDiskSize() == 0) {
+          spaceQuota.setDiskSize(spaceQuotaLimit.get(database).getDiskSize());
+        }
+        if (spaceQuota.getDeviceNum() == -1) {
+          spaceQuota.setDeviceNum(0);
+        }
+        if (spaceQuota.getTimeserieNum() == -1) {
+          spaceQuota.setTimeserieNum(0);
+        }
+        if (spaceQuota.getDiskSize() == -1) {
+          spaceQuota.setDiskSize(0);
+        }
+      }
+      if (!spaceQuotaUsage.containsKey(database)) {
+        spaceQuotaUsage.put(database, new TSpaceQuota());
+      }
+      spaceQuotaLimit.put(database, spaceQuota);
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public Map<String, TSpaceQuota> getSpaceQuotaLimit() {
+    return spaceQuotaLimit;
+  }
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      logger.error(
+          "Failed to take snapshot, because snapshot file [{}] is already 
exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try (FileOutputStream fileOutputStream = new 
FileOutputStream(snapshotFile)) {
+      serializeSpaceQuotaLimit(fileOutputStream);
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+    return true;
+  }
+
+  private void serializeSpaceQuotaLimit(FileOutputStream fileOutputStream) 
throws IOException {
+    ReadWriteIOUtils.write(spaceQuotaLimit.size(), fileOutputStream);
+    for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : 
spaceQuotaLimit.entrySet()) {
+      ReadWriteIOUtils.write(spaceQuotaEntry.getKey(), fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDeviceNum(), 
fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getTimeserieNum(), 
fileOutputStream);
+      ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDiskSize(), 
fileOutputStream);
+    }
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, 
IOException {
+    File snapshotFile = new File(snapshotDir, snapshotFileName);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      logger.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+    spaceQuotaReadWriteLock.writeLock().lock();
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+      clear();
+      deserializeSpaceQuotaLimit(fileInputStream);
+    } finally {
+      spaceQuotaReadWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void deserializeSpaceQuotaLimit(FileInputStream fileInputStream) 
throws IOException {
+    int size = ReadWriteIOUtils.readInt(fileInputStream);
+    while (size > 0) {
+      String path = ReadWriteIOUtils.readString(fileInputStream);
+      TSpaceQuota spaceQuota = new TSpaceQuota();
+      spaceQuota.setDeviceNum(ReadWriteIOUtils.readLong(fileInputStream));
+      spaceQuota.setTimeserieNum(ReadWriteIOUtils.readLong(fileInputStream));
+      spaceQuota.setDiskSize(ReadWriteIOUtils.readLong(fileInputStream));
+      spaceQuotaLimit.put(path, spaceQuota);
+      size--;
+    }
+  }
+
+  public Map<String, TSpaceQuota> getSpaceQuotaUsage() {
+    return spaceQuotaUsage;
+  }
+
+  public void clear() {
+    spaceQuotaLimit.clear();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index da9a7cd5e6..1607250ac8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -151,6 +152,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -970,4 +972,19 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   public TSStatus updateModelState(TUpdateModelStateReq req) throws TException 
{
     return configManager.updateModelState(req);
   }
+
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+    return configManager.setSpaceQuota(req);
+  }
+
+  @Override
+  public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws 
TException {
+    return configManager.showSpaceQuota(databases);
+  }
+
+  @Override
+  public TSpaceQuotaResp getSpaceQuota() throws TException {
+    return configManager.getSpaceQuota();
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 0441bfa2ef..ed28dc7889 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
@@ -83,6 +84,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataP
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
@@ -1350,4 +1352,18 @@ public class ConfigPhysicalPlanSerDeTest {
             ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer());
     Assert.assertEquals(deserializedPlan.getRegionIdSet(), regionIdSet);
   }
+
+  @Test
+  public void setSpaceQuotaPlanTest() throws IOException {
+    TSpaceQuota spaceQuota = new TSpaceQuota();
+    spaceQuota.setDeviceNum(2);
+    spaceQuota.setTimeserieNum(3);
+    spaceQuota.setDiskSize(1024);
+    SetSpaceQuotaPlan plan =
+        new SetSpaceQuotaPlan(Collections.singletonList("root.sg"), 
spaceQuota);
+    SetSpaceQuotaPlan deserializedPlan =
+        (SetSpaceQuotaPlan) 
ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer());
+    Assert.assertEquals(plan.getPrefixPathList(), 
deserializedPlan.getPrefixPathList());
+    Assert.assertEquals(plan.getSpaceLimit(), 
deserializedPlan.getSpaceLimit());
+  }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
new file mode 100644
index 0000000000..075237e61d
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class QuotaInfoTest {
+
+  private QuotaInfo quotaInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, 
"snapshot");
+
+  @Before
+  public void setup() throws IOException {
+    quotaInfo = new QuotaInfo();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs();
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+
+  private void prepareQuotaInfo() {
+    List<String> prefixPathList = new ArrayList<>();
+    prefixPathList.add("root.sg");
+    prefixPathList.add("root.ln");
+    TSpaceQuota spaceQuota = new TSpaceQuota();
+    spaceQuota.setTimeserieNum(10000);
+    spaceQuota.setDeviceNum(100);
+    spaceQuota.setDiskSize(512);
+    SetSpaceQuotaPlan setSpaceQuotaPlan = new 
SetSpaceQuotaPlan(prefixPathList, spaceQuota);
+    quotaInfo.setSpaceQuota(setSpaceQuotaPlan);
+  }
+
+  @Test
+  public void testSnapshot() throws TException, IOException {
+    prepareQuotaInfo();
+
+    quotaInfo.processTakeSnapshot(snapshotDir);
+    QuotaInfo quotaInfo2 = new QuotaInfo();
+    quotaInfo2.processLoadSnapshot(snapshotDir);
+
+    Assert.assertEquals(quotaInfo.getSpaceQuotaLimit(), 
quotaInfo2.getSpaceQuotaLimit());
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bc6e07c36b..ba148179d2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -126,6 +126,9 @@ public class CommonConfig {
   /** Ip and port of target ML node. */
   private TEndPoint targetMLNodeEndPoint = new TEndPoint("127.0.0.1", 10810);
 
+  /** multi-tenancy */
+  private boolean quotaEnable = false;
+
   CommonConfig() {}
 
   public void updatePath(String homeDir) {
@@ -370,4 +373,12 @@ public class CommonConfig {
   public void setStopping(boolean stopping) {
     isStopping = stopping;
   }
+
+  public boolean isQuotaEnable() {
+    return quotaEnable;
+  }
+
+  public void setQuotaEnable(boolean quotaEnable) {
+    this.quotaEnable = quotaEnable;
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5aa42bbe5f..64f21944d7 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -187,6 +187,9 @@ public class CommonDescriptor {
                     "disk_space_warning_threshold",
                     String.valueOf(config.getDiskSpaceWarningThreshold()))
                 .trim()));
+    config.setQuotaEnable(
+        Boolean.parseBoolean(
+            properties.getProperty("quota_enable", 
String.valueOf(config.isQuotaEnable()))));
 
     String endPointUrl =
         properties.getProperty(
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 37321694d2..fd1c98e8c4 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -278,6 +278,10 @@ public class IoTDBConstant {
   public static final String IOTDB_FOREGROUND = "iotdb-foreground";
   public static final String IOTDB_PIDFILE = "iotdb-pidfile";
 
+  // quota
+  public static final String SPACE_QUOTA_DISK = "disk";
+  public static final String SPACE_QUOTA_UNLIMITED = "unlimited";
+
   // client version number
   public enum ClientVersion {
     V_0_12,
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
new file mode 100644
index 0000000000..bfbb6562a4
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+public enum SpaceQuotaType {
+  diskSize,
+  deviceNum,
+  timeSeriesNum
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
index efb6796b80..7d6ff521a8 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 public class BasicStructureSerDeUtil {
   public static final int INT_LEN = 4;
+  public static final int LONG_LEN = 8;
 
   private BasicStructureSerDeUtil() {}
 
@@ -63,6 +64,11 @@ public class BasicStructureSerDeUtil {
     return buffer.getInt();
   }
 
+  /** read a long var from byteBuffer. */
+  public static long readLong(ByteBuffer buffer) {
+    return buffer.getLong();
+  }
+
   /**
    * write string to byteBuffer.
    *
@@ -125,6 +131,16 @@ public class BasicStructureSerDeUtil {
     return INT_LEN;
   }
 
+  /**
+   * write a long n to dataOutputStream.
+   *
+   * @return The number of bytes used to represent n.
+   */
+  public static int write(long n, DataOutputStream stream) throws IOException {
+    stream.writeLong(n);
+    return LONG_LEN;
+  }
+
   /**
    * write an int n to dataOutputStream.
    *
diff --git 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 69aec0c956..0d853b2667 100644
--- 
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ 
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -1534,6 +1534,16 @@ public class RSchemaRegion implements ISchemaRegion {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public long countDeviceNumBySchemaRegion() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public String toString() {
     return String.format("database:[%s]", storageGroupFullPath);
diff --git 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 75b73a4d9e..9ac8eb1812 100644
--- 
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ 
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -583,6 +583,16 @@ public class TagSchemaRegion implements ISchemaRegion {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public long countDeviceNumBySchemaRegion() throws MetadataException {
+    throw new UnsupportedOperationException("countDeviceNumBySchemaRegion");
+  }
+
+  @Override
+  public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+    throw new 
UnsupportedOperationException("countTimeSeriesNumBySchemaRegion");
+  }
+
   @Override
   public String toString() {
     return "TagSchemaRegion{"
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ed6865a302..53b8c99439 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
@@ -117,6 +118,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -2073,6 +2075,54 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(new UnsupportedOperationException().getCause());
   }
 
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.setSpaceQuota(req);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws 
TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSpaceQuotaResp resp = client.showSpaceQuota(databases);
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TSpaceQuotaResp getSpaceQuota() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSpaceQuotaResp resp = client.getSpaceQuota();
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   public static class Factory extends ThriftClientFactory<ConfigRegionId, 
ConfigNodeClient> {
 
     public Factory(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ae3f0b2af7..f92c093ddf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1066,6 +1066,8 @@ public class IoTDBConfig {
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor */
   private int pipeMaxThreadNum = 5;
 
+  private boolean quotaEnable = false;
+
   IoTDBConfig() {}
 
   public float getUdfMemoryBudgetInMB() {
@@ -3696,4 +3698,12 @@ public class IoTDBConfig {
   public int getPipeSubtaskExecutorMaxThreadNum() {
     return pipeMaxThreadNum;
   }
+
+  public boolean isQuotaEnable() {
+    return quotaEnable;
+  }
+
+  public void setQuotaEnable(boolean quotaEnable) {
+    this.quotaEnable = quotaEnable;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3dc79cbb59..c1365a7685 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1030,6 +1030,10 @@ public class IoTDBDescriptor {
     conf.setTimePartitionInterval(
         DateTimeUtils.convertMilliTimeWithPrecision(
             conf.getTimePartitionInterval(), conf.getTimestampPrecision()));
+
+    conf.setQuotaEnable(
+        Boolean.parseBoolean(
+            properties.getProperty("quota_enable", 
String.valueOf(conf.isQuotaEnable()))));
   }
 
   private void loadAuthorCache(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a542d8ac33..6ebfff6ffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -839,6 +839,16 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void getDiskSizeByDataRegion(
+      Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
+    dataRegionMap.forEach(
+        (dataRegionId, dataRegion) -> {
+          if (dataRegionIds.contains(dataRegionId.getId())) {
+            dataRegionDisk.put(dataRegionId.getId(), 
dataRegion.countRegionDiskSize());
+          }
+        });
+  }
+
   static class InstanceHolder {
 
     private static final StorageEngine INSTANCE = new StorageEngine();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7aa38a3a68..2ed0072e7a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
@@ -81,6 +82,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.rescon.TsFileResourceManager;
 import org.apache.iotdb.db.service.SettleService;
 import org.apache.iotdb.db.sync.SyncService;
@@ -132,6 +134,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1253,6 +1256,13 @@ public class DataRegion implements IDataRegionForQuery {
     int retryCnt = 0;
     do {
       try {
+        if 
(!DataNodeSpaceQuotaManager.getInstance().checkRegionDisk(databaseName)) {
+          throw new ExceedQuotaException(
+              "Unable to continue writing data, because the space allocated to 
the database "
+                  + databaseName
+                  + " has already used the upper limit",
+              TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode());
+        }
         if (sequence) {
           tsFileProcessor =
               getOrCreateTsFileProcessorIntern(timeRangeId, 
workSequenceTsFileProcessors, true);
@@ -1276,6 +1286,9 @@ public class DataRegion implements IDataRegionForQuery {
           
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
           break;
         }
+      } catch (ExceedQuotaException e) {
+        logger.error(e.getMessage());
+        break;
       }
     } while (tsFileProcessor == null);
     return tsFileProcessor;
@@ -3211,6 +3224,38 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /** @return the disk space occupied by this data region, unit is MB */
+  public long countRegionDiskSize() {
+    AtomicLong diskSize = new AtomicLong(0);
+    DirectoryManager.getInstance()
+        .getAllFilesFolders()
+        .forEach(
+            folder -> {
+              folder = folder + File.separator + databaseName + File.separator 
+ dataRegionId;
+              countFolderDiskSize(folder, diskSize);
+            });
+    return diskSize.get() / 1024 / 1024;
+  }
+
+  /**
+   * @param folder the folder's path
+   * @param diskSize the disk space occupied by this folder, unit is MB
+   */
+  private void countFolderDiskSize(String folder, AtomicLong diskSize) {
+    File file = new File(folder);
+    File[] allFile = file.listFiles();
+    if (allFile == null) {
+      return;
+    }
+    for (File f : allFile) {
+      if (f.isFile()) {
+        diskSize.addAndGet(f.length());
+      } else if (f.isDirectory()) {
+        countFolderDiskSize(f.getAbsolutePath(), diskSize);
+      }
+    }
+  }
+
   @TestOnly
   public long getPartitionMaxFileVersions(long partitionId) {
     return partitionMaxFileVersions.getOrDefault(partitionId, 0L);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
 
b/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
new file mode 100644
index 0000000000..60872e0916
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.quota;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+
+public class ExceedQuotaException extends MetadataException {
+
+  public ExceedQuotaException(String message, int errorCode) {
+    super(message, errorCode);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 9d03f3075e..c3f3448654 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import 
org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
 import 
org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
 import 
org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
+import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.mnode.mem.IMemMNode;
 import org.apache.iotdb.db.metadata.mnode.mem.factory.MemMNodeFactory;
@@ -63,6 +64,8 @@ import 
org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -371,13 +374,18 @@ public class MTreeBelowSGMemoryImpl {
   }
 
   private IMemMNode checkAndAutoCreateDeviceNode(String deviceName, IMemMNode 
deviceParent)
-      throws PathAlreadyExistException {
+      throws PathAlreadyExistException, ExceedQuotaException {
     if (deviceParent == null) {
       // device is sg
       return storageGroupMNode;
     }
     IMemMNode device = store.getChild(deviceParent, deviceName);
     if (device == null) {
+      if 
(!DataNodeSpaceQuotaManager.getInstance().checkDeviceLimit(storageGroupMNode.getName()))
 {
+        throw new ExceedQuotaException(
+            "The number of devices has reached the upper limit",
+            TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode());
+      }
       device =
           store.addChild(
               deviceParent, deviceName, 
nodeFactory.createInternalMNode(deviceParent, deviceName));
@@ -430,6 +438,14 @@ public class MTreeBelowSGMemoryImpl {
             new AliasAlreadyExistException(
                 devicePath.getFullPath() + "." + measurementList.get(i), 
aliasList.get(i)));
       }
+      if (!DataNodeSpaceQuotaManager.getInstance()
+          .checkTimeSeriesNum(storageGroupMNode.getName())) {
+        failingMeasurementMap.put(
+            i,
+            new ExceedQuotaException(
+                "The number of timeSeries has reached the upper limit",
+                TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode()));
+      }
     }
     return failingMeasurementMap;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 9e9d8922f2..9db4a6f5d3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -277,4 +277,9 @@ public interface ISchemaRegion {
       throws MetadataException;
 
   // endregion
+
+  // count
+  long countDeviceNumBySchemaRegion() throws MetadataException;
+
+  long countTimeSeriesNumBySchemaRegion() throws MetadataException;
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 20c90e66d1..c2a31d3bd6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
@@ -343,6 +344,35 @@ public class SchemaEngine {
     return schemaRegionMap == null ? 0 : schemaRegionMap.size();
   }
 
+  public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer> 
schemaIds) {
+    Map<Integer, Long> deviceNum = new HashMap<>();
+    try {
+      for (Map.Entry<SchemaRegionId, ISchemaRegion> entry : 
schemaRegionMap.entrySet()) {
+        if (schemaIds.contains(entry.getKey().getId())) {
+          deviceNum.put(entry.getKey().getId(), 
entry.getValue().countDeviceNumBySchemaRegion());
+        }
+      }
+    } catch (MetadataException e) {
+      // no
+    }
+    return deviceNum;
+  }
+
+  public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer> 
schemaIds) {
+    Map<Integer, Long> timeSeriesNum = new HashMap<>();
+    try {
+      for (Map.Entry<SchemaRegionId, ISchemaRegion> entry : 
schemaRegionMap.entrySet()) {
+        if (schemaIds.contains(entry.getKey().getId())) {
+          timeSeriesNum.put(
+              entry.getKey().getId(), 
entry.getValue().countTimeSeriesNumBySchemaRegion());
+        }
+      }
+    } catch (MetadataException e) {
+      // no
+    }
+    return timeSeriesNum;
+  }
+
   @TestOnly
   public ISchemaEngineStatistics getSchemaEngineStatistics() {
     return schemaEngineStatistics;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index b9bec0d522..45d88abc19 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.schemaregion;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -49,6 +50,7 @@ import 
org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.write.SchemaRegionWritePlanFactory;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -85,6 +87,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1163,6 +1166,48 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
     return mtree.getNodeReader(showNodesPlan);
   }
 
+  @Override
+  public long countDeviceNumBySchemaRegion() throws MetadataException {
+    ISchemaReader<IDeviceSchemaInfo> deviceReader =
+        this.getDeviceReader(
+            SchemaRegionReadPlanFactory.getShowDevicesPlan(
+                new PartialPath(
+                    IoTDBConstant.PATH_ROOT
+                        + IoTDBConstant.PATH_SEPARATOR
+                        + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+                false));
+    long count = 0;
+    while (deviceReader.hasNext()) {
+      deviceReader.next();
+      count++;
+    }
+    return count;
+  }
+
+  @Override
+  public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+    ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+        this.getTimeSeriesReader(
+            SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+                new PartialPath(
+                    IoTDBConstant.PATH_ROOT
+                        + IoTDBConstant.PATH_SEPARATOR
+                        + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+                new HashMap<>(),
+                false,
+                null,
+                null,
+                0,
+                0,
+                false));
+    long count = 0;
+    while (timeSeriesReader.hasNext()) {
+      timeSeriesReader.next();
+      count++;
+    }
+    return count;
+  }
+
   // endregion
 
   private static class RecoverOperationResult {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 7f100b0b6b..1768ba2713 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.schemaregion;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -52,6 +53,7 @@ import 
org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
 import 
org.apache.iotdb.db.metadata.plan.schemaregion.impl.write.SchemaRegionWritePlanFactory;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -90,6 +92,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1322,6 +1325,48 @@ public class SchemaRegionSchemaFileImpl implements 
ISchemaRegion {
   }
   // endregion
 
+  @Override
+  public long countDeviceNumBySchemaRegion() throws MetadataException {
+    ISchemaReader<IDeviceSchemaInfo> deviceReader =
+        this.getDeviceReader(
+            SchemaRegionReadPlanFactory.getShowDevicesPlan(
+                new PartialPath(
+                    IoTDBConstant.PATH_ROOT
+                        + IoTDBConstant.PATH_SEPARATOR
+                        + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+                false));
+    long count = 0;
+    while (deviceReader.hasNext()) {
+      deviceReader.next();
+      count++;
+    }
+    return count;
+  }
+
+  @Override
+  public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+    ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+        this.getTimeSeriesReader(
+            SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+                new PartialPath(
+                    IoTDBConstant.PATH_ROOT
+                        + IoTDBConstant.PATH_SEPARATOR
+                        + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+                new HashMap<>(),
+                false,
+                null,
+                null,
+                0,
+                0,
+                false));
+    long count = 0;
+    while (timeSeriesReader.hasNext()) {
+      timeSeriesReader.next();
+      count++;
+    }
+    return count;
+  }
+
   private static class RecoverOperationResult {
 
     private static final RecoverOperationResult SUCCESS = new 
RecoverOperationResult(null);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index ded93fc3df..213778db8a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -161,6 +161,11 @@ public class ColumnHeaderConstant {
   public static final String ELAPSED_TIME = "ElapsedTime";
   public static final String STATEMENT = "Statement";
 
+  // column names for quotas
+  public static final String QUOTA_TYPE = "quotaType";
+  public static final String LIMIT = "limit";
+  public static final String USED = "used";
+
   // column names for show models/trails
   public static final String MODEL_ID = "ModelId";
   public static final String TRAIL_ID = "TrailId";
@@ -405,6 +410,13 @@ public class ColumnHeaderConstant {
           new ColumnHeader(ELAPSED_TIME, TSDataType.FLOAT),
           new ColumnHeader(STATEMENT, TSDataType.TEXT));
 
+  public static final List<ColumnHeader> showSpaceQuotaColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(DATABASE, TSDataType.TEXT),
+          new ColumnHeader(QUOTA_TYPE, TSDataType.TEXT),
+          new ColumnHeader(LIMIT, TSDataType.TEXT),
+          new ColumnHeader(USED, TSDataType.TEXT));
+
   public static final List<ColumnHeader> showModelsColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(MODEL_ID, TSDataType.TEXT),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index c7b07431d0..2896be7dd3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -169,6 +169,10 @@ public class DatasetHeaderFactory {
     return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders, 
false);
   }
 
+  public static DatasetHeader getShowSpaceQuotaHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showSpaceQuotaColumnHeaders, 
true);
+  }
+
   public static DatasetHeader getShowModelsHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showModelsColumnHeaders, 
true);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 81c33c845c..20bf24ce4a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -73,6 +73,8 @@ import 
org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.DropPipeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.StartPipeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.StopPipeTask;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.SetSpaceQuotaTask;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.DropPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
@@ -132,6 +134,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -456,6 +460,18 @@ public class ConfigTaskVisitor
     return new ShowContinuousQueriesTask();
   }
 
+  @Override
+  public IConfigTask visitSetSpaceQuota(
+      SetSpaceQuotaStatement setSpaceQuotaStatement, TaskContext context) {
+    return new SetSpaceQuotaTask(setSpaceQuotaStatement);
+  }
+
+  @Override
+  public IConfigTask visitShowSpaceQuota(
+      ShowSpaceQuotaStatement showSpaceQuotaStatement, TaskContext context) {
+    return new ShowSpaceQuotaTask(showSpaceQuotaStatement);
+  }
+
   /** ML Model Management */
   @Override
   public IConfigTask visitCreateModel(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 53567fc601..e59005b75f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -21,7 +21,9 @@ package 
org.apache.iotdb.db.mpp.plan.execution.config.executor;
 
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -84,6 +86,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -120,6 +123,7 @@ import 
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodes
 import 
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.ShowPipeTask;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDatabaseStatement;
@@ -155,6 +159,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -1814,6 +1820,33 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  @Override
+  public SettableFuture<ConfigTaskResult> setSpaceQuota(
+      SetSpaceQuotaStatement setSpaceQuotaStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    TSStatus tsStatus = new TSStatus();
+    TSetSpaceQuotaReq req = new TSetSpaceQuotaReq();
+    req.setDatabase(setSpaceQuotaStatement.getPrefixPathList());
+    TSpaceQuota spaceQuota = new TSpaceQuota();
+    spaceQuota.setDeviceNum(setSpaceQuotaStatement.getDeviceNum());
+    spaceQuota.setTimeserieNum(setSpaceQuotaStatement.getTimeSeriesNum());
+    spaceQuota.setDiskSize(setSpaceQuotaStatement.getDiskSize());
+    req.setSpaceLimit(spaceQuota);
+    try (ConfigNodeClient client =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      // Send request to some API server
+      tsStatus = client.setSpaceQuota(req);
+    } catch (Exception e) {
+      future.setException(e);
+    }
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> createModel(CreateModelStatement 
createModelStatement) {
     createModelStatement.semanticCheck();
@@ -1856,6 +1889,29 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  @Override
+  public SettableFuture<ConfigTaskResult> showSpaceQuota(
+      ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+
+    try (ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      List<String> databases = new ArrayList<>();
+      if (showSpaceQuotaStatement.getDatabases() != null) {
+        showSpaceQuotaStatement
+            .getDatabases()
+            .forEach(database -> databases.add(database.toString()));
+      }
+      // Send request to some API server
+      TSpaceQuotaResp showSpaceQuotaResp = 
configNodeClient.showSpaceQuota(databases);
+      // build TSBlock
+      ShowSpaceQuotaTask.buildTSBlock(showSpaceQuotaResp, future);
+    } catch (Exception e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> dropModel(String modelId) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
@@ -1890,10 +1946,22 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     } catch (ClientManagerException | TException e) {
       future.setException(e);
     }
-
     return future;
   }
 
+  @Override
+  public TSpaceQuotaResp getSpaceQuota() {
+    TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
+    try (ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      // Send request to some API server
+      spaceQuotaResp = configNodeClient.getSpaceQuota();
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage());
+    }
+    return spaceQuotaResp;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> showTrails(String modelId) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
@@ -1910,7 +1978,6 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     } catch (ClientManagerException | TException e) {
       future.setException(e);
     }
-
     return future;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 8fdaa7c8a1..8832009240 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.mpp.plan.execution.config.executor;
 
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDatabaseStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -55,6 +56,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -177,6 +180,12 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> showContinuousQueries();
 
+  SettableFuture<ConfigTaskResult> setSpaceQuota(SetSpaceQuotaStatement 
setSpaceQuotaStatement);
+
+  SettableFuture<ConfigTaskResult> showSpaceQuota(ShowSpaceQuotaStatement 
showSpaceQuotaStatement);
+
+  TSpaceQuotaResp getSpaceQuota();
+
   SettableFuture<ConfigTaskResult> createModel(CreateModelStatement 
createModelStatement);
 
   SettableFuture<ConfigTaskResult> dropModel(String modelId);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
new file mode 100644
index 0000000000..bb80bc4d9b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SetSpaceQuotaTask implements IConfigTask {
+
+  private final SetSpaceQuotaStatement setSpaceQuotaStatement;
+
+  public SetSpaceQuotaTask(SetSpaceQuotaStatement setSpaceQuotaStatement) {
+    this.setSpaceQuotaStatement = setSpaceQuotaStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.setSpaceQuota(setSpaceQuotaStatement);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
new file mode 100644
index 0000000000..0c26415229
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.enums.SpaceQuotaType;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ShowSpaceQuotaTask implements IConfigTask {
+
+  private final ShowSpaceQuotaStatement showSpaceQuotaStatement;
+
+  public ShowSpaceQuotaTask(ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+    this.showSpaceQuotaStatement = showSpaceQuotaStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.showSpaceQuota(showSpaceQuotaStatement);
+  }
+
+  public static void buildTSBlock(TSpaceQuotaResp resp, 
SettableFuture<ConfigTaskResult> future) {
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.showSpaceQuotaColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    if (resp.getSpaceQuota() != null) {
+      for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry : 
resp.getSpaceQuota().entrySet()) {
+        if (spaceQuotaEntry.getValue().getDiskSize() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          
builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.diskSize.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getDiskSize() == 0
+                          ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+                          : spaceQuotaEntry.getValue().getDiskSize() + "M"));
+          builder
+              .getColumnBuilder(3)
+              .writeBinary(
+                  Binary.valueOf(
+                      
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getDiskSize() + "M"));
+          builder.declarePosition();
+        }
+        if (spaceQuotaEntry.getValue().getDeviceNum() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          
builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.deviceNum.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getDeviceNum() == 0
+                          ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+                          : spaceQuotaEntry.getValue().getDeviceNum() + ""));
+          builder
+              .getColumnBuilder(3)
+              .writeBinary(
+                  Binary.valueOf(
+                      
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getDeviceNum() + ""));
+          builder.declarePosition();
+        }
+        if (spaceQuotaEntry.getValue().getTimeserieNum() != -1) {
+          builder.getTimeColumnBuilder().writeLong(0L);
+          
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+          builder
+              .getColumnBuilder(1)
+              
.writeBinary(Binary.valueOf(SpaceQuotaType.timeSeriesNum.name()));
+          builder
+              .getColumnBuilder(2)
+              .writeBinary(
+                  Binary.valueOf(
+                      spaceQuotaEntry.getValue().getTimeserieNum() == 0
+                          ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+                          : spaceQuotaEntry.getValue().getTimeserieNum() + 
""));
+          builder
+              .getColumnBuilder(3)
+              .writeBinary(
+                  Binary.valueOf(
+                      
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getTimeserieNum()
+                          + ""));
+          builder.declarePosition();
+        }
+      }
+    }
+    DatasetHeader datasetHeader = 
DatasetHeaderFactory.getShowSpaceQuotaHeader();
+    future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, 
builder.build(), datasetHeader));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 4bc44543fe..934067f1a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -167,6 +167,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -3379,4 +3381,95 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
         Integer.parseInt(ctx.fromId.getText()),
         Integer.parseInt(ctx.toId.getText()));
   }
+
+  // Quota
+  @Override
+  public Statement visitSetSpaceQuota(IoTDBSqlParser.SetSpaceQuotaContext ctx) 
{
+    if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+      throw new SemanticException("Limit configuration is not enabled, please 
enable it first.");
+    }
+    SetSpaceQuotaStatement setSpaceQuotaStatement = new 
SetSpaceQuotaStatement();
+    List<IoTDBSqlParser.PrefixPathContext> prefixPathContexts = 
ctx.prefixPath();
+    List<String> paths = new ArrayList<>();
+    for (IoTDBSqlParser.PrefixPathContext prefixPathContext : 
prefixPathContexts) {
+      paths.add(parsePrefixPath(prefixPathContext).getFullPath());
+    }
+    setSpaceQuotaStatement.setPrefixPathList(paths);
+
+    Map<String, String> quotas = new HashMap<>();
+    for (IoTDBSqlParser.AttributePairContext attributePair : 
ctx.attributePair()) {
+      quotas.put(
+          parseAttributeKey(attributePair.attributeKey()),
+          parseAttributeValue(attributePair.attributeValue()));
+    }
+
+    if (quotas.containsKey(IoTDBConstant.COLUMN_DEVICES)) {
+      if 
(quotas.get(IoTDBConstant.COLUMN_DEVICES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
 {
+        setSpaceQuotaStatement.setDeviceNum(-1);
+      } else if (Long.parseLong(quotas.get(IoTDBConstant.COLUMN_DEVICES)) <= 
0) {
+        throw new SemanticException("Please set the number of devices greater 
than 0");
+      } else {
+        setSpaceQuotaStatement.setDeviceNum(
+            Long.parseLong(quotas.get(IoTDBConstant.COLUMN_DEVICES)));
+      }
+    }
+    if (quotas.containsKey(IoTDBConstant.COLUMN_TIMESERIES)) {
+      if 
(quotas.get(IoTDBConstant.COLUMN_TIMESERIES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
 {
+        setSpaceQuotaStatement.setTimeSeriesNum(-1);
+      } else if (Long.parseLong(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)) 
<= 0) {
+        throw new SemanticException("Please set the number of timeseries 
greater than 0");
+      } else {
+        setSpaceQuotaStatement.setTimeSeriesNum(
+            Long.parseLong(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)));
+      }
+    }
+    if (quotas.containsKey(IoTDBConstant.SPACE_QUOTA_DISK)) {
+      if 
(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
 {
+        setSpaceQuotaStatement.setDiskSize(-1);
+      } else {
+        
setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK)));
+      }
+    }
+    return setSpaceQuotaStatement;
+  }
+
+  private long parseUnit(String data) {
+    String unit = data.substring(data.length() - 1);
+    long disk = Long.parseLong(data.substring(0, data.length() - 1));
+    if (disk <= 0) {
+      throw new SemanticException("Please set the disk size greater than 0");
+    }
+    switch (unit.toLowerCase()) {
+      case "m":
+        return disk;
+      case "g":
+        return disk * 1024;
+      case "t":
+        return disk * 1024 * 1024;
+      case "p":
+        return disk * 1024 * 1024 * 1024;
+      default:
+        throw new SemanticException(
+            "When setting the disk size, the unit is incorrect. Please use 
'M', 'G', 'P', 'T' as the unit");
+    }
+  }
+
+  @Override
+  public Statement visitShowSpaceQuota(IoTDBSqlParser.ShowSpaceQuotaContext 
ctx) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+      throw new SemanticException("Limit configuration is not enabled, please 
enable it first.");
+    }
+    ShowSpaceQuotaStatement showSpaceQuotaStatement = new 
ShowSpaceQuotaStatement();
+    List<PartialPath> databases = null;
+    if (ctx.prefixPath() != null) {
+      databases = new ArrayList<>();
+      for (IoTDBSqlParser.PrefixPathContext prefixPathContext : 
ctx.prefixPath()) {
+        databases.add(parsePrefixPath(prefixPathContext));
+      }
+      showSpaceQuotaStatement.setDatabases(databases);
+    } else {
+      showSpaceQuotaStatement.setDatabases(null);
+    }
+    return showSpaceQuotaStatement;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
index f0a91bbfee..27a041fba8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
@@ -161,4 +161,7 @@ public enum StatementType {
   SHOW_PIPES,
 
   BATCH_ACTIVATE_TEMPLATE,
+
+  SET_SPACE_QUOTA,
+  SHOW_SPACE_QUOTA,
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 93bbda8947..1e73670802 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -103,6 +103,8 @@ import 
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import 
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -522,4 +524,12 @@ public abstract class StatementVisitor<R, C> {
       InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement, C context) {
     return visitStatement(internalCreateMultiTimeSeriesStatement, context);
   }
+
+  public R visitSetSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement, C 
context) {
+    return visitStatement(setSpaceQuotaStatement, context);
+  }
+
+  public R visitShowSpaceQuota(ShowSpaceQuotaStatement 
showSpaceQuotaStatement, C context) {
+    return visitStatement(showSpaceQuotaStatement, context);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
new file mode 100644
index 0000000000..0eb4023d5b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys.quota;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SetSpaceQuotaStatement extends Statement implements 
IConfigStatement {
+
+  private long timeSeriesNum;
+  private long deviceNum;
+  private long diskSize;
+  private String policy;
+  private List<String> prefixPathList;
+
+  /** QuotaOperator Constructor with OperatorType. */
+  public SetSpaceQuotaStatement() {
+    super();
+    statementType = StatementType.SET_SPACE_QUOTA;
+  }
+
+  public long getTimeSeriesNum() {
+    return timeSeriesNum;
+  }
+
+  public void setTimeSeriesNum(long timeSeriesNum) {
+    this.timeSeriesNum = timeSeriesNum;
+  }
+
+  public long getDeviceNum() {
+    return deviceNum;
+  }
+
+  public void setDeviceNum(long deviceNum) {
+    this.deviceNum = deviceNum;
+  }
+
+  public long getDiskSize() {
+    return diskSize;
+  }
+
+  public void setDiskSize(long diskSize) {
+    this.diskSize = diskSize;
+  }
+
+  public String getPolicy() {
+    return policy;
+  }
+
+  public void setPolicy(String policy) {
+    this.policy = policy;
+  }
+
+  public List<String> getPrefixPathList() {
+    return prefixPathList;
+  }
+
+  public void setPrefixPathList(List<String> prefixPathList) {
+    this.prefixPathList = prefixPathList;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitSetSpaceQuota(this, context);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
new file mode 100644
index 0000000000..35669ec7e8
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys.quota;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.List;
+
+public class ShowSpaceQuotaStatement extends Statement implements 
IConfigStatement {
+
+  private List<PartialPath> databases;
+
+  public ShowSpaceQuotaStatement() {
+    super();
+    statementType = StatementType.SHOW_SPACE_QUOTA;
+  }
+
+  public List<PartialPath> getDatabases() {
+    return databases;
+  }
+
+  public void setDatabases(List<PartialPath> databases) {
+    this.databases = databases;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.READ;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return databases;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowSpaceQuota(this, context);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java 
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java
new file mode 100644
index 0000000000..6585460fcf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.quotas;
+
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.engine.StorageEngine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class DataNodeSizeStore {
+
+  private final StorageEngine storageEngine;
+  private final Map<Integer, Long> dataRegionDisk;
+  private final ScheduledExecutorService scheduledExecutorService;
+  private List<Integer> dataRegionIds;
+
+  public DataNodeSizeStore() {
+    storageEngine = StorageEngine.getInstance();
+    dataRegionDisk = new HashMap<>();
+    dataRegionIds = new ArrayList<>();
+    this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutorService, () -> calculateRegionSize(dataRegionIds), 0, 
5, TimeUnit.SECONDS);
+  }
+
+  public void calculateRegionSize(List<Integer> dataRegionIds) {
+    storageEngine.getDiskSizeByDataRegion(dataRegionDisk, dataRegionIds);
+  }
+
+  public Map<Integer, Long> getDataRegionDisk() {
+    return dataRegionDisk;
+  }
+
+  public void setDataRegionIds(List<Integer> dataRegionIds) {
+    this.dataRegionIds = dataRegionIds;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
 
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
new file mode 100644
index 0000000000..1e46ee2958
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.quotas;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DataNodeSpaceQuotaManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class);
+
+  private Map<String, TSpaceQuota> spaceQuotaLimit;
+  private Map<String, TSpaceQuota> spaceQuotaUsage;
+  private DataNodeSizeStore dataNodeSizeStore;
+
+  public DataNodeSpaceQuotaManager() {
+    spaceQuotaLimit = new HashMap<>();
+    spaceQuotaUsage = new HashMap<>();
+    dataNodeSizeStore = new DataNodeSizeStore();
+    recover();
+  }
+
+  public DataNodeSpaceQuotaManager(
+      Map<String, TSpaceQuota> spaceQuotaLimit, Map<String, TSpaceQuota> 
spaceQuotaUsage) {
+    this.spaceQuotaLimit = spaceQuotaLimit;
+    this.spaceQuotaUsage = spaceQuotaUsage;
+  }
+
+  /** SingleTone */
+  private static class DataNodeSpaceQuotaManagerHolder {
+    private static final DataNodeSpaceQuotaManager INSTANCE = new 
DataNodeSpaceQuotaManager();
+
+    private DataNodeSpaceQuotaManagerHolder() {}
+  }
+
+  public static DataNodeSpaceQuotaManager getInstance() {
+    return DataNodeSpaceQuotaManagerHolder.INSTANCE;
+  }
+
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+    for (String database : req.getDatabase()) {
+      spaceQuotaLimit.put(database, req.getSpaceLimit());
+      spaceQuotaUsage.put(database, new TSpaceQuota());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  private void recover() {
+    TSpaceQuotaResp spaceQuota = 
ClusterConfigTaskExecutor.getInstance().getSpaceQuota();
+    if (spaceQuota.getStatus() != null) {
+      if (spaceQuota.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && spaceQuota.getSpaceQuota() != null) {
+        for (String database : spaceQuota.getSpaceQuota().keySet()) {
+          spaceQuotaLimit.put(database, 
spaceQuota.getSpaceQuota().get(database));
+          spaceQuotaUsage.put(database, new TSpaceQuota());
+        }
+      }
+      LOGGER.info("Space quota limit restored succeeded. " + 
spaceQuotaLimit.toString());
+    } else {
+      LOGGER.error("Space quota limit restored failed. " + 
spaceQuotaLimit.toString());
+    }
+  }
+
+  public boolean checkDeviceLimit(String database) {
+    database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + 
database;
+    TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+    if (spaceQuota == null) {
+      return true;
+    } else if (spaceQuota.getDeviceNum() == 0 || spaceQuota.getDeviceNum() == 
-1) {
+      return true;
+    }
+    long deviceNum = spaceQuotaUsage.get(database).getDeviceNum();
+    if (spaceQuota.getDeviceNum() - deviceNum > 0) {
+      return true;
+    }
+    return false;
+  }
+
+  public void updateSpaceQuotaUsage(Map<String, TSpaceQuota> spaceQuotaUsage) {
+    this.spaceQuotaUsage = spaceQuotaUsage;
+  }
+
+  public boolean checkTimeSeriesNum(String database) {
+    database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + 
database;
+    TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+    if (spaceQuota == null) {
+      return true;
+    } else if (spaceQuota.getTimeserieNum() == 0 || 
spaceQuota.getTimeserieNum() == -1) {
+      return true;
+    }
+    long timeSeriesNum = spaceQuotaUsage.get(database).getTimeserieNum();
+    if (spaceQuota.getTimeserieNum() - timeSeriesNum > 0) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean checkRegionDisk(String database) {
+    TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+    if (spaceQuota == null) {
+      return true;
+    } else if (spaceQuota.getDiskSize() == 0 || spaceQuota.getDiskSize() == 
-1) {
+      return true;
+    }
+    long diskSize = spaceQuotaUsage.get(database).getDiskSize();
+    if (spaceQuota.getDiskSize() - diskSize > 0) {
+      return true;
+    }
+    return false;
+  }
+
+  public void setDataRegionIds(List<Integer> dataRegionIds) {
+    dataNodeSizeStore.setDataRegionIds(dataRegionIds);
+  }
+
+  public Map<Integer, Long> getRegionDisk() {
+    return dataNodeSizeStore.getDataRegionDisk();
+  }
+
+  public void setSpaceQuotaLimit(Map<String, TSpaceQuota> spaceQuotaLimit) {
+    this.spaceQuotaLimit = spaceQuotaLimit;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d00903759a..c087ca9aa1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TSettleReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -106,6 +107,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.sync.SyncService;
@@ -225,6 +227,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private final DataNodeRegionManager regionManager = 
DataNodeRegionManager.getInstance();
 
+  private final DataNodeSpaceQuotaManager spaceQuotaManager =
+      DataNodeSpaceQuotaManager.getInstance();
+
   public DataNodeInternalRPCServiceImpl() {
     super();
     PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
@@ -873,6 +878,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return RpcUtils.SUCCESS_STATUS;
   }
 
+  @Override
+  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+    return spaceQuotaManager.setSpaceQuota(req);
+  }
+
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, 
String storageGroup) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
@@ -929,6 +939,16 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
       
resp.setStatusReason(CommonDescriptor.getInstance().getConfig().getStatusReason());
     }
+    if (req.getSchemaRegionIds() != null) {
+      spaceQuotaManager.updateSpaceQuotaUsage(req.getSpaceQuotaUsage());
+      
resp.setDeviceNum(schemaEngine.countDeviceNumBySchemaRegion(req.getSchemaRegionIds()));
+      resp.setTimeSeriesNum(
+          
schemaEngine.countTimeSeriesNumBySchemaRegion(req.getSchemaRegionIds()));
+    }
+    if (req.getDataRegionIds() != null) {
+      spaceQuotaManager.setDataRegionIds(req.getDataRegionIds());
+      resp.setRegionDisk(spaceQuotaManager.getRegionDisk());
+    }
     return resp;
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 9042f5a12a..69d6cc6c3a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -193,7 +193,10 @@ public enum TSStatusCode {
   PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
   PIPE_PLUGIN_DOWNLOAD_ERROR(1603),
   CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR(1604),
-  DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605);
+  DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605),
+
+  // Quota
+  EXCEED_QUOTA_ERROR(1700);
 
   private final int statusCode;
 
diff --git a/thrift-commons/src/main/thrift/common.thrift 
b/thrift-commons/src/main/thrift/common.thrift
index 0b19406d71..dc10721b37 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -128,6 +128,18 @@ struct TFilesResp {
   2: required list<TFile> files
 }
 
+// quota
+struct TSpaceQuota {
+  1: optional i64 diskSize
+  2: optional i64 deviceNum
+  3: optional i64 timeserieNum
+}
+
+struct TSetSpaceQuotaReq {
+  1: required list<string> database
+  2: required TSpaceQuota spaceLimit
+}
+
 enum TAggregationType {
   COUNT,
   AVG,
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index ac5ab69268..050f00fe33 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -730,6 +730,15 @@ struct TUpdateModelStateReq {
   3: optional string bestTrailId
 }
 
+// ====================================================
+// Quota
+// ====================================================
+struct TSpaceQuotaResp{
+  1: required common.TSStatus status
+  2: optional map<string, common.TSpaceQuota> spaceQuota
+  3: optional map<string, common.TSpaceQuota> spaceQuotaUsage
+}
+
 service IConfigNodeRPCService {
 
   // ======================================================
@@ -1333,5 +1342,15 @@ service IConfigNodeRPCService {
    * @return SUCCESS_STATUS if the model was removed successfully
    */
   common.TSStatus updateModelState(TUpdateModelStateReq req)
+
+  // ======================================================
+  // Quota
+  // ======================================================
+  /** Set Space Quota */
+  common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
+
+  TSpaceQuotaResp showSpaceQuota(list<string> databases);
+
+  TSpaceQuotaResp getSpaceQuota();
 }
 
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index b3fe6d3787..c20f32a703 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -235,6 +235,9 @@ struct THeartbeatReq {
   1: required i64 heartbeatTimestamp
   2: required bool needJudgeLeader
   3: required bool needSamplingLoad
+  4: optional list<i32> schemaRegionIds
+  5: optional list<i32> dataRegionIds
+  6: optional map<string, common.TSpaceQuota> spaceQuotaUsage
 }
 
 struct THeartbeatResp {
@@ -243,6 +246,9 @@ struct THeartbeatResp {
   3: optional string statusReason
   4: optional map<common.TConsensusGroupId, bool> judgedLeaders
   5: optional TLoadSample loadSample
+  6: optional map<i32, i64> deviceNum
+  7: optional map<i32, i64> timeSeriesNum
+  8: optional map<i32, i64> regionDisk
 }
 
 struct TLoadSample {
@@ -758,6 +764,11 @@ service IDataNodeRPCService {
   * Delete model training metrics on DataNode
   */
   common.TSStatus deleteModelMetrics(TDeleteModelMetricsReq req)
+
+  /**
+   * Set space quota
+   **/
+  common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
 }
 
 service MPPDataExchangeService {
@@ -792,6 +803,5 @@ service IMLNodeInternalRPCService{
   * Record model training metrics on DataNode
   */
   common.TSStatus recordModelMetrics(TRecordModelMetricsReq req)
-
 }
 


Reply via email to