This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch Computing-resource-balancing_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c6356e935fdff9c0f9220c3419b06b00dc6bf821
Author: YongzaoDan <[email protected]>
AuthorDate: Fri Jul 21 16:03:27 2023 +0800

    Function Finish
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  14 --
 .../confignode/conf/ConfigNodeDescriptor.java      |   6 -
 .../iotdb/confignode/manager/load/LoadManager.java |  21 +-
 .../manager/load/balancer/PartitionBalancer.java   | 270 ++++++++++++++++++++-
 .../load/balancer/partition/DataAllotTable.java    | 218 +++++++++++++++++
 .../partition/GreedyPartitionAllocator.java        | 203 ----------------
 .../balancer/partition/IPartitionAllocator.java    |  55 -----
 .../manager/partition/PartitionManager.java        | 123 ++++++++--
 .../partition/DatabasePartitionTable.java          |  82 ++++++-
 .../persistence/partition/PartitionInfo.java       | 111 ++++++++-
 .../statemachine/CreateRegionGroupsProcedure.java  |   6 +
 .../resources/conf/iotdb-common.properties         |   7 -
 .../commons/partition/DataPartitionTable.java      | 123 +++++++++-
 .../commons/partition/SeriesPartitionTable.java    | 119 +++++++--
 .../iotdb/commons/structure/BalanceTreeMap.java    |  75 ++++++
 15 files changed, 1066 insertions(+), 367 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 2907c3b1eff..6b99e0da369 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -110,12 +110,6 @@ public class ConfigNodeConfig {
   private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy =
       RegionBalancer.RegionGroupAllocatePolicy.GREEDY;
 
-  /**
-   * DataPartition within the same SeriesPartitionSlot will inherit the 
allocation result of the
-   * predecessor or successor TimePartitionSlot if set true.
-   */
-  private boolean enableDataPartitionInheritPolicy = true;
-
   /** Max concurrent client number. */
   private int rpcMaxConcurrentClientNum = 65535;
 
@@ -559,14 +553,6 @@ public class ConfigNodeConfig {
     this.regionGroupAllocatePolicy = regionGroupAllocatePolicy;
   }
 
-  public boolean isEnableDataPartitionInheritPolicy() {
-    return enableDataPartitionInheritPolicy;
-  }
-
-  public void setEnableDataPartitionInheritPolicy(boolean 
enableDataPartitionInheritPolicy) {
-    this.enableDataPartitionInheritPolicy = enableDataPartitionInheritPolicy;
-  }
-
   public int getThriftServerAwaitTimeForStopService() {
     return thriftServerAwaitTimeForStopService;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 59eab23ddc4..5490b700c36 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -272,12 +272,6 @@ public class ConfigNodeDescriptor {
           "The configured region allocate strategy does not exist, use the 
default: GREEDY!");
     }
 
-    conf.setEnableDataPartitionInheritPolicy(
-        Boolean.parseBoolean(
-            properties.getProperty(
-                "enable_data_partition_inherit_policy",
-                String.valueOf(conf.isEnableDataPartitionInheritPolicy()))));
-
     conf.setCnRpcAdvancedCompressionEnable(
         Boolean.parseBoolean(
             properties
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index d88ebd70fe4..0c284a149ee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -94,7 +94,7 @@ public class LoadManager {
   /**
    * Generate an optimal CreateRegionGroupsPlan.
    *
-   * @param allotmentMap Map<StorageGroupName, Region allotment>
+   * @param allotmentMap Map<DatabaseName, Region allotment>
    * @param consensusGroupType TConsensusGroupType of RegionGroup to be 
allocated
    * @return CreateRegionGroupsPlan
    * @throws NotEnoughDataNodeException If there are not enough DataNodes
@@ -110,7 +110,7 @@ public class LoadManager {
    * Allocate SchemaPartitions.
    *
    * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should 
be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
@@ -122,7 +122,7 @@ public class LoadManager {
    * Allocate DataPartitions.
    *
    * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be 
assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
@@ -130,6 +130,19 @@ public class LoadManager {
     return 
partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
   }
 
+  /**
+   * Re-balance runtime status cached in the PartitionBalancer. This method may shift the
+   * currentTimePartition or update the DataAllotTable.
+   *
+   * @param assignedDataPartition Map<DatabaseName, DataPartitionTable>, handed unchanged to
+   *     PartitionBalancer#reBalanceDataPartitionPolicyIfNecessary
+   */
+  public void reBalancePartitionPolicyIfNecessary(
+      Map<String, DataPartitionTable> assignedDataPartition) {
+    
partitionBalancer.reBalanceDataPartitionPolicyIfNecessary(assignedDataPartition);
+  }
+
+  /**
+   * Ask the PartitionBalancer to update the DataAllotTable of the given database.
+   *
+   * @param database Database name
+   */
+  public void updateDataAllotTable(String database) {
+    partitionBalancer.updateDataAllotTable(database);
+  }
+
   public void broadcastLatestRegionRouteMap() {
     statisticsService.broadcastLatestRegionRouteMap();
   }
@@ -138,12 +151,14 @@ public class LoadManager {
     loadCache.initHeartbeatCache(configManager);
     heartbeatService.startHeartbeatService();
     statisticsService.startLoadStatisticsService();
+    partitionBalancer.setupPartitionBalancer();
   }
 
   public void stopLoadServices() {
     heartbeatService.stopHeartbeatService();
     statisticsService.stopLoadStatisticsService();
     loadCache.clearHeartbeatCache();
+    partitionBalancer.clearPartitionBalancer();
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 89d9235911c..f0d408e91cd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -16,19 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.load.balancer;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.manager.IManager;
-import 
org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
-import 
org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
+import 
org.apache.iotdb.confignode.manager.load.balancer.partition.DataAllotTable;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The SeriesPartitionSlotBalancer provides interfaces to generate optimal 
Partition allocation and
@@ -36,38 +54,272 @@ import java.util.Map;
  */
 public class PartitionBalancer {
 
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+  private static final long TIME_PARTITION_INTERVAL =
+      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionBalancer.class);
+
   private final IManager configManager;
 
+  // Map<DatabaseName, DataAllotTable>
+  private final Map<String, DataAllotTable> dataAllotTableMap;
+
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
+    // Starts empty; setupPartitionBalancer() inserts one DataAllotTable per database
+    this.dataAllotTableMap = new ConcurrentHashMap<>();
   }
 
   /**
    * Allocate SchemaPartitions
    *
    * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
+   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
+   * @throws NoAvailableRegionGroupException NOTE(review): presumably raised by
+   *     PartitionManager#getSortedRegionGroupSlotsCounter when the database has no available
+   *     SchemaRegionGroup; that method is not visible here, confirm
    */
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    return 
genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+    Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
+
+    for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
+        unassignedSchemaPartitionSlotsMap.entrySet()) {
+      final String database = slotsMapEntry.getKey();
+      final List<TSeriesPartitionSlot> unassignedPartitionSlots = 
slotsMapEntry.getValue();
+
+      // Filter available SchemaRegionGroups and
+      // sort them by the number of allocated SchemaPartitions
+      BalanceTreeMap<TConsensusGroupId, Integer> counter = new 
BalanceTreeMap<>();
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(database, 
TConsensusGroupType.SchemaRegion);
+      // Each pair is (slot count, RegionGroupId); the count is narrowed from Long to int
+      for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
+        counter.put(pair.getRight(), pair.getLeft().intValue());
+      }
+
+      // Enumerate SeriesPartitionSlot
+      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new 
ConcurrentHashMap<>();
+      for (TSeriesPartitionSlot seriesPartitionSlot : 
unassignedPartitionSlots) {
+        // Greedy allocation: allocate the unassigned SchemaPartition to
+        // the RegionGroup whose allocated SchemaPartitions is the least
+        TConsensusGroupId consensusGroupId = counter.getKeyWithMinValue();
+        schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId);
+        // Record the new assignment so the next slot sees the updated count
+        counter.put(consensusGroupId, counter.get(consensusGroupId) + 1);
+      }
+      result.put(database, new SchemaPartitionTable(schemaPartitionMap));
+    }
+
+    return result;
   }
 
   /**
    * Allocate DataPartitions
    *
    * <p>For every requested (SeriesPartitionSlot, TimePartitionSlot) pair the DataRegionGroup is
    * chosen in this order: the successor DataPartition for slots older than the
    * currentTimePartition, then the entry of the DataAllotTable, then the DataRegionGroup that
    * currently holds the fewest DataPartitions.
    *
    * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
+   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
+   * @throws NoAvailableRegionGroupException NOTE(review): presumably raised by
+   *     PartitionManager#getSortedRegionGroupSlotsCounter when the database has no available
+   *     DataRegionGroup; that method is not visible here, confirm
    */
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    return 
genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
+    Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
+
+    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
+        unassignedDataPartitionSlotsMap.entrySet()) {
+      final String database = slotsMapEntry.getKey();
+      final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
+          slotsMapEntry.getValue();
+
+      // Filter available DataRegionGroups and
+      // sort them by the number of allocated DataPartitions
+      BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          getPartitionManager()
+              .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion);
+      for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
+        counter.put(pair.getRight(), pair.getLeft().intValue());
+      }
+
+      // Within this class the map is only filled by setupPartitionBalancer(), so a database
+      // may have no table yet. Fall back to an empty table instead of dereferencing null.
+      DataAllotTable allotTable =
+          dataAllotTableMap.computeIfAbsent(database, empty -> new DataAllotTable());
+      DataPartitionTable dataPartitionTable = new DataPartitionTable();
+
+      // Nothing may run between acquiring the lock and entering the try block,
+      // otherwise a failure there would leave the read lock held
+      allotTable.acquireReadLock();
+      try {
+        TTimePartitionSlot currentTimePartition = allotTable.getCurrentTimePartition();
+
+        // Enumerate SeriesPartitionSlot
+        for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
+            unassignedPartitionSlotsMap.entrySet()) {
+          SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+
+          // Enumerate TimePartitionSlot in ascending order
+          TSeriesPartitionSlot seriesPartitionSlot = seriesPartitionEntry.getKey();
+          List<TTimePartitionSlot> timePartitionSlots =
+              seriesPartitionEntry.getValue().getTimePartitionSlots();
+          timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+
+          for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+
+            // 1. The historical DataPartition will try to inherit successor DataPartition first
+            if (timePartitionSlot.getStartTime() < currentTimePartition.getStartTime()) {
+              TConsensusGroupId successor =
+                  getPartitionManager()
+                      .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
+              if (successor != null && counter.containsKey(successor)) {
+                seriesPartitionTable.putDataPartition(timePartitionSlot, successor);
+                counter.put(successor, counter.get(successor) + 1);
+                continue;
+              }
+            }
+
+            // 2. Assign DataPartition based on the DataAllotTable.
+            // The table has no entry (null) for a SeriesPartitionSlot it has never allotted.
+            TConsensusGroupId allotGroupId = allotTable.getRegionGroupId(seriesPartitionSlot);
+            if (allotGroupId != null && counter.containsKey(allotGroupId)) {
+              seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId);
+              counter.put(allotGroupId, counter.get(allotGroupId) + 1);
+              continue;
+            }
+
+            // 3. Assign the DataPartition to DataRegionGroup with the least DataPartitions
+            // If the above DataRegionGroups are unavailable
+            TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue();
+            seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId);
+            counter.put(greedyGroupId, counter.get(greedyGroupId) + 1);
+          }
+
+          dataPartitionTable.getDataPartitionMap().put(seriesPartitionSlot, seriesPartitionTable);
+        }
+      } finally {
+        allotTable.releaseReadLock();
+      }
+      result.put(database, dataPartitionTable);
+    }
+
+    return result;
+  }
+
+  /**
+   * Re-check the DataPartition policy of each database that just got new DataPartitions.
+   *
+   * @param assignedDataPartition the newly created DataPartitions, keyed by database name
+   */
+  public void reBalanceDataPartitionPolicyIfNecessary(
+      Map<String, DataPartitionTable> assignedDataPartition) {
+    for (Map.Entry<String, DataPartitionTable> entry : assignedDataPartition.entrySet()) {
+      final String database = entry.getKey();
+      // The DataAllotTable is only refreshed once the currentTimePartition has been updated
+      if (updateDataPartitionCount(database, entry.getValue())) {
+        updateDataAllotTable(database);
+      }
+    }
+  }
+
+  /**
+   * Update the DataPartitionCount in DataAllotTable
+   *
+   * @param database Database name
+   * @param dataPartitionTable newly created DataPartitionTable
+   * @return true if the currentTimePartition is updated, false otherwise (also when the
+   *     RegionGroup count cannot be read because the database does not exist)
+   */
+  public boolean updateDataPartitionCount(String database, DataPartitionTable dataPartitionTable) {
+    // Resolve the table once and create it on demand: within this class the map is only
+    // filled by setupPartitionBalancer(), so a plain get() may return null
+    DataAllotTable allotTable =
+        dataAllotTableMap.computeIfAbsent(database, empty -> new DataAllotTable());
+    try {
+      allotTable.addTimePartitionCount(dataPartitionTable.getTimeSlotCountMap());
+      return allotTable.updateCurrentTimePartition(
+          getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion));
+    } catch (DatabaseNotExistsException e) {
+      // Keep the cause in the log so the failing lookup can be traced
+      LOGGER.error("Database {} not exists", database, e);
+      return false;
+    }
+  }
+
+  /**
+   * Update the DataAllotTable
+   *
+   * <p>SeriesPartitionSlots whose last DataPartition lies after the currentTimePartition are
+   * collected first and passed on as already allocated.
+   *
+   * @param database Database name
+   */
+  public void updateDataAllotTable(String database) {
+    // Resolve the table once and create it on demand: within this class the map is only
+    // filled by setupPartitionBalancer(), so a plain get() may return null
+    DataAllotTable allotTable =
+        dataAllotTableMap.computeIfAbsent(database, empty -> new DataAllotTable());
+    TTimePartitionSlot currentTimePartition = allotTable.getCurrentTimePartition();
+
+    Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      Pair<TTimePartitionSlot, TConsensusGroupId> lastDataPartition =
+          getPartitionManager().getLastDataPartition(database, seriesPartitionSlot);
+      if (lastDataPartition != null
+          && currentTimePartition.compareTo(lastDataPartition.getLeft()) < 0) {
+        // Put all future DataPartitions into the allocatedTable
+        allocatedTable.put(seriesPartitionSlot, lastDataPartition.getRight());
+      }
+    }
+
+    try {
+      allotTable.updateDataAllotTable(
+          getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion),
+          allocatedTable);
+    } catch (DatabaseNotExistsException e) {
+      // Keep the cause in the log so the failing lookup can be traced
+      LOGGER.error("Database {} not exists", database, e);
+    }
+  }
+
+  /**
+   * Set up the PartitionBalancer when the current ConfigNode becomes leader.
+   *
+   * <p>For every database a fresh DataAllotTable is created. Starting from the maximum
+   * TimePartitionSlot and stepping back one TIME_PARTITION_INTERVAL at a time, the first slot
+   * whose SeriesSlot count reaches the threshold becomes the currentTimePartition; the slots
+   * visited before it are recorded in the table's counter. The walk stops before the minimum
+   * TimePartitionSlot.
+   */
+  public void setupPartitionBalancer() {
+    dataAllotTableMap.clear();
+    getClusterSchemaManager()
+        .getDatabaseNames()
+        .forEach(
+            database -> {
+              dataAllotTableMap.put(database, new DataAllotTable());
+              DataAllotTable dataAllotTable = dataAllotTableMap.get(database);
+              dataAllotTable.acquireWriteLock();
+              try {
+                int threshold =
+                    DataAllotTable.timePartitionThreshold(
+                        getPartitionManager()
+                            .getRegionGroupCount(database, TConsensusGroupType.DataRegion));
+                TTimePartitionSlot maxTimePartitionSlot =
+                    getPartitionManager().getMaxTimePartitionSlot(database);
+                TTimePartitionSlot minTimePartitionSlot =
+                    getPartitionManager().getMinTimePartitionSlot(database);
+                // Work on a copy: the cursor below is mutated while walking backwards
+                TTimePartitionSlot currentTimePartition = maxTimePartitionSlot.deepCopy();
+                while (currentTimePartition.compareTo(minTimePartitionSlot) > 0) {
+                  int seriesSlotCount =
+                      getPartitionManager().countSeriesSlot(database, currentTimePartition);
+                  if (seriesSlotCount >= threshold) {
+                    dataAllotTable.setCurrentTimePartition(currentTimePartition.getStartTime());
+                    break;
+                  }
+                  // The counter keeps the key object it is given. Hand over a snapshot, because
+                  // the cursor is mutated right below; sharing it would make every iteration
+                  // hit the same map entry while silently changing that entry's key.
+                  dataAllotTable.addTimePartitionCount(
+                      Collections.singletonMap(currentTimePartition.deepCopy(), seriesSlotCount));
+                  currentTimePartition.setStartTime(
+                      currentTimePartition.getStartTime() - TIME_PARTITION_INTERVAL);
+                }
+
+                dataAllotTable.setDataAllotTable(
+                    getPartitionManager().getLastDataAllotTable(database));
+              } catch (DatabaseNotExistsException e) {
+                LOGGER.error("Database {} not exists", database);
+              } finally {
+                dataAllotTable.releaseWriteLock();
+              }
+            });
+  }
+
+  /**
+   * Clear the PartitionBalancer when the current ConfigNode is no longer the leader. Drops
+   * every cached DataAllotTable.
+   */
+  public void clearPartitionBalancer() {
+    dataAllotTableMap.clear();
+  }
+
+  /** Shortcut to the ClusterSchemaManager owned by the configManager. */
+  private ClusterSchemaManager getClusterSchemaManager() {
+    return configManager.getClusterSchemaManager();
   }
 
