This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 831b96516e [IOTDB-5091] add space quota (#9506)
831b96516e is described below
commit 831b96516ef692933d1efb4af5792cc03ab323d7
Author: 任宇华 <[email protected]>
AuthorDate: Tue Apr 4 13:01:15 2023 +0800
[IOTDB-5091] add space quota (#9506)
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 2 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 13 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 +
.../confignode/client/DataNodeRequestType.java | 5 +-
.../client/async/AsyncDataNodeClientPool.java | 7 +
.../heartbeat/DataNodeHeartbeatHandler.java | 20 +-
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 5 +-
.../request/write/quota/SetSpaceQuotaPlan.java | 101 ++++++++++
.../confignode/manager/ClusterQuotaManager.java | 222 +++++++++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 38 +++-
.../apache/iotdb/confignode/manager/IManager.java | 11 +
.../iotdb/confignode/manager/node/NodeManager.java | 16 +-
.../manager/partition/PartitionManager.java | 9 +
.../persistence/executor/ConfigPlanExecutor.java | 12 +-
.../partition/DatabasePartitionTable.java | 20 ++
.../persistence/partition/PartitionInfo.java | 17 ++
.../confignode/persistence/quota/QuotaInfo.java | 162 +++++++++++++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 17 ++
.../request/ConfigPhysicalPlanSerDeTest.java | 16 ++
.../confignode/persistence/QuotaInfoTest.java | 82 ++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 11 +
.../iotdb/commons/conf/CommonDescriptor.java | 3 +
.../apache/iotdb/commons/conf/IoTDBConstant.java | 4 +
.../apache/iotdb/commons/enums/SpaceQuotaType.java | 26 +++
.../commons/utils/BasicStructureSerDeUtil.java | 16 ++
.../schemaregion/rocksdb/RSchemaRegion.java | 10 +
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 10 +
.../apache/iotdb/db/client/ConfigNodeClient.java | 50 +++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 10 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 45 +++++
.../db/exception/quota/ExceedQuotaException.java | 29 +++
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 18 +-
.../db/metadata/schemaregion/ISchemaRegion.java | 5 +
.../db/metadata/schemaregion/SchemaEngine.java | 30 +++
.../schemaregion/SchemaRegionMemoryImpl.java | 45 +++++
.../schemaregion/SchemaRegionSchemaFileImpl.java | 45 +++++
.../db/mpp/common/header/ColumnHeaderConstant.java | 12 ++
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 +
.../plan/execution/config/ConfigTaskVisitor.java | 16 ++
.../config/executor/ClusterConfigTaskExecutor.java | 71 ++++++-
.../config/executor/IConfigTaskExecutor.java | 9 +
.../config/sys/quota/SetSpaceQuotaTask.java | 42 ++++
.../config/sys/quota/ShowSpaceQuotaTask.java | 130 ++++++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 93 +++++++++
.../iotdb/db/mpp/plan/statement/StatementType.java | 3 +
.../db/mpp/plan/statement/StatementVisitor.java | 10 +
.../sys/quota/SetSpaceQuotaStatement.java | 100 ++++++++++
.../sys/quota/ShowSpaceQuotaStatement.java | 62 ++++++
.../apache/iotdb/db/quotas/DataNodeSizeStore.java | 60 ++++++
.../iotdb/db/quotas/DataNodeSpaceQuotaManager.java | 153 ++++++++++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 20 ++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 5 +-
thrift-commons/src/main/thrift/common.thrift | 12 ++
.../src/main/thrift/confignode.thrift | 19 ++
thrift/src/main/thrift/datanode.thrift | 12 +-
58 files changed, 1979 insertions(+), 12 deletions(-)
diff --git
a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index f3f0325324..652415cf54 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -148,6 +148,7 @@ keyWords
| QUERIES
| QUERY
| QUERYID
+ | QUOTA
| RANGE
| READONLY
| REGEXP
@@ -172,6 +173,7 @@ keyWords
| SHOW
| SLIMIT
| SOFFSET
+ | SPACE
| STORAGE
| START
| STARTTIME
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 212099ba97..83cedcc380 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -58,6 +58,8 @@ ddlStatement
| getRegionId | getTimeSlotList | getSeriesSlotList | migrateRegion
// ML Model
| createModel | dropModel | showModels | showTrails
+ // Quota
+ | setSpaceQuota | showSpaceQuota
;
dmlStatement
@@ -314,6 +316,16 @@ showFunctions
: SHOW FUNCTIONS
;
+// Quota
=========================================================================================
+// Show Space Quota
+showSpaceQuota
+ : SHOW SPACE QUOTA (prefixPath (COMMA prefixPath)*)?
+ ;
+
+// Set Space Quota
+setSpaceQuota
+ : SET SPACE QUOTA attributePair (COMMA attributePair)* ON prefixPath
(COMMA prefixPath)*
+ ;
// Trigger
=========================================================================================
// ---- Create Trigger
@@ -447,7 +459,6 @@ migrateRegion
: MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO
toId=INTEGER_LITERAL
;
-
// Pipe Plugin
=========================================================================================
// Create Pipe Plugin
createPipePlugin
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 8716e22c61..3f99561539 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -526,6 +526,10 @@ QUERYID
: Q U E R Y I D
;
+QUOTA
+ : Q U O T A
+ ;
+
RANGE
: R A N G E
;
@@ -630,6 +634,10 @@ SOFFSET
: S O F F S E T
;
+SPACE
+ : S P A C E
+ ;
+
STORAGE
: S T O R A G E
;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 4ca8a532c8..55e8a11c35 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -95,5 +95,8 @@ public enum DataNodeRequestType {
KILL_QUERY_INSTANCE,
/** ML Model */
- DELETE_MODEL_METRICS
+ DELETE_MODEL_METRICS,
+
+ /** Quota */
+ SET_SPACE_QUOTA,
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 79496235f5..2bd0cd43e9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
@@ -353,6 +354,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
+ case SET_SPACE_QUOTA:
+ client.setSpaceQuota(
+ (TSetSpaceQuotaReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
+ break;
default:
LOGGER.error(
"Unexpected DataNode Request Type: {} when
sendAsyncRequestToDataNode",
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 28e1d26254..fe1af63540 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -40,16 +40,25 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
private final RouteBalancer routeBalancer;
+ private final Map<Integer, Long> deviceNum;
+ private final Map<Integer, Long> timeSeriesNum;
+ private final Map<Integer, Long> regionDisk;
public DataNodeHeartbeatHandler(
TDataNodeLocation dataNodeLocation,
DataNodeHeartbeatCache dataNodeHeartbeatCache,
Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
- RouteBalancer routeBalancer) {
+ RouteBalancer routeBalancer,
+ Map<Integer, Long> deviceNum,
+ Map<Integer, Long> timeSeriesNum,
+ Map<Integer, Long> regionDisk) {
this.dataNodeLocation = dataNodeLocation;
this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
this.regionGroupCacheMap = regionGroupCacheMap;
this.routeBalancer = routeBalancer;
+ this.deviceNum = deviceNum;
+ this.timeSeriesNum = timeSeriesNum;
+ this.regionDisk = regionDisk;
}
@Override
@@ -82,6 +91,15 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
heartbeatResp.getHeartbeatTimestamp(),
dataNodeLocation.getDataNodeId()));
}
});
+ if (heartbeatResp.getDeviceNum() != null) {
+ deviceNum.putAll(heartbeatResp.getDeviceNum());
+ }
+ if (heartbeatResp.getTimeSeriesNum() != null) {
+ timeSeriesNum.putAll(heartbeatResp.getTimeSeriesNum());
+ }
+ if (heartbeatResp.getRegionDisk() != null) {
+ regionDisk.putAll(heartbeatResp.getRegionDisk());
+ }
}
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index bb3dc7d4e3..aff108b0ce 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -79,6 +79,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
@@ -430,6 +431,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case GetPipePluginJar:
plan = new GetPipePluginJarPlan();
break;
+ case setSpaceQuota:
+ plan = new SetSpaceQuotaPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan configPhysicalPlanType:
" + planType);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 950cfce41d..9d450259ec 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -154,7 +154,10 @@ public enum ConfigPhysicalPlanType {
CreatePipePlugin((short) 1300),
DropPipePlugin((short) 1301),
GetPipePluginTable((short) 1302),
- GetPipePluginJar((short) 1303);
+ GetPipePluginJar((short) 1303),
+
+ /** Quota */
+ setSpaceQuota((short) 1400);
private final short planType;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
new file mode 100644
index 0000000000..c7b46fbd63
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/quota/SetSpaceQuotaPlan.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+public class SetSpaceQuotaPlan extends ConfigPhysicalPlan {
+
+ private List<String> prefixPathList;
+ private TSpaceQuota spaceLimit;
+
+ public SetSpaceQuotaPlan() {
+ super(ConfigPhysicalPlanType.setSpaceQuota);
+ }
+
+ public SetSpaceQuotaPlan(List<String> prefixPathList, TSpaceQuota
spaceLimit) {
+ super(ConfigPhysicalPlanType.setSpaceQuota);
+ this.prefixPathList = prefixPathList;
+ this.spaceLimit = spaceLimit;
+ }
+
+ public List<String> getPrefixPathList() {
+ return prefixPathList;
+ }
+
+ public void setPrefixPathList(List<String> prefixPathList) {
+ this.prefixPathList = prefixPathList;
+ }
+
+ public TSpaceQuota getSpaceLimit() {
+ return spaceLimit;
+ }
+
+ public void setSpaceLimit(TSpaceQuota spaceLimit) {
+ this.spaceLimit = spaceLimit;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeShort(getType().getPlanType());
+ BasicStructureSerDeUtil.write(prefixPathList, stream);
+ BasicStructureSerDeUtil.write(spaceLimit.getDeviceNum(), stream);
+ BasicStructureSerDeUtil.write(spaceLimit.getTimeserieNum(), stream);
+ BasicStructureSerDeUtil.write(spaceLimit.getDiskSize(), stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ List<String> prefixPathList =
BasicStructureSerDeUtil.readStringList(buffer);
+ long deviceNum = BasicStructureSerDeUtil.readLong(buffer);
+ long timeserieNum = BasicStructureSerDeUtil.readLong(buffer);
+ long disk = BasicStructureSerDeUtil.readLong(buffer);
+ this.prefixPathList = prefixPathList;
+ TSpaceQuota spaceLimit = new TSpaceQuota();
+ spaceLimit.setDeviceNum(deviceNum);
+ spaceLimit.setTimeserieNum(timeserieNum);
+ spaceLimit.setDiskSize(disk);
+ this.spaceLimit = spaceLimit;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ SetSpaceQuotaPlan that = (SetSpaceQuotaPlan) o;
+ return Objects.equals(prefixPathList, that.prefixPathList)
+ && Objects.equals(spaceLimit, that.spaceLimit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), prefixPathList, spaceLimit);
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
new file mode 100644
index 0000000000..de1a546282
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ClusterQuotaManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterQuotaManager.class);
+
+ private final IManager configManager;
+ private final QuotaInfo quotaInfo;
+ private final Map<Integer, Long> deviceNum;
+ private final Map<Integer, Long> timeSeriesNum;
+ private final Map<String, List<Integer>> schemaRegionIdMap;
+ private final Map<String, List<Integer>> dataRegionIdMap;
+ private final Map<Integer, Long> regionDisk;
+
+ public ClusterQuotaManager(IManager configManager, QuotaInfo quotaInfo) {
+ this.configManager = configManager;
+ this.quotaInfo = quotaInfo;
+ deviceNum = new ConcurrentHashMap<>();
+ timeSeriesNum = new ConcurrentHashMap<>();
+ schemaRegionIdMap = new HashMap<>();
+ dataRegionIdMap = new HashMap<>();
+ regionDisk = new ConcurrentHashMap<>();
+ }
+
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+ if (!checkSpaceQuota(req)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
+ "The used quota exceeds the preset quota. Please set a larger
value.");
+ }
+ ConsensusWriteResponse response =
+ configManager
+ .getConsensusManager()
+ .write(new SetSpaceQuotaPlan(req.getDatabase(),
req.getSpaceLimit()));
+ if (response.getStatus() != null) {
+ if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.SET_SPACE_QUOTA, req,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return
RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
+ }
+ return response.getStatus();
+ } else {
+ LOGGER.warn(
+ "Unexpected error happened while setting space quota on {}: ",
+ req.getDatabase().toString(),
+ response.getException());
+ // consensus layer related errors
+ TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(response.getErrorMessage());
+ return res;
+ }
+ }
+
+ /** If the new quota is smaller than the quota already used, the setting
fails. */
+ private boolean checkSpaceQuota(TSetSpaceQuotaReq req) {
+ for (String database : req.getDatabase()) {
+ if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
+ TSpaceQuota spaceQuota = quotaInfo.getSpaceQuotaUsage().get(database);
+ if (spaceQuota.getDeviceNum() > req.getSpaceLimit().getDeviceNum()
+ || spaceQuota.getTimeserieNum() >
req.getSpaceLimit().getTimeserieNum()
+ || spaceQuota.getDiskSize() > req.getSpaceLimit().getDiskSize()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
+ TSpaceQuotaResp showSpaceQuotaResp = new TSpaceQuotaResp();
+ if (databases.isEmpty()) {
+ showSpaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
+ showSpaceQuotaResp.setSpaceQuotaUsage(quotaInfo.getSpaceQuotaUsage());
+ } else if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
+ Map<String, TSpaceQuota> spaceQuotaMap = new HashMap<>();
+ Map<String, TSpaceQuota> spaceQuotaUsageMap = new HashMap<>();
+ for (String database : databases) {
+ if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
+ spaceQuotaMap.put(database,
quotaInfo.getSpaceQuotaLimit().get(database));
+ spaceQuotaUsageMap.put(database,
quotaInfo.getSpaceQuotaUsage().get(database));
+ }
+ }
+ showSpaceQuotaResp.setSpaceQuota(spaceQuotaMap);
+ showSpaceQuotaResp.setSpaceQuotaUsage(spaceQuotaUsageMap);
+ }
+
showSpaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ return showSpaceQuotaResp;
+ }
+
+ public TSpaceQuotaResp getSpaceQuota() {
+ TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
+ if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
+ spaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
+ }
+ spaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ return spaceQuotaResp;
+ }
+
+ public boolean hasSpaceQuotaLimit() {
+ return quotaInfo.getSpaceQuotaLimit().keySet().isEmpty();
+ }
+
+ public List<Integer> getSchemaRegionIds() {
+ List<Integer> schemaRegionIds = new ArrayList<>();
+ getPartitionManager()
+ .getSchemaRegionIds(
+ new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()),
schemaRegionIdMap);
+ schemaRegionIdMap.values().forEach(schemaRegionIds::addAll);
+ return schemaRegionIds;
+ }
+
+ public List<Integer> getDataRegionIds() {
+ List<Integer> dataRegionIds = new ArrayList<>();
+ getPartitionManager()
+ .getDataRegionIds(
+ new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()),
dataRegionIdMap);
+ dataRegionIdMap.values().forEach(dataRegionIds::addAll);
+ return dataRegionIds;
+ }
+
+ public Map<String, TSpaceQuota> getSpaceQuotaUsage() {
+ return quotaInfo.getSpaceQuotaUsage();
+ }
+
+ public Map<Integer, Long> getDeviceNum() {
+ return deviceNum;
+ }
+
+ public Map<Integer, Long> getTimeSeriesNum() {
+ return timeSeriesNum;
+ }
+
+ public Map<Integer, Long> getRegionDisk() {
+ return regionDisk;
+ }
+
+ public void updateSpaceQuotaUsage() {
+ AtomicLong deviceCount = new AtomicLong();
+ AtomicLong timeSeriesCount = new AtomicLong();
+ for (Map.Entry<String, List<Integer>> entry :
schemaRegionIdMap.entrySet()) {
+ deviceCount.set(0);
+ timeSeriesCount.set(0);
+ entry
+ .getValue()
+ .forEach(
+ schemaRegionId -> {
+ if (deviceNum.containsKey(schemaRegionId)) {
+ deviceCount.addAndGet(deviceCount.get() +
deviceNum.get(schemaRegionId));
+ }
+ if (timeSeriesNum.containsKey(schemaRegionId)) {
+ timeSeriesCount.addAndGet(
+ timeSeriesCount.get() +
timeSeriesNum.get(schemaRegionId));
+ }
+ });
+
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDeviceNum(deviceCount.get());
+
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setTimeserieNum(timeSeriesCount.get());
+ }
+ AtomicLong regionDiskCount = new AtomicLong();
+ for (Map.Entry<String, List<Integer>> entry : dataRegionIdMap.entrySet()) {
+ regionDiskCount.set(0);
+ entry
+ .getValue()
+ .forEach(
+ dataRegionId -> {
+ if (regionDisk.containsKey(dataRegionId)) {
+ regionDiskCount.addAndGet(regionDisk.get(dataRegionId));
+ }
+ });
+
quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDiskSize(regionDiskCount.get());
+ }
+ }
+
+ private PartitionManager getPartitionManager() {
+ return configManager.getPartitionManager();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 924cf9ef1e..41c755b65a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
@@ -100,6 +101,7 @@ import
org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
@@ -161,6 +163,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -236,6 +239,9 @@ public class ConfigManager implements IManager {
/** Pipe */
private final PipeManager pipeManager;
+ /** Manage quotas */
+ private final ClusterQuotaManager clusterQuotaManager;
+
private final ConfigRegionStateMachine stateMachine;
private final RetryFailedTasksThread retryFailedTasksThread;
@@ -255,6 +261,7 @@ public class ConfigManager implements IManager {
CQInfo cqInfo = new CQInfo();
ModelInfo modelInfo = new ModelInfo();
PipeInfo pipeInfo = new PipeInfo();
+ QuotaInfo quotaInfo = new QuotaInfo();
// Build state machine and executor
ConfigPlanExecutor executor =
@@ -269,7 +276,8 @@ public class ConfigManager implements IManager {
syncInfo,
cqInfo,
modelInfo,
- pipeInfo);
+ pipeInfo,
+ quotaInfo);
this.stateMachine = new ConfigRegionStateMachine(this, executor);
// Build the manager module
@@ -287,6 +295,7 @@ public class ConfigManager implements IManager {
this.pipeManager = new PipeManager(this, pipeInfo);
this.retryFailedTasksThread = new RetryFailedTasksThread(this);
+ this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
}
public void initConsensusManager() throws IOException {
@@ -1399,6 +1408,11 @@ public class ConfigManager implements IManager {
return cqManager;
}
+ @Override
+ public ClusterQuotaManager getClusterQuotaManager() {
+ return clusterQuotaManager;
+ }
+
@Override
public RetryFailedTasksThread getRetryFailedTasksThread() {
return retryFailedTasksThread;
@@ -1868,4 +1882,26 @@ public class ConfigManager implements IManager {
? modelManager.updateModelState(req)
: status;
}
+
+ @Override
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? clusterQuotaManager.setSpaceQuota(req)
+ : status;
+ }
+
+ public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? clusterQuotaManager.showSpaceQuota(databases)
+ : new TSpaceQuotaResp(status);
+ }
+
+ public TSpaceQuotaResp getSpaceQuota() {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? clusterQuotaManager.getSpaceQuota()
+ : new TSpaceQuotaResp(status);
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 75aaaf9a5b..3d6490db57 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -195,6 +196,13 @@ public interface IManager {
*/
PipeManager getPipeManager();
+ /**
+ * Get ClusterQuotaManager
+ *
+ * @return ClusterQuotaManager instance
+ */
+ ClusterQuotaManager getClusterQuotaManager();
+
/**
* Get RetryFailedTasksThread
*
@@ -654,4 +662,7 @@ public interface IManager {
/** Update the model state */
TSStatus updateModelState(TUpdateModelStateReq req);
+
+ /** Set space quota */
+ TSStatus setSpaceQuota(TSetSpaceQuotaReq req);
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index b502a25e3a..97125db05a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -52,6 +52,7 @@ import
org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ClusterQuotaManager;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.IManager;
@@ -730,6 +731,11 @@ public class NodeManager {
/* Update heartbeat counter */
heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+ if (!getClusterQuotaManager().hasSpaceQuotaLimit()) {
+
heartbeatReq.setSchemaRegionIds(getClusterQuotaManager().getSchemaRegionIds());
+
heartbeatReq.setDataRegionIds(getClusterQuotaManager().getDataRegionIds());
+
heartbeatReq.setSpaceQuotaUsage(getClusterQuotaManager().getSpaceQuotaUsage());
+ }
return heartbeatReq;
}
@@ -750,7 +756,11 @@ public class NodeManager {
dataNodeInfo.getLocation().getDataNodeId(),
empty -> new DataNodeHeartbeatCache()),
getPartitionManager().getRegionGroupCacheMap(),
- getLoadManager().getRouteBalancer());
+ getLoadManager().getRouteBalancer(),
+ getClusterQuotaManager().getDeviceNum(),
+ getClusterQuotaManager().getTimeSeriesNum(),
+ getClusterQuotaManager().getRegionDisk());
+ getClusterQuotaManager().updateSpaceQuotaUsage();
AsyncDataNodeHeartbeatClientPool.getInstance()
.getDataNodeHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq,
handler);
@@ -990,4 +1000,8 @@ public class NodeManager {
private UDFManager getUDFManager() {
return configManager.getUDFManager();
}
+
+ private ClusterQuotaManager getClusterQuotaManager() {
+ return configManager.getClusterQuotaManager();
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5e2ad788aa..223bee15e4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -1168,6 +1168,15 @@ public class PartitionManager {
new RegionGroupCache(regionReplicaSet.getRegionId())));
}
+ public void getSchemaRegionIds(
+ List<String> databases, Map<String, List<Integer>> schemaRegionIds) {
+ partitionInfo.getSchemaRegionIds(databases, schemaRegionIds);
+ }
+
+ public void getDataRegionIds(List<String> databases, Map<String,
List<Integer>> dataRegionIds) {
+ partitionInfo.getDataRegionIds(databases, dataRegionIds);
+ }
+
public ScheduledExecutorService getRegionMaintainer() {
return regionMaintainer;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 86910b222a..72ee52fde1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -78,6 +78,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
@@ -111,6 +112,7 @@ import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -162,6 +164,8 @@ public class ConfigPlanExecutor {
private final PipeInfo pipeInfo;
+ private final QuotaInfo quotaInfo;
+
public ConfigPlanExecutor(
NodeInfo nodeInfo,
ClusterSchemaInfo clusterSchemaInfo,
@@ -173,7 +177,8 @@ public class ConfigPlanExecutor {
ClusterSyncInfo syncInfo,
CQInfo cqInfo,
ModelInfo modelInfo,
- PipeInfo pipeInfo) {
+ PipeInfo pipeInfo,
+ QuotaInfo quotaInfo) {
this.snapshotProcessorList = new ArrayList<>();
@@ -208,6 +213,9 @@ public class ConfigPlanExecutor {
this.snapshotProcessorList.add(pipeInfo);
this.procedureInfo = procedureInfo;
+
+ this.quotaInfo = quotaInfo;
+ this.snapshotProcessorList.add(quotaInfo);
}
public DataSet executeQueryPlan(ConfigPhysicalPlan req)
@@ -417,6 +425,8 @@ public class ConfigPlanExecutor {
return
pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan)
physicalPlan);
case DropPipePlugin:
return
pipeInfo.getPipePluginInfo().dropPipePlugin((DropPipePluginPlan) physicalPlan);
+ case setSpaceQuota:
+ return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
default:
throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 882cee0b14..b27a4b529f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -504,6 +504,26 @@ public class DatabasePartitionTable {
return databaseName;
}
+ public List<Integer> getSchemaRegionIds() {
+ List<Integer> schemaRegionIds = new ArrayList<>();
+ for (TConsensusGroupId consensusGroupId : regionGroupMap.keySet()) {
+ if (consensusGroupId.getType().equals(TConsensusGroupType.SchemaRegion))
{
+ schemaRegionIds.add(consensusGroupId.getId());
+ }
+ }
+ return schemaRegionIds;
+ }
+
+ public List<Integer> getDataRegionIds() {
+ List<Integer> dataRegionIds = new ArrayList<>();
+ for (TConsensusGroupId consensusGroupId : regionGroupMap.keySet()) {
+ if (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+ dataRegionIds.add(consensusGroupId.getId());
+ }
+ }
+ return dataRegionIds;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 70b0679e3c..b19ac834b3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -823,6 +823,23 @@ public class PartitionInfo implements SnapshotProcessor {
sgPartitionTable.getSeriesSlotList(plan.getPartitionType()));
}
+ public void getSchemaRegionIds(
+ List<String> databases, Map<String, List<Integer>> schemaRegionIds) {
+ for (String database : databases) {
+ if (databasePartitionTables.containsKey(database)) {
+ schemaRegionIds.put(database,
databasePartitionTables.get(database).getSchemaRegionIds());
+ }
+ }
+ }
+
+ public void getDataRegionIds(List<String> databases, Map<String,
List<Integer>> dataRegionIds) {
+ for (String database : databases) {
+ if (databasePartitionTables.containsKey(database)) {
+ dataRegionIds.put(database,
databasePartitionTables.get(database).getDataRegionIds());
+ }
+ }
+ }
+
public void clear() {
nextRegionGroupId.set(-1);
databasePartitionTables.clear();
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
new file mode 100644
index 0000000000..c4ad43aca2
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/quota/QuotaInfo.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QuotaInfo implements SnapshotProcessor {
+
+ private static final Logger logger =
LoggerFactory.getLogger(QuotaInfo.class);
+
+ private final ReentrantReadWriteLock spaceQuotaReadWriteLock;
+ private final Map<String, TSpaceQuota> spaceQuotaLimit;
+ private final Map<String, TSpaceQuota> spaceQuotaUsage;
+
+ private final String snapshotFileName = "quota_info.bin";
+
+ public QuotaInfo() {
+ spaceQuotaReadWriteLock = new ReentrantReadWriteLock();
+ spaceQuotaLimit = new HashMap<>();
+ spaceQuotaUsage = new HashMap<>();
+ }
+
+ public TSStatus setSpaceQuota(SetSpaceQuotaPlan setSpaceQuotaPlan) {
+ for (String database : setSpaceQuotaPlan.getPrefixPathList()) {
+ TSpaceQuota spaceQuota = setSpaceQuotaPlan.getSpaceLimit();
+ // “0” means that the user has not reset the value of the space quota
type
+ // So the old values are still used
+ if (spaceQuotaLimit.containsKey(database)) {
+ if (spaceQuota.getDeviceNum() == 0) {
+
spaceQuota.setDeviceNum(spaceQuotaLimit.get(database).getDeviceNum());
+ }
+ if (spaceQuota.getTimeserieNum() == 0) {
+
spaceQuota.setTimeserieNum(spaceQuotaLimit.get(database).getTimeserieNum());
+ }
+ if (spaceQuota.getDiskSize() == 0) {
+ spaceQuota.setDiskSize(spaceQuotaLimit.get(database).getDiskSize());
+ }
+ if (spaceQuota.getDeviceNum() == -1) {
+ spaceQuota.setDeviceNum(0);
+ }
+ if (spaceQuota.getTimeserieNum() == -1) {
+ spaceQuota.setTimeserieNum(0);
+ }
+ if (spaceQuota.getDiskSize() == -1) {
+ spaceQuota.setDiskSize(0);
+ }
+ }
+ if (!spaceQuotaUsage.containsKey(database)) {
+ spaceQuotaUsage.put(database, new TSpaceQuota());
+ }
+ spaceQuotaLimit.put(database, spaceQuota);
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ public Map<String, TSpaceQuota> getSpaceQuotaLimit() {
+ return spaceQuotaLimit;
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (snapshotFile.exists() && snapshotFile.isFile()) {
+ logger.error(
+ "Failed to take snapshot, because snapshot file [{}] is already
exist.",
+ snapshotFile.getAbsolutePath());
+ return false;
+ }
+
+ spaceQuotaReadWriteLock.writeLock().lock();
+ try (FileOutputStream fileOutputStream = new
FileOutputStream(snapshotFile)) {
+ serializeSpaceQuotaLimit(fileOutputStream);
+ } finally {
+ spaceQuotaReadWriteLock.writeLock().unlock();
+ }
+ return true;
+ }
+
+ private void serializeSpaceQuotaLimit(FileOutputStream fileOutputStream)
throws IOException {
+ ReadWriteIOUtils.write(spaceQuotaLimit.size(), fileOutputStream);
+ for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry :
spaceQuotaLimit.entrySet()) {
+ ReadWriteIOUtils.write(spaceQuotaEntry.getKey(), fileOutputStream);
+ ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDeviceNum(),
fileOutputStream);
+ ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getTimeserieNum(),
fileOutputStream);
+ ReadWriteIOUtils.write(spaceQuotaEntry.getValue().getDiskSize(),
fileOutputStream);
+ }
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
+ File snapshotFile = new File(snapshotDir, snapshotFileName);
+ if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+ logger.error(
+ "Failed to load snapshot,snapshot file [{}] is not exist.",
+ snapshotFile.getAbsolutePath());
+ return;
+ }
+ spaceQuotaReadWriteLock.writeLock().lock();
+ try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+ clear();
+ deserializeSpaceQuotaLimit(fileInputStream);
+ } finally {
+ spaceQuotaReadWriteLock.writeLock().unlock();
+ }
+ }
+
+ private void deserializeSpaceQuotaLimit(FileInputStream fileInputStream)
throws IOException {
+ int size = ReadWriteIOUtils.readInt(fileInputStream);
+ while (size > 0) {
+ String path = ReadWriteIOUtils.readString(fileInputStream);
+ TSpaceQuota spaceQuota = new TSpaceQuota();
+ spaceQuota.setDeviceNum(ReadWriteIOUtils.readLong(fileInputStream));
+ spaceQuota.setTimeserieNum(ReadWriteIOUtils.readLong(fileInputStream));
+ spaceQuota.setDiskSize(ReadWriteIOUtils.readLong(fileInputStream));
+ spaceQuotaLimit.put(path, spaceQuota);
+ size--;
+ }
+ }
+
+ public Map<String, TSpaceQuota> getSpaceQuotaUsage() {
+ return spaceQuotaUsage;
+ }
+
+ public void clear() {
+ spaceQuotaLimit.clear();
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index da9a7cd5e6..1607250ac8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -151,6 +152,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -970,4 +972,19 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
public TSStatus updateModelState(TUpdateModelStateReq req) throws TException
{
return configManager.updateModelState(req);
}
+
+ @Override
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+ return configManager.setSpaceQuota(req);
+ }
+
+ @Override
+ public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws
TException {
+ return configManager.showSpaceQuota(databases);
+ }
+
+ @Override
+ public TSpaceQuotaResp getSpaceQuota() throws TException {
+ return configManager.getSpaceQuota();
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 0441bfa2ef..ed28dc7889 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
@@ -83,6 +84,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataP
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
@@ -1350,4 +1352,18 @@ public class ConfigPhysicalPlanSerDeTest {
ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer());
Assert.assertEquals(deserializedPlan.getRegionIdSet(), regionIdSet);
}
+
+ @Test
+ public void setSpaceQuotaPlanTest() throws IOException {
+ TSpaceQuota spaceQuota = new TSpaceQuota();
+ spaceQuota.setDeviceNum(2);
+ spaceQuota.setTimeserieNum(3);
+ spaceQuota.setDiskSize(1024);
+ SetSpaceQuotaPlan plan =
+ new SetSpaceQuotaPlan(Collections.singletonList("root.sg"),
spaceQuota);
+ SetSpaceQuotaPlan deserializedPlan =
+ (SetSpaceQuotaPlan)
ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer());
+ Assert.assertEquals(plan.getPrefixPathList(),
deserializedPlan.getPrefixPathList());
+ Assert.assertEquals(plan.getSpaceLimit(),
deserializedPlan.getSpaceLimit());
+ }
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
new file mode 100644
index 0000000000..075237e61d
--- /dev/null
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/QuotaInfoTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
+import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class QuotaInfoTest {
+
+ private QuotaInfo quotaInfo;
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH,
"snapshot");
+
+ @Before
+ public void setup() throws IOException {
+ quotaInfo = new QuotaInfo();
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ private void prepareQuotaInfo() {
+ List<String> prefixPathList = new ArrayList<>();
+ prefixPathList.add("root.sg");
+ prefixPathList.add("root.ln");
+ TSpaceQuota spaceQuota = new TSpaceQuota();
+ spaceQuota.setTimeserieNum(10000);
+ spaceQuota.setDeviceNum(100);
+ spaceQuota.setDiskSize(512);
+ SetSpaceQuotaPlan setSpaceQuotaPlan = new
SetSpaceQuotaPlan(prefixPathList, spaceQuota);
+ quotaInfo.setSpaceQuota(setSpaceQuotaPlan);
+ }
+
+ @Test
+ public void testSnapshot() throws TException, IOException {
+ prepareQuotaInfo();
+
+ quotaInfo.processTakeSnapshot(snapshotDir);
+ QuotaInfo quotaInfo2 = new QuotaInfo();
+ quotaInfo2.processLoadSnapshot(snapshotDir);
+
+ Assert.assertEquals(quotaInfo.getSpaceQuotaLimit(),
quotaInfo2.getSpaceQuotaLimit());
+ }
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bc6e07c36b..ba148179d2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -126,6 +126,9 @@ public class CommonConfig {
/** Ip and port of target ML node. */
private TEndPoint targetMLNodeEndPoint = new TEndPoint("127.0.0.1", 10810);
+ /** multi-tenancy */
+ private boolean quotaEnable = false;
+
CommonConfig() {}
public void updatePath(String homeDir) {
@@ -370,4 +373,12 @@ public class CommonConfig {
public void setStopping(boolean stopping) {
isStopping = stopping;
}
+
+ public boolean isQuotaEnable() {
+ return quotaEnable;
+ }
+
+ public void setQuotaEnable(boolean quotaEnable) {
+ this.quotaEnable = quotaEnable;
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5aa42bbe5f..64f21944d7 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -187,6 +187,9 @@ public class CommonDescriptor {
"disk_space_warning_threshold",
String.valueOf(config.getDiskSpaceWarningThreshold()))
.trim()));
+ config.setQuotaEnable(
+ Boolean.parseBoolean(
+ properties.getProperty("quota_enable",
String.valueOf(config.isQuotaEnable()))));
String endPointUrl =
properties.getProperty(
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 37321694d2..fd1c98e8c4 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -278,6 +278,10 @@ public class IoTDBConstant {
public static final String IOTDB_FOREGROUND = "iotdb-foreground";
public static final String IOTDB_PIDFILE = "iotdb-pidfile";
+ // quota
+ public static final String SPACE_QUOTA_DISK = "disk";
+ public static final String SPACE_QUOTA_UNLIMITED = "unlimited";
+
// client version number
public enum ClientVersion {
V_0_12,
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
new file mode 100644
index 0000000000..bfbb6562a4
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/enums/SpaceQuotaType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.enums;
+
+public enum SpaceQuotaType {
+ diskSize,
+ deviceNum,
+ timeSeriesNum
+}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
index efb6796b80..7d6ff521a8 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/BasicStructureSerDeUtil.java
@@ -29,6 +29,7 @@ import java.util.Map;
public class BasicStructureSerDeUtil {
public static final int INT_LEN = 4;
+ public static final int LONG_LEN = 8;
private BasicStructureSerDeUtil() {}
@@ -63,6 +64,11 @@ public class BasicStructureSerDeUtil {
return buffer.getInt();
}
+ /** read a long var from byteBuffer. */
+ public static long readLong(ByteBuffer buffer) {
+ return buffer.getLong();
+ }
+
/**
* write string to byteBuffer.
*
@@ -125,6 +131,16 @@ public class BasicStructureSerDeUtil {
return INT_LEN;
}
+ /**
+ * write a long n to dataOutputStream.
+ *
+ * @return The number of bytes used to represent n.
+ */
+ public static int write(long n, DataOutputStream stream) throws IOException {
+ stream.writeLong(n);
+ return LONG_LEN;
+ }
+
/**
* write an int n to dataOutputStream.
*
diff --git
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 69aec0c956..0d853b2667 100644
---
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -1534,6 +1534,16 @@ public class RSchemaRegion implements ISchemaRegion {
throw new UnsupportedOperationException();
}
+ @Override
+ public long countDeviceNumBySchemaRegion() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String toString() {
return String.format("database:[%s]", storageGroupFullPath);
diff --git
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 75b73a4d9e..9ac8eb1812 100644
---
a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++
b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -583,6 +583,16 @@ public class TagSchemaRegion implements ISchemaRegion {
throw new UnsupportedOperationException();
}
+ @Override
+ public long countDeviceNumBySchemaRegion() throws MetadataException {
+ throw new UnsupportedOperationException("countDeviceNumBySchemaRegion");
+ }
+
+ @Override
+ public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+ throw new
UnsupportedOperationException("countTimeSeriesNumBySchemaRegion");
+ }
+
@Override
public String toString() {
return "TagSchemaRegion{"
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ed6865a302..53b8c99439 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ThriftClient;
@@ -117,6 +118,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
@@ -2073,6 +2075,54 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
throw new TException(new UnsupportedOperationException().getCause());
}
+ @Override
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.setSpaceQuota(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws
TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSpaceQuotaResp resp = client.showSpaceQuota(databases);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSpaceQuotaResp getSpaceQuota() throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSpaceQuotaResp resp = client.getSpaceQuota();
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
public static class Factory extends ThriftClientFactory<ConfigRegionId,
ConfigNodeClient> {
public Factory(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ae3f0b2af7..f92c093ddf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1066,6 +1066,8 @@ public class IoTDBConfig {
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor */
private int pipeMaxThreadNum = 5;
+ private boolean quotaEnable = false;
+
IoTDBConfig() {}
public float getUdfMemoryBudgetInMB() {
@@ -3696,4 +3698,12 @@ public class IoTDBConfig {
public int getPipeSubtaskExecutorMaxThreadNum() {
return pipeMaxThreadNum;
}
+
+ public boolean isQuotaEnable() {
+ return quotaEnable;
+ }
+
+ public void setQuotaEnable(boolean quotaEnable) {
+ this.quotaEnable = quotaEnable;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3dc79cbb59..c1365a7685 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1030,6 +1030,10 @@ public class IoTDBDescriptor {
conf.setTimePartitionInterval(
DateTimeUtils.convertMilliTimeWithPrecision(
conf.getTimePartitionInterval(), conf.getTimestampPrecision()));
+
+ conf.setQuotaEnable(
+ Boolean.parseBoolean(
+ properties.getProperty("quota_enable",
String.valueOf(conf.isQuotaEnable()))));
}
private void loadAuthorCache(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a542d8ac33..6ebfff6ffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -839,6 +839,16 @@ public class StorageEngine implements IService {
}
}
+ public void getDiskSizeByDataRegion(
+ Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
+ dataRegionMap.forEach(
+ (dataRegionId, dataRegion) -> {
+ if (dataRegionIds.contains(dataRegionId.getId())) {
+ dataRegionDisk.put(dataRegionId.getId(),
dataRegion.countRegionDiskSize());
+ }
+ });
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7aa38a3a68..2ed0072e7a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
@@ -81,6 +82,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.db.service.SettleService;
import org.apache.iotdb.db.sync.SyncService;
@@ -132,6 +134,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1253,6 +1256,13 @@ public class DataRegion implements IDataRegionForQuery {
int retryCnt = 0;
do {
try {
+ if
(!DataNodeSpaceQuotaManager.getInstance().checkRegionDisk(databaseName)) {
+ throw new ExceedQuotaException(
+ "Unable to continue writing data, because the space allocated to
the database "
+ + databaseName
+ + " has already used the upper limit",
+ TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode());
+ }
if (sequence) {
tsFileProcessor =
getOrCreateTsFileProcessorIntern(timeRangeId,
workSequenceTsFileProcessors, true);
@@ -1276,6 +1286,9 @@ public class DataRegion implements IDataRegionForQuery {
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
break;
}
+ } catch (ExceedQuotaException e) {
+ logger.error(e.getMessage());
+ break;
}
} while (tsFileProcessor == null);
return tsFileProcessor;
@@ -3211,6 +3224,38 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /** @return the disk space occupied by this data region, unit is MB */
+ public long countRegionDiskSize() {
+ AtomicLong diskSize = new AtomicLong(0);
+ DirectoryManager.getInstance()
+ .getAllFilesFolders()
+ .forEach(
+ folder -> {
+ folder = folder + File.separator + databaseName + File.separator
+ dataRegionId;
+ countFolderDiskSize(folder, diskSize);
+ });
+ return diskSize.get() / 1024 / 1024;
+ }
+
+ /**
+ * @param folder the folder's path
+ * @param diskSize the disk space occupied by this folder, unit is MB
+ */
+ private void countFolderDiskSize(String folder, AtomicLong diskSize) {
+ File file = new File(folder);
+ File[] allFile = file.listFiles();
+ if (allFile == null) {
+ return;
+ }
+ for (File f : allFile) {
+ if (f.isFile()) {
+ diskSize.addAndGet(f.length());
+ } else if (f.isDirectory()) {
+ countFolderDiskSize(f.getAbsolutePath(), diskSize);
+ }
+ }
+ }
+
@TestOnly
public long getPartitionMaxFileVersions(long partitionId) {
return partitionMaxFileVersions.getOrDefault(partitionId, 0L);
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
b/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
new file mode 100644
index 0000000000..60872e0916
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/exception/quota/ExceedQuotaException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.quota;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+
+public class ExceedQuotaException extends MetadataException {
+
+ public ExceedQuotaException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 9d03f3075e..c3f3448654 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.exception.metadata.PathNotExistException;
import
org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
import
org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException;
import
org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
+import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.mnode.mem.IMemMNode;
import org.apache.iotdb.db.metadata.mnode.mem.factory.MemMNodeFactory;
@@ -63,6 +64,8 @@ import
org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -371,13 +374,18 @@ public class MTreeBelowSGMemoryImpl {
}
private IMemMNode checkAndAutoCreateDeviceNode(String deviceName, IMemMNode
deviceParent)
- throws PathAlreadyExistException {
+ throws PathAlreadyExistException, ExceedQuotaException {
if (deviceParent == null) {
// device is sg
return storageGroupMNode;
}
IMemMNode device = store.getChild(deviceParent, deviceName);
if (device == null) {
+ if
(!DataNodeSpaceQuotaManager.getInstance().checkDeviceLimit(storageGroupMNode.getName()))
{
+ throw new ExceedQuotaException(
+ "The number of devices has reached the upper limit",
+ TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode());
+ }
device =
store.addChild(
deviceParent, deviceName,
nodeFactory.createInternalMNode(deviceParent, deviceName));
@@ -430,6 +438,14 @@ public class MTreeBelowSGMemoryImpl {
new AliasAlreadyExistException(
devicePath.getFullPath() + "." + measurementList.get(i),
aliasList.get(i)));
}
+ if (!DataNodeSpaceQuotaManager.getInstance()
+ .checkTimeSeriesNum(storageGroupMNode.getName())) {
+ failingMeasurementMap.put(
+ i,
+ new ExceedQuotaException(
+ "The number of timeSeries has reached the upper limit",
+ TSStatusCode.EXCEED_QUOTA_ERROR.getStatusCode()));
+ }
}
return failingMeasurementMap;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 9e9d8922f2..9db4a6f5d3 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -277,4 +277,9 @@ public interface ISchemaRegion {
throws MetadataException;
// endregion
+
+ // count
+ long countDeviceNumBySchemaRegion() throws MetadataException;
+
+ long countTimeSeriesNumBySchemaRegion() throws MetadataException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 20c90e66d1..c2a31d3bd6 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
@@ -343,6 +344,35 @@ public class SchemaEngine {
return schemaRegionMap == null ? 0 : schemaRegionMap.size();
}
+ public Map<Integer, Long> countDeviceNumBySchemaRegion(List<Integer>
schemaIds) {
+ Map<Integer, Long> deviceNum = new HashMap<>();
+ try {
+ for (Map.Entry<SchemaRegionId, ISchemaRegion> entry :
schemaRegionMap.entrySet()) {
+ if (schemaIds.contains(entry.getKey().getId())) {
+ deviceNum.put(entry.getKey().getId(),
entry.getValue().countDeviceNumBySchemaRegion());
+ }
+ }
+ } catch (MetadataException e) {
+ // no
+ }
+ return deviceNum;
+ }
+
+ public Map<Integer, Long> countTimeSeriesNumBySchemaRegion(List<Integer>
schemaIds) {
+ Map<Integer, Long> timeSeriesNum = new HashMap<>();
+ try {
+ for (Map.Entry<SchemaRegionId, ISchemaRegion> entry :
schemaRegionMap.entrySet()) {
+ if (schemaIds.contains(entry.getKey().getId())) {
+ timeSeriesNum.put(
+ entry.getKey().getId(),
entry.getValue().countTimeSeriesNumBySchemaRegion());
+ }
+ }
+ } catch (MetadataException e) {
+ // no
+ }
+ return timeSeriesNum;
+ }
+
@TestOnly
public ISchemaEngineStatistics getSchemaEngineStatistics() {
return schemaEngineStatistics;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index b9bec0d522..45d88abc19 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata.schemaregion;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -49,6 +50,7 @@ import
org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.write.SchemaRegionWritePlanFactory;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -85,6 +87,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1163,6 +1166,48 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
return mtree.getNodeReader(showNodesPlan);
}
+ @Override
+ public long countDeviceNumBySchemaRegion() throws MetadataException {
+ ISchemaReader<IDeviceSchemaInfo> deviceReader =
+ this.getDeviceReader(
+ SchemaRegionReadPlanFactory.getShowDevicesPlan(
+ new PartialPath(
+ IoTDBConstant.PATH_ROOT
+ + IoTDBConstant.PATH_SEPARATOR
+ + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+ false));
+ long count = 0;
+ while (deviceReader.hasNext()) {
+ deviceReader.next();
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+ ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+ this.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(
+ IoTDBConstant.PATH_ROOT
+ + IoTDBConstant.PATH_SEPARATOR
+ + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+ new HashMap<>(),
+ false,
+ null,
+ null,
+ 0,
+ 0,
+ false));
+ long count = 0;
+ while (timeSeriesReader.hasNext()) {
+ timeSeriesReader.next();
+ count++;
+ }
+ return count;
+ }
+
// endregion
private static class RecoverOperationResult {
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 7f100b0b6b..1768ba2713 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata.schemaregion;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -52,6 +53,7 @@ import
org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanSerializer;
+import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.write.SchemaRegionWritePlanFactory;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowDevicesPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.read.IShowNodesPlan;
@@ -90,6 +92,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1322,6 +1325,48 @@ public class SchemaRegionSchemaFileImpl implements
ISchemaRegion {
}
// endregion
+ @Override
+ public long countDeviceNumBySchemaRegion() throws MetadataException {
+ ISchemaReader<IDeviceSchemaInfo> deviceReader =
+ this.getDeviceReader(
+ SchemaRegionReadPlanFactory.getShowDevicesPlan(
+ new PartialPath(
+ IoTDBConstant.PATH_ROOT
+ + IoTDBConstant.PATH_SEPARATOR
+ + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+ false));
+ long count = 0;
+ while (deviceReader.hasNext()) {
+ deviceReader.next();
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public long countTimeSeriesNumBySchemaRegion() throws MetadataException {
+ ISchemaReader<ITimeSeriesSchemaInfo> timeSeriesReader =
+ this.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ new PartialPath(
+ IoTDBConstant.PATH_ROOT
+ + IoTDBConstant.PATH_SEPARATOR
+ + IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD),
+ new HashMap<>(),
+ false,
+ null,
+ null,
+ 0,
+ 0,
+ false));
+ long count = 0;
+ while (timeSeriesReader.hasNext()) {
+ timeSeriesReader.next();
+ count++;
+ }
+ return count;
+ }
+
private static class RecoverOperationResult {
private static final RecoverOperationResult SUCCESS = new
RecoverOperationResult(null);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index ded93fc3df..213778db8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -161,6 +161,11 @@ public class ColumnHeaderConstant {
public static final String ELAPSED_TIME = "ElapsedTime";
public static final String STATEMENT = "Statement";
+ // column names for quotas
+ public static final String QUOTA_TYPE = "quotaType";
+ public static final String LIMIT = "limit";
+ public static final String USED = "used";
+
// column names for show models/trails
public static final String MODEL_ID = "ModelId";
public static final String TRAIL_ID = "TrailId";
@@ -405,6 +410,13 @@ public class ColumnHeaderConstant {
new ColumnHeader(ELAPSED_TIME, TSDataType.FLOAT),
new ColumnHeader(STATEMENT, TSDataType.TEXT));
+ public static final List<ColumnHeader> showSpaceQuotaColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(DATABASE, TSDataType.TEXT),
+ new ColumnHeader(QUOTA_TYPE, TSDataType.TEXT),
+ new ColumnHeader(LIMIT, TSDataType.TEXT),
+ new ColumnHeader(USED, TSDataType.TEXT));
+
public static final List<ColumnHeader> showModelsColumnHeaders =
ImmutableList.of(
new ColumnHeader(MODEL_ID, TSDataType.TEXT),
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index c7b07431d0..2896be7dd3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -169,6 +169,10 @@ public class DatasetHeaderFactory {
return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders,
false);
}
+ public static DatasetHeader getShowSpaceQuotaHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showSpaceQuotaColumnHeaders,
true);
+ }
+
public static DatasetHeader getShowModelsHeader() {
return new DatasetHeader(ColumnHeaderConstant.showModelsColumnHeaders,
true);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 81c33c845c..20bf24ce4a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -73,6 +73,8 @@ import
org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.DropPipeTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.ShowPipeTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.StartPipeTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.StopPipeTask;
+import
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.SetSpaceQuotaTask;
+import
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.CreatePipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.DropPipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
@@ -132,6 +134,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -456,6 +460,18 @@ public class ConfigTaskVisitor
return new ShowContinuousQueriesTask();
}
+ @Override
+ public IConfigTask visitSetSpaceQuota(
+ SetSpaceQuotaStatement setSpaceQuotaStatement, TaskContext context) {
+ return new SetSpaceQuotaTask(setSpaceQuotaStatement);
+ }
+
+ @Override
+ public IConfigTask visitShowSpaceQuota(
+ ShowSpaceQuotaStatement showSpaceQuotaStatement, TaskContext context) {
+ return new ShowSpaceQuotaTask(showSpaceQuotaStatement);
+ }
+
/** ML Model Management */
@Override
public IConfigTask visitCreateModel(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 53567fc601..e59005b75f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -21,7 +21,9 @@ package
org.apache.iotdb.db.mpp.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -84,6 +86,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -120,6 +123,7 @@ import
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodes
import
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.pipe.ShowPipeTask;
+import
org.apache.iotdb.db.mpp.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDatabaseStatement;
@@ -155,6 +159,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -1814,6 +1820,33 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> setSpaceQuota(
+ SetSpaceQuotaStatement setSpaceQuotaStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TSStatus tsStatus = new TSStatus();
+ TSetSpaceQuotaReq req = new TSetSpaceQuotaReq();
+ req.setDatabase(setSpaceQuotaStatement.getPrefixPathList());
+ TSpaceQuota spaceQuota = new TSpaceQuota();
+ spaceQuota.setDeviceNum(setSpaceQuotaStatement.getDeviceNum());
+ spaceQuota.setTimeserieNum(setSpaceQuotaStatement.getTimeSeriesNum());
+ spaceQuota.setDiskSize(setSpaceQuotaStatement.getDiskSize());
+ req.setSpaceLimit(spaceQuota);
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to some API server
+ tsStatus = client.setSpaceQuota(req);
+ } catch (Exception e) {
+ future.setException(e);
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> createModel(CreateModelStatement
createModelStatement) {
createModelStatement.semanticCheck();
@@ -1856,6 +1889,29 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> showSpaceQuota(
+ ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+
+ try (ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ List<String> databases = new ArrayList<>();
+ if (showSpaceQuotaStatement.getDatabases() != null) {
+ showSpaceQuotaStatement
+ .getDatabases()
+ .forEach(database -> databases.add(database.toString()));
+ }
+ // Send request to some API server
+ TSpaceQuotaResp showSpaceQuotaResp =
configNodeClient.showSpaceQuota(databases);
+ // build TSBlock
+ ShowSpaceQuotaTask.buildTSBlock(showSpaceQuotaResp, future);
+ } catch (Exception e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> dropModel(String modelId) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
@@ -1890,10 +1946,22 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
} catch (ClientManagerException | TException e) {
future.setException(e);
}
-
return future;
}
+ @Override
+ public TSpaceQuotaResp getSpaceQuota() {
+ TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
+ try (ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to some API server
+ spaceQuotaResp = configNodeClient.getSpaceQuota();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ return spaceQuotaResp;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> showTrails(String modelId) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
@@ -1910,7 +1978,6 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
} catch (ClientManagerException | TException e) {
future.setException(e);
}
-
return future;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 8fdaa7c8a1..8832009240 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.mpp.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDatabaseStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -55,6 +56,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -177,6 +180,12 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showContinuousQueries();
+ SettableFuture<ConfigTaskResult> setSpaceQuota(SetSpaceQuotaStatement
setSpaceQuotaStatement);
+
+ SettableFuture<ConfigTaskResult> showSpaceQuota(ShowSpaceQuotaStatement
showSpaceQuotaStatement);
+
+ TSpaceQuotaResp getSpaceQuota();
+
SettableFuture<ConfigTaskResult> createModel(CreateModelStatement
createModelStatement);
SettableFuture<ConfigTaskResult> dropModel(String modelId);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
new file mode 100644
index 0000000000..bb80bc4d9b
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/SetSpaceQuotaTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SetSpaceQuotaTask implements IConfigTask {
+
+ private final SetSpaceQuotaStatement setSpaceQuotaStatement;
+
+ public SetSpaceQuotaTask(SetSpaceQuotaStatement setSpaceQuotaStatement) {
+ this.setSpaceQuotaStatement = setSpaceQuotaStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.setSpaceQuota(setSpaceQuotaStatement);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
new file mode 100644
index 0000000000..0c26415229
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/quota/ShowSpaceQuotaTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys.quota;
+
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.enums.SpaceQuotaType;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ShowSpaceQuotaTask implements IConfigTask {
+
+ private final ShowSpaceQuotaStatement showSpaceQuotaStatement;
+
+ public ShowSpaceQuotaTask(ShowSpaceQuotaStatement showSpaceQuotaStatement) {
+ this.showSpaceQuotaStatement = showSpaceQuotaStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.showSpaceQuota(showSpaceQuotaStatement);
+ }
+
+ public static void buildTSBlock(TSpaceQuotaResp resp,
SettableFuture<ConfigTaskResult> future) {
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showSpaceQuotaColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ if (resp.getSpaceQuota() != null) {
+ for (Map.Entry<String, TSpaceQuota> spaceQuotaEntry :
resp.getSpaceQuota().entrySet()) {
+ if (spaceQuotaEntry.getValue().getDiskSize() != -1) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+
builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.diskSize.name()));
+ builder
+ .getColumnBuilder(2)
+ .writeBinary(
+ Binary.valueOf(
+ spaceQuotaEntry.getValue().getDiskSize() == 0
+ ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+ : spaceQuotaEntry.getValue().getDiskSize() + "M"));
+ builder
+ .getColumnBuilder(3)
+ .writeBinary(
+ Binary.valueOf(
+
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getDiskSize() + "M"));
+ builder.declarePosition();
+ }
+ if (spaceQuotaEntry.getValue().getDeviceNum() != -1) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+
builder.getColumnBuilder(1).writeBinary(Binary.valueOf(SpaceQuotaType.deviceNum.name()));
+ builder
+ .getColumnBuilder(2)
+ .writeBinary(
+ Binary.valueOf(
+ spaceQuotaEntry.getValue().getDeviceNum() == 0
+ ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+ : spaceQuotaEntry.getValue().getDeviceNum() + ""));
+ builder
+ .getColumnBuilder(3)
+ .writeBinary(
+ Binary.valueOf(
+
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getDeviceNum() + ""));
+ builder.declarePosition();
+ }
+ if (spaceQuotaEntry.getValue().getTimeserieNum() != -1) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(spaceQuotaEntry.getKey()));
+ builder
+ .getColumnBuilder(1)
+
.writeBinary(Binary.valueOf(SpaceQuotaType.timeSeriesNum.name()));
+ builder
+ .getColumnBuilder(2)
+ .writeBinary(
+ Binary.valueOf(
+ spaceQuotaEntry.getValue().getTimeserieNum() == 0
+ ? IoTDBConstant.SPACE_QUOTA_UNLIMITED
+ : spaceQuotaEntry.getValue().getTimeserieNum() +
""));
+ builder
+ .getColumnBuilder(3)
+ .writeBinary(
+ Binary.valueOf(
+
resp.getSpaceQuotaUsage().get(spaceQuotaEntry.getKey()).getTimeserieNum()
+ + ""));
+ builder.declarePosition();
+ }
+ }
+ }
+ DatasetHeader datasetHeader =
DatasetHeaderFactory.getShowSpaceQuotaHeader();
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS,
builder.build(), datasetHeader));
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 4bc44543fe..934067f1a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -167,6 +167,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -3379,4 +3381,95 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
Integer.parseInt(ctx.fromId.getText()),
Integer.parseInt(ctx.toId.getText()));
}
+
+ // Quota
+ @Override
+ public Statement visitSetSpaceQuota(IoTDBSqlParser.SetSpaceQuotaContext ctx)
{
+ if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+ throw new SemanticException("Limit configuration is not enabled, please
enable it first.");
+ }
+ SetSpaceQuotaStatement setSpaceQuotaStatement = new
SetSpaceQuotaStatement();
+ List<IoTDBSqlParser.PrefixPathContext> prefixPathContexts =
ctx.prefixPath();
+ List<String> paths = new ArrayList<>();
+ for (IoTDBSqlParser.PrefixPathContext prefixPathContext :
prefixPathContexts) {
+ paths.add(parsePrefixPath(prefixPathContext).getFullPath());
+ }
+ setSpaceQuotaStatement.setPrefixPathList(paths);
+
+ Map<String, String> quotas = new HashMap<>();
+ for (IoTDBSqlParser.AttributePairContext attributePair :
ctx.attributePair()) {
+ quotas.put(
+ parseAttributeKey(attributePair.attributeKey()),
+ parseAttributeValue(attributePair.attributeValue()));
+ }
+
+ if (quotas.containsKey(IoTDBConstant.COLUMN_DEVICES)) {
+ if
(quotas.get(IoTDBConstant.COLUMN_DEVICES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
{
+ setSpaceQuotaStatement.setDeviceNum(-1);
+ } else if (Long.parseLong(quotas.get(IoTDBConstant.COLUMN_DEVICES)) <=
0) {
+ throw new SemanticException("Please set the number of devices greater
than 0");
+ } else {
+ setSpaceQuotaStatement.setDeviceNum(
+ Long.parseLong(quotas.get(IoTDBConstant.COLUMN_DEVICES)));
+ }
+ }
+ if (quotas.containsKey(IoTDBConstant.COLUMN_TIMESERIES)) {
+ if
(quotas.get(IoTDBConstant.COLUMN_TIMESERIES).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
{
+ setSpaceQuotaStatement.setTimeSeriesNum(-1);
+ } else if (Long.parseLong(quotas.get(IoTDBConstant.COLUMN_TIMESERIES))
<= 0) {
+ throw new SemanticException("Please set the number of timeseries
greater than 0");
+ } else {
+ setSpaceQuotaStatement.setTimeSeriesNum(
+ Long.parseLong(quotas.get(IoTDBConstant.COLUMN_TIMESERIES)));
+ }
+ }
+ if (quotas.containsKey(IoTDBConstant.SPACE_QUOTA_DISK)) {
+ if
(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK).equals(IoTDBConstant.SPACE_QUOTA_UNLIMITED))
{
+ setSpaceQuotaStatement.setDiskSize(-1);
+ } else {
+
setSpaceQuotaStatement.setDiskSize(parseUnit(quotas.get(IoTDBConstant.SPACE_QUOTA_DISK)));
+ }
+ }
+ return setSpaceQuotaStatement;
+ }
+
+ private long parseUnit(String data) {
+ String unit = data.substring(data.length() - 1);
+ long disk = Long.parseLong(data.substring(0, data.length() - 1));
+ if (disk <= 0) {
+ throw new SemanticException("Please set the disk size greater than 0");
+ }
+ switch (unit.toLowerCase()) {
+ case "m":
+ return disk;
+ case "g":
+ return disk * 1024;
+ case "t":
+ return disk * 1024 * 1024;
+ case "p":
+ return disk * 1024 * 1024 * 1024;
+ default:
+ throw new SemanticException(
+ "When setting the disk size, the unit is incorrect. Please use
'M', 'G', 'P', 'T' as the unit");
+ }
+ }
+
+ @Override
+ public Statement visitShowSpaceQuota(IoTDBSqlParser.ShowSpaceQuotaContext
ctx) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
+ throw new SemanticException("Limit configuration is not enabled, please
enable it first.");
+ }
+ ShowSpaceQuotaStatement showSpaceQuotaStatement = new
ShowSpaceQuotaStatement();
+ List<PartialPath> databases = null;
+ if (ctx.prefixPath() != null) {
+ databases = new ArrayList<>();
+ for (IoTDBSqlParser.PrefixPathContext prefixPathContext :
ctx.prefixPath()) {
+ databases.add(parsePrefixPath(prefixPathContext));
+ }
+ showSpaceQuotaStatement.setDatabases(databases);
+ } else {
+ showSpaceQuotaStatement.setDatabases(null);
+ }
+ return showSpaceQuotaStatement;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
index f0a91bbfee..27a041fba8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
@@ -161,4 +161,7 @@ public enum StatementType {
SHOW_PIPES,
BATCH_ACTIVATE_TEMPLATE,
+
+ SET_SPACE_QUOTA,
+ SHOW_SPACE_QUOTA,
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 93bbda8947..1e73670802 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -103,6 +103,8 @@ import
org.apache.iotdb.db.mpp.plan.statement.sys.pipe.DropPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.pipe.StopPipeStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.quota.SetSpaceQuotaStatement;
+import
org.apache.iotdb.db.mpp.plan.statement.sys.quota.ShowSpaceQuotaStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
@@ -522,4 +524,12 @@ public abstract class StatementVisitor<R, C> {
InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement, C context) {
return visitStatement(internalCreateMultiTimeSeriesStatement, context);
}
+
+ public R visitSetSpaceQuota(SetSpaceQuotaStatement setSpaceQuotaStatement, C
context) {
+ return visitStatement(setSpaceQuotaStatement, context);
+ }
+
+ public R visitShowSpaceQuota(ShowSpaceQuotaStatement
showSpaceQuotaStatement, C context) {
+ return visitStatement(showSpaceQuotaStatement, context);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
new file mode 100644
index 0000000000..0eb4023d5b
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/SetSpaceQuotaStatement.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys.quota;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SetSpaceQuotaStatement extends Statement implements
IConfigStatement {
+
+ private long timeSeriesNum;
+ private long deviceNum;
+ private long diskSize;
+ private String policy;
+ private List<String> prefixPathList;
+
+ /** QuotaOperator Constructor with OperatorType. */
+ public SetSpaceQuotaStatement() {
+ super();
+ statementType = StatementType.SET_SPACE_QUOTA;
+ }
+
+ public long getTimeSeriesNum() {
+ return timeSeriesNum;
+ }
+
+ public void setTimeSeriesNum(long timeSeriesNum) {
+ this.timeSeriesNum = timeSeriesNum;
+ }
+
+ public long getDeviceNum() {
+ return deviceNum;
+ }
+
+ public void setDeviceNum(long deviceNum) {
+ this.deviceNum = deviceNum;
+ }
+
+ public long getDiskSize() {
+ return diskSize;
+ }
+
+ public void setDiskSize(long diskSize) {
+ this.diskSize = diskSize;
+ }
+
+ public String getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(String policy) {
+ this.policy = policy;
+ }
+
+ public List<String> getPrefixPathList() {
+ return prefixPathList;
+ }
+
+ public void setPrefixPathList(List<String> prefixPathList) {
+ this.prefixPathList = prefixPathList;
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.WRITE;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitSetSpaceQuota(this, context);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
new file mode 100644
index 0000000000..35669ec7e8
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/quota/ShowSpaceQuotaStatement.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys.quota;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.List;
+
+public class ShowSpaceQuotaStatement extends Statement implements
IConfigStatement {
+
+ private List<PartialPath> databases;
+
+ public ShowSpaceQuotaStatement() {
+ super();
+ statementType = StatementType.SHOW_SPACE_QUOTA;
+ }
+
+ public List<PartialPath> getDatabases() {
+ return databases;
+ }
+
+ public void setDatabases(List<PartialPath> databases) {
+ this.databases = databases;
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.READ;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return databases;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowSpaceQuota(this, context);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java
new file mode 100644
index 0000000000..6585460fcf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSizeStore.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.quotas;
+
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.engine.StorageEngine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class DataNodeSizeStore {
+
+ private final StorageEngine storageEngine;
+ private final Map<Integer, Long> dataRegionDisk;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private List<Integer> dataRegionIds;
+
+ public DataNodeSizeStore() {
+ storageEngine = StorageEngine.getInstance();
+ dataRegionDisk = new HashMap<>();
+ dataRegionIds = new ArrayList<>();
+ this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ scheduledExecutorService, () -> calculateRegionSize(dataRegionIds), 0,
5, TimeUnit.SECONDS);
+ }
+
+ public void calculateRegionSize(List<Integer> dataRegionIds) {
+ storageEngine.getDiskSizeByDataRegion(dataRegionDisk, dataRegionIds);
+ }
+
+ public Map<Integer, Long> getDataRegionDisk() {
+ return dataRegionDisk;
+ }
+
+ public void setDataRegionIds(List<Integer> dataRegionIds) {
+ this.dataRegionIds = dataRegionIds;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
new file mode 100644
index 0000000000..1e46ee2958
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/quotas/DataNodeSpaceQuotaManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.quotas;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
+import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
+import
org.apache.iotdb.db.mpp.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DataNodeSpaceQuotaManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeSpaceQuotaManager.class);
+
+ private Map<String, TSpaceQuota> spaceQuotaLimit;
+ private Map<String, TSpaceQuota> spaceQuotaUsage;
+ private DataNodeSizeStore dataNodeSizeStore;
+
+ public DataNodeSpaceQuotaManager() {
+ spaceQuotaLimit = new HashMap<>();
+ spaceQuotaUsage = new HashMap<>();
+ dataNodeSizeStore = new DataNodeSizeStore();
+ recover();
+ }
+
+ public DataNodeSpaceQuotaManager(
+ Map<String, TSpaceQuota> spaceQuotaLimit, Map<String, TSpaceQuota>
spaceQuotaUsage) {
+ this.spaceQuotaLimit = spaceQuotaLimit;
+ this.spaceQuotaUsage = spaceQuotaUsage;
+ }
+
+ /** SingleTone */
+ private static class DataNodeSpaceQuotaManagerHolder {
+ private static final DataNodeSpaceQuotaManager INSTANCE = new
DataNodeSpaceQuotaManager();
+
+ private DataNodeSpaceQuotaManagerHolder() {}
+ }
+
+ public static DataNodeSpaceQuotaManager getInstance() {
+ return DataNodeSpaceQuotaManagerHolder.INSTANCE;
+ }
+
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
+ for (String database : req.getDatabase()) {
+ spaceQuotaLimit.put(database, req.getSpaceLimit());
+ spaceQuotaUsage.put(database, new TSpaceQuota());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
+ private void recover() {
+ TSpaceQuotaResp spaceQuota =
ClusterConfigTaskExecutor.getInstance().getSpaceQuota();
+ if (spaceQuota.getStatus() != null) {
+ if (spaceQuota.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && spaceQuota.getSpaceQuota() != null) {
+ for (String database : spaceQuota.getSpaceQuota().keySet()) {
+ spaceQuotaLimit.put(database,
spaceQuota.getSpaceQuota().get(database));
+ spaceQuotaUsage.put(database, new TSpaceQuota());
+ }
+ }
+ LOGGER.info("Space quota limit restored succeeded. " +
spaceQuotaLimit.toString());
+ } else {
+ LOGGER.error("Space quota limit restored failed. " +
spaceQuotaLimit.toString());
+ }
+ }
+
+ public boolean checkDeviceLimit(String database) {
+ database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR +
database;
+ TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+ if (spaceQuota == null) {
+ return true;
+ } else if (spaceQuota.getDeviceNum() == 0 || spaceQuota.getDeviceNum() ==
-1) {
+ return true;
+ }
+ long deviceNum = spaceQuotaUsage.get(database).getDeviceNum();
+ if (spaceQuota.getDeviceNum() - deviceNum > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ public void updateSpaceQuotaUsage(Map<String, TSpaceQuota> spaceQuotaUsage) {
+ this.spaceQuotaUsage = spaceQuotaUsage;
+ }
+
+ public boolean checkTimeSeriesNum(String database) {
+ database = IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR +
database;
+ TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+ if (spaceQuota == null) {
+ return true;
+ } else if (spaceQuota.getTimeserieNum() == 0 ||
spaceQuota.getTimeserieNum() == -1) {
+ return true;
+ }
+ long timeSeriesNum = spaceQuotaUsage.get(database).getTimeserieNum();
+ if (spaceQuota.getTimeserieNum() - timeSeriesNum > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean checkRegionDisk(String database) {
+ TSpaceQuota spaceQuota = spaceQuotaLimit.get(database);
+ if (spaceQuota == null) {
+ return true;
+ } else if (spaceQuota.getDiskSize() == 0 || spaceQuota.getDiskSize() ==
-1) {
+ return true;
+ }
+ long diskSize = spaceQuotaUsage.get(database).getDiskSize();
+ if (spaceQuota.getDiskSize() - diskSize > 0) {
+ return true;
+ }
+ return false;
+ }
+
+ public void setDataRegionIds(List<Integer> dataRegionIds) {
+ dataNodeSizeStore.setDataRegionIds(dataRegionIds);
+ }
+
+ public Map<Integer, Long> getRegionDisk() {
+ return dataNodeSizeStore.getDataRegionDisk();
+ }
+
+ public void setSpaceQuotaLimit(Map<String, TSpaceQuota> spaceQuotaLimit) {
+ this.spaceQuotaLimit = spaceQuotaLimit;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d00903759a..c087ca9aa1 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSettleReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -106,6 +107,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
+import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.sync.SyncService;
@@ -225,6 +227,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final DataNodeRegionManager regionManager =
DataNodeRegionManager.getInstance();
+ private final DataNodeSpaceQuotaManager spaceQuotaManager =
+ DataNodeSpaceQuotaManager.getInstance();
+
public DataNodeInternalRPCServiceImpl() {
super();
PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
@@ -873,6 +878,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return RpcUtils.SUCCESS_STATUS;
}
+ @Override
+ public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
+ return spaceQuotaManager.setSpaceQuota(req);
+ }
+
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree,
String storageGroup) {
PathPatternTree filteredPatternTree = new PathPatternTree();
try {
@@ -929,6 +939,16 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
resp.setStatusReason(CommonDescriptor.getInstance().getConfig().getStatusReason());
}
+ if (req.getSchemaRegionIds() != null) {
+ spaceQuotaManager.updateSpaceQuotaUsage(req.getSpaceQuotaUsage());
+
resp.setDeviceNum(schemaEngine.countDeviceNumBySchemaRegion(req.getSchemaRegionIds()));
+ resp.setTimeSeriesNum(
+
schemaEngine.countTimeSeriesNumBySchemaRegion(req.getSchemaRegionIds()));
+ }
+ if (req.getDataRegionIds() != null) {
+ spaceQuotaManager.setDataRegionIds(req.getDataRegionIds());
+ resp.setRegionDisk(spaceQuotaManager.getRegionDisk());
+ }
return resp;
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 9042f5a12a..69d6cc6c3a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -193,7 +193,10 @@ public enum TSStatusCode {
PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
PIPE_PLUGIN_DOWNLOAD_ERROR(1603),
CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR(1604),
- DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605);
+ DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605),
+
+ // Quota
+ EXCEED_QUOTA_ERROR(1700);
private final int statusCode;
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index 0b19406d71..dc10721b37 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -128,6 +128,18 @@ struct TFilesResp {
2: required list<TFile> files
}
+// quota
+struct TSpaceQuota {
+ 1: optional i64 diskSize
+ 2: optional i64 deviceNum
+ 3: optional i64 timeserieNum
+}
+
+struct TSetSpaceQuotaReq {
+ 1: required list<string> database
+ 2: required TSpaceQuota spaceLimit
+}
+
enum TAggregationType {
COUNT,
AVG,
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index ac5ab69268..050f00fe33 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -730,6 +730,15 @@ struct TUpdateModelStateReq {
3: optional string bestTrailId
}
+// ====================================================
+// Quota
+// ====================================================
+struct TSpaceQuotaResp{
+ 1: required common.TSStatus status
+ 2: optional map<string, common.TSpaceQuota> spaceQuota
+ 3: optional map<string, common.TSpaceQuota> spaceQuotaUsage
+}
+
service IConfigNodeRPCService {
// ======================================================
@@ -1333,5 +1342,15 @@ service IConfigNodeRPCService {
* @return SUCCESS_STATUS if the model was removed successfully
*/
common.TSStatus updateModelState(TUpdateModelStateReq req)
+
+ // ======================================================
+ // Quota
+ // ======================================================
+ /** Set Space Quota */
+ common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
+
+ TSpaceQuotaResp showSpaceQuota(list<string> databases);
+
+ TSpaceQuotaResp getSpaceQuota();
}
diff --git a/thrift/src/main/thrift/datanode.thrift
b/thrift/src/main/thrift/datanode.thrift
index b3fe6d3787..c20f32a703 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -235,6 +235,9 @@ struct THeartbeatReq {
1: required i64 heartbeatTimestamp
2: required bool needJudgeLeader
3: required bool needSamplingLoad
+ 4: optional list<i32> schemaRegionIds
+ 5: optional list<i32> dataRegionIds
+ 6: optional map<string, common.TSpaceQuota> spaceQuotaUsage
}
struct THeartbeatResp {
@@ -243,6 +246,9 @@ struct THeartbeatResp {
3: optional string statusReason
4: optional map<common.TConsensusGroupId, bool> judgedLeaders
5: optional TLoadSample loadSample
+ 6: optional map<i32, i64> deviceNum
+ 7: optional map<i32, i64> timeSeriesNum
+ 8: optional map<i32, i64> regionDisk
}
struct TLoadSample {
@@ -758,6 +764,11 @@ service IDataNodeRPCService {
* Delete model training metrics on DataNode
*/
common.TSStatus deleteModelMetrics(TDeleteModelMetricsReq req)
+
+ /**
+ * Set space quota
+ **/
+ common.TSStatus setSpaceQuota(common.TSetSpaceQuotaReq req)
}
service MPPDataExchangeService {
@@ -792,6 +803,5 @@ service IMLNodeInternalRPCService{
* Record model training metrics on DataNode
*/
common.TSStatus recordModelMetrics(TRecordModelMetricsReq req)
-
}