-  private IPartitionAllocator genPartitionAllocator() {
-    // TODO: The type of PartitionAllocator should be configurable
-    return new GreedyPartitionAllocator(configManager);
+  /** Shortcut to the PartitionManager owned by the configManager. */
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
new file mode 100644
index 00000000000..f334f490888
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.structure.BalanceTreeMap;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Cached allotment state: which RegionGroup each SeriesPartitionSlot is allotted to, which
+ * TimePartitionSlot is treated as the current one, and how many DataPartitions were counted
+ * for each tracked TimePartitionSlot.
+ *
+ * <p>The allot table and the counter are guarded by a {@link ReentrantReadWriteLock}. Callers
+ * that need one consistent view across several calls can hold the lock through the
+ * acquire/release methods at the bottom of this class.
+ */
+public class DataAllotTable {
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  private final ReentrantReadWriteLock dataAllotTableLock;
+  private final AtomicReference<TTimePartitionSlot> currentTimePartition;
+  // Map<TimePartitionSlot, DataPartitionCount>
+  // Cache the number of DataPartitions in each future TimePartitionSlot
+  private final TreeMap<TTimePartitionSlot, AtomicInteger> dataPartitionCounter;
+  // Map<SeriesPartitionSlot, RegionGroupId>
+  // The optimal allocation of SeriesSlots to RegionGroups in the currentTimePartition
+  private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable;
+
+  /** Create an empty table whose currentTimePartition starts at 0. */
+  public DataAllotTable() {
+    this.dataAllotTableLock = new ReentrantReadWriteLock();
+    this.currentTimePartition = new AtomicReference<>(new TTimePartitionSlot(0));
+    this.dataPartitionCounter = new TreeMap<>();
+    this.dataAllotTable = new HashMap<>();
+  }
+
+  /** @return true if no SeriesPartitionSlot has been allotted yet */
+  public boolean isEmpty() {
+    dataAllotTableLock.readLock().lock();
+    try {
+      return dataAllotTable.isEmpty();
+    } finally {
+      dataAllotTableLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Update the DataAllotTable according to the current DataRegionGroups and future DataAllotTable.
+   *
+   * <p>SeriesSlots listed in the allocatedTable keep their RegionGroup. Every other SeriesSlot
+   * stays on its previous RegionGroup while that group holds fewer slots than the average;
+   * otherwise it goes to the RegionGroup with the fewest slots. The table is left untouched
+   * when dataRegionGroups is empty.
+   *
+   * @param dataRegionGroups the current DataRegionGroups
+   * @param allocatedTable the future DataAllotTable, i.e. some SeriesSlots have already allocated
+   */
+  public void updateDataAllotTable(
+      List<TConsensusGroupId> dataRegionGroups,
+      Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable) {
+    dataAllotTableLock.writeLock().lock();
+    try {
+      if (dataRegionGroups.isEmpty()) {
+        // Nothing to distribute the slots over; this also avoids dividing by zero below
+        return;
+      }
+
+      // mu is the average number of slots allocated to each regionGroup
+      int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
+      // Decide all SeriesSlot randomly
+      List<TSeriesPartitionSlot> seriesSlotList = new ArrayList<>();
+      for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+        seriesSlotList.add(new TSeriesPartitionSlot(i));
+      }
+      Collections.shuffle(seriesSlotList);
+
+      // The counter will maintain the number of slots allocated to each regionGroup
+      BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
+      Map<TConsensusGroupId, AtomicInteger> regionSlotCounter = new HashMap<>();
+      allocatedTable.forEach(
+          (seriesSlot, regionGroupId) ->
+              regionSlotCounter
+                  .computeIfAbsent(regionGroupId, empty -> new AtomicInteger(0))
+                  .incrementAndGet());
+      dataRegionGroups.forEach(
+          regionGroupId -> regionSlotCounter.putIfAbsent(regionGroupId, new AtomicInteger(0)));
+      regionSlotCounter.forEach(
+          (regionGroupId, slotNum) -> counter.put(regionGroupId, slotNum.get()));
+
+      Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>();
+      for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) {
+        TConsensusGroupId allocatedRegionGroupId = allocatedTable.get(seriesPartitionSlot);
+        if (allocatedRegionGroupId != null) {
+          // Already allocated and already counted above: keep it, do not count it twice
+          newAllotTable.put(seriesPartitionSlot, allocatedRegionGroupId);
+          continue;
+        }
+
+        // The previous table has no entry (null) for a slot that was never allotted, and it
+        // may still point at a RegionGroup that is not among the counted groups
+        TConsensusGroupId oldRegionGroupId = dataAllotTable.get(seriesPartitionSlot);
+        if (oldRegionGroupId != null
+            && counter.containsKey(oldRegionGroupId)
+            && counter.get(oldRegionGroupId) < mu) {
+          // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average
+          newAllotTable.put(seriesPartitionSlot, oldRegionGroupId);
+          // Count the inherited slot, otherwise the group never reaches the average
+          counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1);
+          continue;
+        }
+
+        // Otherwise, choose the regionGroup with the least slotNum to keep load balance
+        TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
+        newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
+        counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
+      }
+
+      dataAllotTable.clear();
+      dataAllotTable.putAll(newAllotTable);
+    } finally {
+      dataAllotTableLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Update the current time partition and remove the useless time partitions.
+   *
+   * @param regionGroupNum the number of regionGroups
+   * @return whether the current time partition is updated
+   */
+  public boolean updateCurrentTimePartition(int regionGroupNum) {
+    dataAllotTableLock.writeLock().lock();
+    try {
+      // Loop-invariant, so computed once
+      final int threshold = timePartitionThreshold(regionGroupNum);
+      AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE);
+      dataPartitionCounter.forEach(
+          (timePartition, counter) -> {
+            // Select the maximum TimePartition whose slotNum reaches the threshold
+            // Ensure that the remaining slots can be still distributed to new regionGroups
+            if (counter.get() >= threshold && timePartition.getStartTime() > newStartTime.get()) {
+              newStartTime.set(timePartition.getStartTime());
+            }
+          });
+
+      if (newStartTime.get() > currentTimePartition.get().getStartTime()) {
+        currentTimePartition.set(new TTimePartitionSlot(newStartTime.get()));
+        // Remove the useless TimePartitions. removeIf deletes through the iterator; calling
+        // TreeMap#remove while iterating the keySet throws ConcurrentModificationException.
+        dataPartitionCounter
+            .keySet()
+            .removeIf(timePartition -> timePartition.getStartTime() < newStartTime.get());
+        return true;
+      }
+      return false;
+    } finally {
+      dataAllotTableLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Add the given counts to the counter, creating entries for unseen TimePartitionSlots.
+   *
+   * <p>The key objects of the given map are stored as they are, so callers must not mutate
+   * them afterwards.
+   *
+   * @param timePartitionCountMap Map<TimePartitionSlot, count to add>
+   */
+  public void addTimePartitionCount(Map<TTimePartitionSlot, Integer> timePartitionCountMap) {
+    dataAllotTableLock.writeLock().lock();
+    try {
+      timePartitionCountMap.forEach(
+          (timePartition, count) ->
+              dataPartitionCounter
+                  .computeIfAbsent(timePartition, empty -> new AtomicInteger(0))
+                  .addAndGet(count));
+    } finally {
+      dataAllotTableLock.writeLock().unlock();
+    }
+  }
+
+  /** @return the TimePartitionSlot currently treated as current; read without the lock */
+  public TTimePartitionSlot getCurrentTimePartition() {
+    return currentTimePartition.get();
+  }
+
+  /**
+   * @param seriesPartitionSlot the SeriesPartitionSlot to look up
+   * @return the allotted RegionGroupId, or null if the slot has no entry in the table
+   */
+  public TConsensusGroupId getRegionGroupId(TSeriesPartitionSlot seriesPartitionSlot) {
+    dataAllotTableLock.readLock().lock();
+    try {
+      return dataAllotTable.get(seriesPartitionSlot);
+    } finally {
+      dataAllotTableLock.readLock().unlock();
+    }
+  }
+
+  /** Only use this interface when init PartitionBalancer. */
+  public void setCurrentTimePartition(long startTime) {
+    currentTimePartition.set(new TTimePartitionSlot(startTime));
+  }
+
+  /**
+   * Only use this interface when init PartitionBalancer. This method does not take the write
+   * lock itself.
+   */
+  public void setDataAllotTable(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable) {
+    this.dataAllotTable.putAll(dataAllotTable);
+  }
+
+  /**
+   * Threshold of SeriesSlots a TimePartitionSlot must hold to become the current one:
+   * SERIES_SLOT_NUM * (1 - 2 / regionGroupNum), truncated to int. The result is zero or
+   * negative when regionGroupNum is 2 or less.
+   *
+   * @param regionGroupNum the number of regionGroups
+   */
+  public static int timePartitionThreshold(int regionGroupNum) {
+    return (int) (SERIES_SLOT_NUM * (1.0 - 2.0 / regionGroupNum));
+  }
+
+  public void acquireReadLock() {
+    dataAllotTableLock.readLock().lock();
+  }
+
+  public void releaseReadLock() {
+    dataAllotTableLock.readLock().unlock();
+  }
+
+  public void acquireWriteLock() {
+    dataAllotTableLock.writeLock().lock();
+  }
+
+  public void releaseWriteLock() {
+    dataAllotTableLock.writeLock().unlock();
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
deleted file mode 100644
index aacf165a3ea..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.commons.partition.SeriesPartitionTable;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** Allocating new Partitions by greedy algorithm. */
-public class GreedyPartitionAllocator implements IPartitionAllocator {
-
-  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
-  private static final boolean ENABLE_DATA_PARTITION_INHERIT_POLICY =
-      CONF.isEnableDataPartitionInheritPolicy();
-  private static final long TIME_PARTITION_INTERVAL =
-      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
-
-  private final IManager configManager;
-
-  public GreedyPartitionAllocator(IManager configManager) {
-    this.configManager = configManager;
-  }
-
-  @Override
-  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
-    Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
-
-    for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
-        unassignedSchemaPartitionSlotsMap.entrySet()) {
-      final String storageGroup = slotsMapEntry.getKey();
-      final List<TSeriesPartitionSlot> unassignedPartitionSlots = 
slotsMapEntry.getValue();
-
-      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-          getPartitionManager()
-              .getSortedRegionGroupSlotsCounter(storageGroup, 
TConsensusGroupType.SchemaRegion);
-
-      // Enumerate SeriesPartitionSlot
-      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new 
ConcurrentHashMap<>();
-      for (TSeriesPartitionSlot seriesPartitionSlot : 
unassignedPartitionSlots) {
-        // Greedy allocation
-        schemaPartitionMap.put(seriesPartitionSlot, 
regionSlotsCounter.get(0).getRight());
-        // Bubble sort
-        bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-      }
-      result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
-    }
-
-    return result;
-  }
-
-  @Override
-  public Map<String, DataPartitionTable> allocateDataPartition(
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
-      throws NoAvailableRegionGroupException {
-    Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
-
-    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
slotsMapEntry :
-        unassignedDataPartitionSlotsMap.entrySet()) {
-      final String database = slotsMapEntry.getKey();
-      final Map<TSeriesPartitionSlot, TTimeSlotList> 
unassignedPartitionSlotsMap =
-          slotsMapEntry.getValue();
-
-      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
-      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
-          getPartitionManager()
-              .getSortedRegionGroupSlotsCounter(database, 
TConsensusGroupType.DataRegion);
-
-      DataPartitionTable dataPartitionTable = new DataPartitionTable();
-
-      // Enumerate SeriesPartitionSlot
-      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry 
:
-          unassignedPartitionSlotsMap.entrySet()) {
-        SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
-
-        // Enumerate TimePartitionSlot in ascending order
-        List<TTimePartitionSlot> timePartitionSlots =
-            seriesPartitionEntry.getValue().getTimePartitionSlots();
-        
timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
-        for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-
-          /* 1. Inherit policy */
-          if (ENABLE_DATA_PARTITION_INHERIT_POLICY) {
-            // Check if the current Partition's neighbor(predecessor or 
successor)
-            // is allocated in the same batch of Partition creation
-            TConsensusGroupId neighbor =
-                seriesPartitionTable.getAdjacentDataPartition(
-                    timePartitionSlot, TIME_PARTITION_INTERVAL);
-            if (neighbor != null) {
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(timePartitionSlot, Collections.singletonList(neighbor));
-              bubbleSort(neighbor, regionSlotsCounter);
-              continue;
-            }
-
-            // Check if the current Partition's neighbor(predecessor or 
successor)
-            // was allocated in the former Partition creation
-            neighbor =
-                getPartitionManager()
-                    .getAdjacentDataPartition(
-                        database,
-                        seriesPartitionEntry.getKey(),
-                        timePartitionSlot,
-                        TIME_PARTITION_INTERVAL);
-            if (neighbor != null) {
-              seriesPartitionTable
-                  .getSeriesPartitionMap()
-                  .put(timePartitionSlot, Collections.singletonList(neighbor));
-              bubbleSort(neighbor, regionSlotsCounter);
-              continue;
-            }
-          }
-
-          /* 2. Greedy policy */
-          seriesPartitionTable
-              .getSeriesPartitionMap()
-              .put(
-                  timePartitionSlot,
-                  
Collections.singletonList(regionSlotsCounter.get(0).getRight()));
-          bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
-        }
-        dataPartitionTable
-            .getDataPartitionMap()
-            .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
-      }
-      result.put(database, dataPartitionTable);
-    }
-
-    return result;
-  }
-
-  /**
-   * Bubble sort the regionSlotsCounter from the specified consensus group
-   *
-   * <p>Notice: Here we use bubble sort instead of other sorting algorithm is 
because that, there is
-   * only one Partition allocated in each loop. Therefore, only consider one 
consensus group weight
-   * change is enough
-   *
-   * @param consensusGroupId The consensus group where the new Partition is 
allocated
-   * @param regionSlotsCounter List<Pair<Allocated Partition num, 
TConsensusGroupId>>
-   */
-  private void bubbleSort(
-      TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>> 
regionSlotsCounter) {
-    // Find the corresponding consensus group
-    int index = 0;
-    for (int i = 0; i < regionSlotsCounter.size(); i++) {
-      if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) {
-        index = i;
-        break;
-      }
-    }
-
-    // Do bubble sort
-    
regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() + 
1);
-    while (index < regionSlotsCounter.size() - 1
-        && regionSlotsCounter.get(index).getLeft() > 
regionSlotsCounter.get(index + 1).getLeft()) {
-      Collections.swap(regionSlotsCounter, index, index + 1);
-      index += 1;
-    }
-  }
-
-  private PartitionManager getPartitionManager() {
-    return configManager.getPartitionManager();
-  }
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
deleted file mode 100644
index 62ef53d9655..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.balancer.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartitionTable;
-import org.apache.iotdb.commons.partition.SchemaPartitionTable;
-import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
-import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The IPartitionAllocator is a functional interface, which means a new 
functional class who
- * implements the IPartitionAllocator must be created for each Partition 
allocation.
- */
-public interface IPartitionAllocator {
-
-  /**
-   * Allocate SchemaPartitions
-   *
-   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should 
be assigned
-   * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
-   */
-  Map<String, SchemaPartitionTable> allocateSchemaPartition(
-      Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
-      throws NoAvailableRegionGroupException;
-
-  /**
-   * Allocate DataPartitions
-   *
-   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be 
assigned
-   * @return Map<StorageGroupName, DataPartitionTable>, the allocating result
-   */
-  Map<String, DataPartitionTable> allocateDataPartition(
-      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
-      throws NoAvailableRegionGroupException;
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index fae18091d36..a671150a308 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
@@ -85,6 +86,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -265,14 +267,13 @@ public class PartitionManager {
       // Cache allocating result only if the current ConfigNode still holds 
its leadership
       CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
       createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
-      status = getConsensusManager().confirmLeader();
+
+      status = consensusWritePartitionResult(createPlan);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // Here we check the leadership second time
-        // since the RegionGroup creating process might take some time
+        // The allocation might fail due to consensus error
         resp.setStatus(status);
         return resp;
       }
-      getConsensusManager().write(createPlan);
     }
 
     resp = (SchemaPartitionResp) getSchemaPartition(req);
@@ -389,14 +390,16 @@ public class PartitionManager {
       // Cache allocating result only if the current ConfigNode still holds 
its leadership
       CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
       createPlan.setAssignedDataPartition(assignedDataPartition);
-      status = getConsensusManager().confirmLeader();
+
+      status = consensusWritePartitionResult(createPlan);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        // Here we check the leadership second time
-        // since the RegionGroup creating process might take some time
+        // The allocation might fail due to consensus error
         resp.setStatus(status);
         return resp;
       }
-      getConsensusManager().write(createPlan);
+
+      // Statistical allocation result and re-balance the DataPartitionPolicy 
if necessary
+      
getLoadManager().reBalancePartitionPolicyIfNecessary(assignedDataPartition);
     }
 
     resp = (DataPartitionResp) getDataPartition(req);
@@ -431,6 +434,24 @@ public class PartitionManager {
     return resp;
   }
 
+  private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
+    TSStatus status = getConsensusManager().confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Here we check the leadership second time
+      // since the RegionGroup creating process might take some time
+      return status;
+    }
+
+    ConsensusWriteResponse writeResp = getConsensusManager().write(plan);
+    if (!writeResp.isSuccessful()) {
+      // The allocation might fail due to consensus error
+      status = writeResp.getStatus();
+      status.setMessage(writeResp.getErrorMessage());
+      LOGGER.error("Write DataPartition allocation result failed because: {}", 
status);
+    }
+    return status;
+  }
+
   // ======================================================
   // Leader scheduling interfaces
   // ======================================================
@@ -590,22 +611,20 @@ public class PartitionManager {
   }
 
   /**
-   * Only leader use this interface. Checks whether the specified 
DataPartition has a predecessor or
-   * successor and returns if it does
+   * Only leader use this interface. Checks whether the specified 
DataPartition has a successor and
+   * returns if it does.
    *
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
+  public TConsensusGroupId getSuccessorDataPartition(
       String database,
       TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    return partitionInfo.getAdjacentDataPartition(
-        database, seriesPartitionSlot, timePartitionSlot, 
timePartitionInterval);
+      TTimePartitionSlot timePartitionSlot) {
+    return partitionInfo.getSuccessorDataPartition(
+        database, seriesPartitionSlot, timePartitionSlot);
   }
 
   /**
@@ -711,6 +730,21 @@ public class PartitionManager {
     return partitionInfo.getRegionGroupCount(database, type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param database DatabaseName
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   * @throws DatabaseNotExistsException When the specified Database doesn't 
exist
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(String database, 
TConsensusGroupType type)
+      throws DatabaseNotExistsException {
+    return partitionInfo.getAllRegionGroupIds(database, type);
+  }
+
   /**
    * Check if the specified Database exists.
    *
@@ -1249,6 +1283,63 @@ public class PartitionManager {
     partitionInfo.getDataRegionIds(databases, dataRegionIds);
   }
 
+  /**
+   * Get the max TimePartitionSlot of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The max TimePartitionSlot, null if the Database doesn't exist or 
there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMaxTimePartitionSlot(String database) {
+    return partitionInfo.getMaxTimePartitionSlot(database);
+  }
+
+  /**
+   * Get the min TimePartitionSlot of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The min TimePartitionSlot, null if the Database doesn't exist or 
there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMinTimePartitionSlot(String database) {
+    return partitionInfo.getMinTimePartitionSlot(database);
+  }
+
+  /**
+   * Get the DataPartition with max TimePartition of the specified Database 
and the
+   * SeriesPartitionSlot.
+   *
+   * @param database The specified Database
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The last DataPartition, null if the Database doesn't exist or 
there are no
+   *     DataPartitions in the specified SeriesPartitionSlot
+   */
+  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+      String database, TSeriesPartitionSlot seriesPartitionSlot) {
+    return partitionInfo.getLastDataPartition(database, seriesPartitionSlot);
+  }
+
+  /**
+   * Count SeriesSlot in the specified TimePartitionSlot of the Database.
+   *
+   * @param database The specified Database
+   * @param timePartitionSlot The specified TimePartitionSlot
+   * @return The count of SeriesSlot
+   */
+  public int countSeriesSlot(String database, TTimePartitionSlot 
timePartitionSlot) {
+    return partitionInfo.countSeriesSlot(database, timePartitionSlot);
+  }
+
+  /**
+   * Get the last DataAllotTable of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> 
getLastDataAllotTable(String database) {
+    return partitionInfo.getLastDataAllotTable(database);
+  }
+
   public ScheduledExecutorService getRegionMaintainer() {
     return regionMaintainer;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index b2ca2faf6b7..c1516e6f7c9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -220,6 +220,27 @@ public class DatabasePartitionTable {
     return result.getAndIncrement();
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType 
type) {
+    List<TConsensusGroupId> result = new Vector<>();
+    regionGroupMap
+        .values()
+        .forEach(
+            regionGroup -> {
+              if (regionGroup.getId().getType().equals(type)) {
+                result.add(regionGroup.getId());
+              }
+            });
+    return result;
+  }
+
   public int getAssignedSeriesPartitionSlotsCount() {
     return Math.max(
         schemaPartitionTable.getSchemaPartitionMap().size(),
@@ -251,19 +272,15 @@ public class DatabasePartitionTable {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor and returns 
if it does.
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    return dataPartitionTable.getAdjacentDataPartition(
-        seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+  public TConsensusGroupId getSuccessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
+    return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot, 
timePartitionSlot);
   }
 
   /**
@@ -553,6 +570,55 @@ public class DatabasePartitionTable {
     return dataRegionIds;
   }
 
+  /**
+   * Get the max TimePartitionSlot.
+   *
+   * @return The max TimePartitionSlot, null if there are no DataPartitions yet
+   */
+  public TTimePartitionSlot getMaxTimePartitionSlot() {
+    return dataPartitionTable.getMaxTimePartitionSlot();
+  }
+
+  /**
+   * Get the min TimePartitionSlot.
+   *
+   * @return The min TimePartitionSlot, null if there are no DataPartitions yet
+   */
+  public TTimePartitionSlot getMinTimePartitionSlot() {
+    return dataPartitionTable.getMinTimePartitionSlot();
+  }
+
+  /**
+   * Get the DataPartition with max TimePartition of the specified 
SeriesPartitionSlot.
+   *
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The last DataPartition, null if there are no DataPartitions in 
the specified
+   *     SeriesPartitionSlot
+   */
+  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot) {
+    return dataPartitionTable.getLastDataPartition(seriesPartitionSlot);
+  }
+
+  /**
+   * Count SeriesSlot in the specified TimePartitionSlot.
+   *
+   * @param timePartitionSlot The specified TimePartitionSlot
+   * @return The count of SeriesSlot
+   */
+  public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) {
+    return dataPartitionTable.countSeriesSlot(timePartitionSlot);
+  }
+
+  /**
+   * Get the last DataAllotTable.
+   *
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+    return dataPartitionTable.getLastDataAllotTable();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index d85c63e4e10..a0e75d29168 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -79,6 +79,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -385,24 +386,21 @@ public class PartitionInfo implements SnapshotProcessor {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does.
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
    *
    * @param database DatabaseName
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
+  public TConsensusGroupId getSuccessorDataPartition(
       String database,
       TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
-    if (databasePartitionTables.containsKey(database)) {
+      TTimePartitionSlot timePartitionSlot) {
+    if (isDatabaseExisted(database)) {
       return databasePartitionTables
           .get(database)
-          .getAdjacentDataPartition(seriesPartitionSlot, timePartitionSlot, 
timePartitionInterval);
+          .getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot);
     } else {
       return null;
     }
@@ -722,6 +720,25 @@ public class PartitionInfo implements SnapshotProcessor {
     return databasePartitionTables.get(database).getRegionGroupCount(type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * <p>Get all the RegionGroups currently owned by the specified Database
+   *
+   * @param database DatabaseName
+   * @param type SchemaRegion or DataRegion
+   * @return List of TConsensusGroupId
+   * @throws DatabaseNotExistsException When the specified Database doesn't 
exist
+   */
+  public List<TConsensusGroupId> getAllRegionGroupIds(String database, 
TConsensusGroupType type)
+      throws DatabaseNotExistsException {
+    if (!isDatabaseExisted(database)) {
+      throw new DatabaseNotExistsException(database);
+    }
+
+    return databasePartitionTables.get(database).getAllRegionGroupIds(type);
+  }
+
   /**
    * Only leader use this interface.
    *
@@ -761,14 +778,14 @@ public class PartitionInfo implements SnapshotProcessor {
   /**
    * Only leader use this interface.
    *
-   * @param storageGroup StorageGroupName
+   * @param database DatabaseName
    * @param type SchemaRegion or DataRegion
    * @return The StorageGroup's Running or Available Regions that sorted by 
the number of allocated
    *     slots
    */
   public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
-      String storageGroup, TConsensusGroupType type) {
-    return 
databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
+      String database, TConsensusGroupType type) {
+    return 
databasePartitionTables.get(database).getRegionGroupSlotsCounter(type);
   }
 
   /**
@@ -784,6 +801,78 @@ public class PartitionInfo implements SnapshotProcessor {
     return schemaPartitionSet;
   }
 
+  /**
+   * Get the max TimePartitionSlot of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The max TimePartitionSlot, null if the Database doesn't exist or 
there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMaxTimePartitionSlot(String database) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables.get(database).getMaxTimePartitionSlot();
+    }
+    return null;
+  }
+
+  /**
+   * Get the min TimePartitionSlot of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The min TimePartitionSlot, null if the Database doesn't exist or 
there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMinTimePartitionSlot(String database) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables.get(database).getMinTimePartitionSlot();
+    }
+    return null;
+  }
+
+  /**
+   * Get the DataPartition with max TimePartition of the specified Database 
and the
+   * SeriesPartitionSlot.
+   *
+   * @param database The specified Database
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The last DataPartition, null if the Database doesn't exist or 
there are no
+   *     DataPartitions in the specified SeriesPartitionSlot
+   */
+  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+      String database, TSeriesPartitionSlot seriesPartitionSlot) {
+    if (isDatabaseExisted(database)) {
+      return 
databasePartitionTables.get(database).getLastDataPartition(seriesPartitionSlot);
+    }
+    return null;
+  }
+
+  /**
+   * Count SeriesSlot in the specified TimePartitionSlot of the Database.
+   *
+   * @param database The specified Database
+   * @param timePartitionSlot The specified TimePartitionSlot
+   * @return The count of SeriesSlot
+   */
+  public int countSeriesSlot(String database, TTimePartitionSlot 
timePartitionSlot) {
+    if (isDatabaseExisted(database)) {
+      return 
databasePartitionTables.get(database).countSeriesSlot(timePartitionSlot);
+    }
+    return 0;
+  }
+
+  /**
+   * Get the last DataAllotTable of the specified Database.
+   *
+   * @param database The specified Database
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> 
getLastDataAllotTable(String database) {
+    if (isDatabaseExisted(database)) {
+      return databasePartitionTables.get(database).getLastDataAllotTable();
+    }
+    return new HashMap<>();
+  }
+
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 78abfbca2dc..3072746b378 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -201,6 +201,12 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
+        // Update all corresponding DataAllotTables
+        createRegionGroupsPlan
+            .getRegionGroupMap()
+            .keySet()
+            .forEach(
+                database -> 
env.getConfigManager().getLoadManager().updateDataAllotTable(database));
         return Flow.NO_MORE_STATE;
     }
 
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 2a6ead060ec..08cc77c3c35 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -128,13 +128,6 @@ cluster_name=defaultCluster
 # data_region_per_data_node=5.0
 
 
-# Whether to enable the DataPartition inherit policy.
-# DataPartition within the same SeriesPartitionSlot will inherit the 
allocation result of
-# the predecessor or successor TimePartitionSlot if set true
-# Datatype: Boolean
-# enable_data_partition_inherit_policy=true
-
-
 # The policy of cluster RegionGroups' leader distribution.
 # E.g. we should balance cluster RegionGroups' leader distribution when some 
DataNodes are shutdown or re-connected.
 # These policies are currently supported:
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 1a5611536ae..6c2108f5898 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
@@ -34,12 +35,15 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class DataPartitionTable {
@@ -99,22 +103,16 @@ public class DataPartitionTable {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does
+   * Checks whether the specified DataPartition has a successor and returns if 
it does.
    *
    * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
    * @return The specific DataPartition's predecessor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      long timePartitionInterval) {
+  public TConsensusGroupId getSuccessorDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot 
timePartitionSlot) {
     if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
-      return dataPartitionMap
-          .get(seriesPartitionSlot)
-          .getAdjacentDataPartition(timePartitionSlot, timePartitionInterval);
+      return 
dataPartitionMap.get(seriesPartitionSlot).getSuccessorDataPartition(timePartitionSlot);
     } else {
       return null;
     }
@@ -236,6 +234,111 @@ public class DataPartitionTable {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Get the max TimePartitionSlot among all SeriesPartitionSlots of this DataPartitionTable.
+   *
+   * @return The TimePartitionSlot with the greatest start time, null if there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMaxTimePartitionSlot() {
+    // Start from null rather than a sentinel slot with start time 0; otherwise a table whose
+    // max slot starts at 0 (or at a negative time) would wrongly be reported as empty
+    AtomicReference<TTimePartitionSlot> maxTimeSlot = new AtomicReference<>();
+    dataPartitionMap
+        .values()
+        .forEach(
+            seriesPartitionTable -> {
+              TTimePartitionSlot candidate = seriesPartitionTable.getMaxTimePartitionSlot();
+              if (candidate == null) {
+                // This SeriesPartitionSlot has no DataPartitions
+                return;
+              }
+              TTimePartitionSlot currentMax = maxTimeSlot.get();
+              if (currentMax == null || candidate.getStartTime() > currentMax.getStartTime()) {
+                maxTimeSlot.set(candidate);
+              }
+            });
+    return maxTimeSlot.get();
+  }
+
+  /**
+   * Get the min TimePartitionSlot among all SeriesPartitionSlots of this DataPartitionTable.
+   *
+   * @return The TimePartitionSlot with the smallest start time, null if there are no
+   *     DataPartitions yet
+   */
+  public TTimePartitionSlot getMinTimePartitionSlot() {
+    // Start from null rather than a Long.MAX_VALUE sentinel slot, so that "no candidate seen"
+    // can never be confused with a real slot
+    AtomicReference<TTimePartitionSlot> minTimeSlot = new AtomicReference<>();
+    dataPartitionMap
+        .values()
+        .forEach(
+            seriesPartitionTable -> {
+              TTimePartitionSlot candidate = seriesPartitionTable.getMinTimePartitionSlot();
+              if (candidate == null) {
+                // This SeriesPartitionSlot has no DataPartitions
+                return;
+              }
+              TTimePartitionSlot currentMin = minTimeSlot.get();
+              if (currentMin == null || candidate.getStartTime() < currentMin.getStartTime()) {
+                minTimeSlot.set(candidate);
+              }
+            });
+    return minTimeSlot.get();
+  }
+
+  /**
+   * Get the DataPartition with the max TimePartitionSlot in the specified SeriesPartitionSlot.
+   *
+   * @param seriesPartitionSlot The specified SeriesPartitionSlot
+   * @return The last DataPartition, null if this table holds no entry for the specified
+   *     SeriesPartitionSlot
+   */
+  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot) {
+    if (!dataPartitionMap.containsKey(seriesPartitionSlot)) {
+      return null;
+    }
+    return dataPartitionMap.get(seriesPartitionSlot).getLastDataPartition();
+  }
+
+  /**
+   * Count the SeriesSlots that hold a DataPartition in the specified TimePartitionSlot.
+   *
+   * @param timePartitionSlot The specified TimePartitionSlot
+   * @return The count of SeriesSlot
+   */
+  public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) {
+    // isDataPartitionExist yields 1 or 0, so summing it over all tables gives the count
+    return dataPartitionMap.values().stream()
+        .mapToInt(seriesTable -> seriesTable.isDataPartitionExist(timePartitionSlot))
+        .sum();
+  }
+
+  /**
+   * Get the last DataAllotTable, i.e. for every SeriesPartitionSlot the ConsensusGroupId
+   * returned by its SeriesPartitionTable's getLastConsensusGroupId.
+   *
+   * @return The last DataAllotTable
+   */
+  public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
+    Map<TSeriesPartitionSlot, TConsensusGroupId> lastAllotTable = new HashMap<>();
+    dataPartitionMap.forEach(
+        (slot, seriesTable) -> {
+          TConsensusGroupId lastGroupId = seriesTable.getLastConsensusGroupId();
+          lastAllotTable.put(slot, lastGroupId);
+        });
+    return lastAllotTable;
+  }
+
+  /**
+   * Get the number of DataPartitions in each TimePartitionSlot, summed over all
+   * SeriesPartitionSlots.
+   *
+   * @return A map from each TimePartitionSlot to its number of DataPartitions
+   */
+  public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() {
+    Map<TTimePartitionSlot, Integer> timeSlotCounts = new ConcurrentHashMap<>();
+    dataPartitionMap
+        .values()
+        .forEach(
+            seriesTable -> {
+              // Fold this SeriesPartitionSlot's per-slot counts into the running totals
+              for (Map.Entry<TTimePartitionSlot, Integer> slotCount :
+                  seriesTable.getTimeSlotCountMap().entrySet()) {
+                timeSlotCounts.merge(slotCount.getKey(), slotCount.getValue(), Integer::sum);
+              }
+            });
+    return timeSlotCounts;
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 47216cc41ce..def3cc07fd6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.thrift.TException;
@@ -33,10 +34,12 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,20 +48,24 @@ import java.util.stream.Collectors;
 
 public class SeriesPartitionTable {
 
-  private final Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
+  private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap;
 
   public SeriesPartitionTable() {
-    this.seriesPartitionMap = new ConcurrentHashMap<>();
+    this.seriesPartitionMap = new TreeMap<>();
   }
 
   public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> 
seriesPartitionMap) {
-    this.seriesPartitionMap = seriesPartitionMap;
+    this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
   }
 
   public Map<TTimePartitionSlot, List<TConsensusGroupId>> 
getSeriesPartitionMap() {
     return seriesPartitionMap;
   }
 
+  /**
+   * Append a ConsensusGroupId to the list kept for the specified TimePartitionSlot, creating the
+   * list first if the slot has no entry yet.
+   *
+   * @param timePartitionSlot The TimePartitionSlot to record the DataPartition under
+   * @param groupId The ConsensusGroupId to append
+   */
+  public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) {
+    List<TConsensusGroupId> groupIds =
+        seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>());
+    groupIds.add(groupId);
+  }
+
   /**
    * Thread-safely get DataPartition within the specific StorageGroup
    *
@@ -119,30 +126,14 @@ public class SeriesPartitionTable {
   }
 
   /**
-   * Checks whether the specified DataPartition has a predecessor or successor 
and returns if it
-   * does
+   * Check and return the specified DataPartition's successor
    *
    * @param timePartitionSlot Corresponding TimePartitionSlot
-   * @param timePartitionInterval Time partition interval
-   * @return The specific DataPartition's predecessor if exists, null otherwise
+   * @return The specified DataPartition's successor if exists, null otherwise
    */
-  public TConsensusGroupId getAdjacentDataPartition(
-      TTimePartitionSlot timePartitionSlot, long timePartitionInterval) {
-    if (timePartitionSlot.getStartTime() >= timePartitionInterval) {
-      // Check predecessor first
-      TTimePartitionSlot predecessorSlot =
-          new TTimePartitionSlot(timePartitionSlot.getStartTime() - 
timePartitionInterval);
-      TConsensusGroupId predecessor =
-          seriesPartitionMap.getOrDefault(predecessorSlot, 
Collections.singletonList(null)).get(0);
-      if (predecessor != null) {
-        return predecessor;
-      }
-    }
-
-    // Check successor
-    TTimePartitionSlot successorSlot =
-        new TTimePartitionSlot(timePartitionSlot.getStartTime() + 
timePartitionInterval);
-    return seriesPartitionMap.getOrDefault(successorSlot, 
Collections.singletonList(null)).get(0);
+  public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot timePartitionSlot) {
+    // The successor is the entry with the least key strictly greater than the given slot
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> successor =
+        seriesPartitionMap.higherEntry(timePartitionSlot);
+    if (successor == null) {
+      return null;
+    }
+    return successor.getValue().get(0);
+  }
 
   /**
@@ -225,6 +216,84 @@ public class SeriesPartitionTable {
     return result;
   }
 
+  /**
+   * Get the max TimePartitionSlot of this SeriesPartitionTable.
+   *
+   * @return The max TimePartitionSlot, null if there are no DataPartitions yet
+   */
+  public TTimePartitionSlot getMaxTimePartitionSlot() {
+    // Test for emptiness up front instead of catching the NoSuchElementException from lastKey()
+    return seriesPartitionMap.isEmpty() ? null : seriesPartitionMap.lastKey();
+  }
+
+  /**
+   * Get the min TimePartitionSlot of this SeriesPartitionTable.
+   *
+   * @return The min TimePartitionSlot, null if there are no DataPartitions yet
+   */
+  public TTimePartitionSlot getMinTimePartitionSlot() {
+    // Test for emptiness up front instead of catching the NoSuchElementException from firstKey()
+    return seriesPartitionMap.isEmpty() ? null : seriesPartitionMap.firstKey();
+  }
+
+  /**
+   * Get the DataPartition with the max TimePartitionSlot of this SeriesPartitionTable.
+   *
+   * <p>When the max TimePartitionSlot maps to several ConsensusGroupIds, the last one in its
+   * list is returned.
+   *
+   * @return The last DataPartition, null if there are no DataPartitions
+   */
+  public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() {
+    // TreeMap#lastEntry returns null on an empty map (it does not throw), so the empty case
+    // must be handled with a null check rather than a catch block
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+        seriesPartitionMap.lastEntry();
+    if (lastEntry == null) {
+      return null;
+    }
+    List<TConsensusGroupId> groupIds = lastEntry.getValue();
+    return new Pair<>(lastEntry.getKey(), groupIds.get(groupIds.size() - 1));
+  }
+
+  /**
+   * Report, as a countable 0/1 value, whether this table has an entry for the specified
+   * TimePartitionSlot.
+   *
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @return 1 if exists, 0 otherwise
+   */
+  public int isDataPartitionExist(TTimePartitionSlot timePartitionSlot) {
+    if (seriesPartitionMap.containsKey(timePartitionSlot)) {
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Get the ConsensusGroupId of the DataPartition with the max TimePartitionSlot.
+   *
+   * @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet
+   */
+  public TConsensusGroupId getLastConsensusGroupId() {
+    // TreeMap#lastEntry returns null on an empty map (it does not throw), so the empty case
+    // must be handled with a null check rather than a catch block
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+        seriesPartitionMap.lastEntry();
+    if (lastEntry == null) {
+      return null;
+    }
+    // NOTE(review): this reads the first id of the list while getLastDataPartition reads the
+    // last one; confirm which is intended when a slot maps to several ConsensusGroupIds
+    return lastEntry.getValue().get(0);
+  }
+
+  /**
+   * Get the number of DataPartitions in each TimePartitionSlot of this SeriesPartitionTable.
+   *
+   * <p>Every TimePartitionSlot present in the table is counted as exactly 1, regardless of how
+   * many ConsensusGroupIds its list holds.
+   *
+   * @return A map from each TimePartitionSlot to its number of DataPartitions
+   */
+  public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() {
+    Map<TTimePartitionSlot, Integer> slotCounts = new HashMap<>();
+    for (TTimePartitionSlot timePartitionSlot : seriesPartitionMap.keySet()) {
+      slotCounts.put(timePartitionSlot, 1);
+    }
+    return slotCounts;
+  }
+
   public void serialize(OutputStream outputStream, TProtocol protocol)
       throws IOException, TException {
     ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
new file mode 100644
index 00000000000..f1ecdc3a5fb
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * A map from keys to comparable values that additionally keeps a value-ordered index, so that a
+ * key holding the minimum value can be found without scanning every entry.
+ *
+ * <p>This class performs no synchronization of its own.
+ *
+ * @param <K> Key type
+ * @param <V> Value type, ordered by its natural ordering
+ */
+public class BalanceTreeMap<K, V extends Comparable<V>> {
+
+  // key -> its current value
+  private final HashMap<K, V> keyValueMap;
+  // value -> every key currently holding that value, sorted by value
+  private final TreeMap<V, Set<K>> valueKeyMap;
+
+  public BalanceTreeMap() {
+    this.keyValueMap = new HashMap<>();
+    this.valueKeyMap = new TreeMap<>();
+  }
+
+  /**
+   * Put or modify a key-value pair.
+   *
+   * @param key Key
+   * @param value Value, must not be null
+   * @throws NullPointerException if value is null; neither internal map is modified in that case
+   */
+  public void put(K key, V value) {
+    // Reject null before touching either map: the value-ordered index cannot hold a null value,
+    // and failing half-way would leave the two maps out of sync
+    if (value == null) {
+      throw new NullPointerException("value must not be null");
+    }
+
+    // Update keyValueMap, remembering the value this key held before (if any)
+    V oldValue = keyValueMap.put(key, value);
+
+    // Update valueKeyMap: detach the key from its old value's bucket, dropping empty buckets so
+    // that the first entry always refers to a value some key really holds
+    if (oldValue != null) {
+      Set<K> oldKeys = valueKeyMap.get(oldValue);
+      oldKeys.remove(key);
+      if (oldKeys.isEmpty()) {
+        valueKeyMap.remove(oldValue);
+      }
+    }
+    valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
+  }
+
+  /**
+   * Get the value currently associated with the key.
+   *
+   * @param key Key
+   * @return The value of the key, null if the key is absent
+   */
+  public V get(K key) {
+    return keyValueMap.get(key);
+  }
+
+  public boolean containsKey(K key) {
+    return keyValueMap.containsKey(key);
+  }
+
+  /**
+   * Get key with minimum value.
+   *
+   * <p>If several keys share the minimum value, which one of them is returned is unspecified.
+   *
+   * @return Key with minimum value, null if the map is empty
+   */
+  public K getKeyWithMinValue() {
+    // firstEntry() is null for an empty TreeMap, so guard before dereferencing it
+    return valueKeyMap.isEmpty() ? null : valueKeyMap.firstEntry().getValue().iterator().next();
+  }
+}

Reply via email to