This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch optimize_insert_first
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/optimize_insert_first by this
push:
new 532ed5011be optimization of the time-consuming first Insert statement
(#12078)
532ed5011be is described below
commit 532ed5011be60c066d626401f2754e0218087c08
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Mon Feb 26 10:26:58 2024 +0800
optimization of the time-consuming first Insert statement (#12078)
---
.../java/org/apache/iotdb/cli/AbstractCli.java | 5 +++++
.../iotdb/confignode/manager/load/LoadManager.java | 6 +++++
.../manager/load/balancer/RouteBalancer.java | 26 ++++++++++++++++++++--
.../confignode/manager/load/cache/LoadCache.java | 9 +++++---
.../manager/load/cache/route/RegionRouteCache.java | 16 +++++++++++++
.../procedure/env/ConfigNodeProcedureEnv.java | 2 ++
.../confignode1conf/iotdb-common.properties | 8 +++----
.../schemaregion/SchemaRegionLoader.java | 14 ++++++++++++
8 files changed, 77 insertions(+), 9 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 1ba700a0a94..0dc81f0a889 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -32,6 +32,8 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@@ -48,6 +50,7 @@ import java.util.List;
import java.util.Set;
public abstract class AbstractCli {
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractCli.class);
static final String HOST_ARGS = "h";
static final String HOST_NAME = "host";
@@ -596,6 +599,8 @@ public abstract class AbstractCli {
}
} else {
ctx.getPrinter().println("Msg: " + SUCCESS_MESSAGE);
+ long costTime = System.currentTimeMillis() - startTime;
+ logger.info("INSERT_FIRST: execute successfully, costTime: {}ms",
costTime);
}
} catch (Exception e) {
ctx.getPrinter().println("Msg: " + e);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b52a2d5a1dc..3c5b197d073 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -413,4 +413,10 @@ public class LoadManager {
public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
loadCache.removeRegionRouteCache(regionGroupId);
}
+
+ /** Force balance the region leader. */
+ public void forceBalanceRegionLeader() {
+ routeBalancer.balanceRegionLeader();
+ routeBalancer.balanceRegionPriority();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index f8f1c92484f..088e2b12617 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -74,12 +75,14 @@ public class RouteBalancer {
(CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&&
ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
- &&
ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS));
+ &&
ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
+ ||
ConsensusFactory.SIMPLE_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION
=
(CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&&
ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
- &&
ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS));
+ &&
ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
+ ||
ConsensusFactory.SIMPLE_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);
private final IManager configManager;
@@ -184,6 +187,24 @@ public class RouteBalancer {
if (requestId.get() > 0) {
// Don't retry ChangeLeader request
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
+ for (int i = 0; i < requestId.get(); i++) {
+ if (clientHandler.getResponseMap().get(i).getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ getLoadManager()
+ .forceUpdateRegionLeader(
+ clientHandler.getRequest(i).getRegionId(),
+
clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
+ LOGGER.info(
+ "[forceUpdateRegionLeader], regionId:{}, datanodeId:{}",
+ clientHandler.getRequest(i).getRegionId(),
+ clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
+ } else {
+ LOGGER.error(
+ "[LeaderBalancer] Failed to change the leader of Region: {} to
DataNode: {}",
+ clientHandler.getRequest(i).getRegionId(),
+ clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
+ }
+ }
}
return differentRegionLeaderMap;
}
@@ -196,6 +217,7 @@ public class RouteBalancer {
TDataNodeLocation newLeader) {
switch (consensusProtocolClass) {
case ConsensusFactory.IOT_CONSENSUS:
+ case ConsensusFactory.SIMPLE_CONSENSUS:
// For IoTConsensus protocol, change RegionRouteMap is enough.
// And the result will be broadcast by Cluster-LoadStatistics-Service
soon.
getLoadManager().forceUpdateRegionLeader(regionGroupId,
newLeader.getDataNodeId());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 3ecc795ebf0..4aea84a789e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -65,7 +65,7 @@ public class LoadCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadCache.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
- private static final long HEARTBEAT_INTERVAL =
CONF.getHeartbeatIntervalInMs();
+ private static final long WAIT_LEADER_INTERVAL = 50;
private static final long LEADER_ELECTION_WAITING_TIMEOUT =
Math.max(
ProcedureManager.PROCEDURE_WAIT_TIME_OUT -
TimeUnit.SECONDS.toMillis(2),
@@ -545,10 +545,13 @@ public class LoadCache {
LOGGER.info("[RegionElection] Wait for leader election of RegionGroups:
{}", regionGroupIds);
while (System.currentTimeMillis() - startTime <=
LEADER_ELECTION_WAITING_TIMEOUT) {
AtomicBoolean allRegionLeaderElected = new AtomicBoolean(true);
+ LOGGER.info(
+ "[RegionElection] containsKey: {}",
+ regionRouteCacheMap.containsKey(regionGroupIds.get(0)));
regionGroupIds.forEach(
regionGroupId -> {
if (!regionRouteCacheMap.containsKey(regionGroupId)
- ||
regionRouteCacheMap.get(regionGroupId).isRegionGroupUnready()) {
+ ||
regionRouteCacheMap.get(regionGroupId).isRegionGroupUnready(true)) {
allRegionLeaderElected.set(false);
}
});
@@ -557,7 +560,7 @@ public class LoadCache {
return;
}
try {
- TimeUnit.MILLISECONDS.sleep(HEARTBEAT_INTERVAL);
+ TimeUnit.MILLISECONDS.sleep(WAIT_LEADER_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupt when wait for leader election", e);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java
index 02d82a34338..81adff83c75 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java
@@ -26,10 +26,14 @@ import
org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class RegionRouteCache {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegionRouteCache.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
@@ -98,6 +102,7 @@ public class RegionRouteCache {
// The leader of simple and ratis consensus is self-elected
if (leaderSample.get().getRight() != leaderId.get()) {
leaderId.set(leaderSample.get().getRight());
+ LOGGER.info("[RegionElection], setLeaderId1: {}",
leaderSample.get().getRight());
return true;
}
return false;
@@ -116,6 +121,7 @@ public class RegionRouteCache {
* @param leaderId Leader DataNodeId
*/
public void forceUpdateRegionLeader(int leaderId) {
+ LOGGER.info("[RegionElection], setLeaderId2: {}", leaderId);
this.leaderId.set(leaderId);
}
@@ -128,6 +134,16 @@ public class RegionRouteCache {
this.regionPriority.set(regionPriority);
}
+ public boolean isRegionGroupUnready(boolean isDebug) {
+ if (isDebug) {
+ LOGGER.info(
+ "[RegionElection], isRegionGroupUnready: {}, {}",
+ unReadyLeaderId == leaderId.get(),
+ unReadyRegionPriority.equals(regionPriority.get()));
+ }
+ return unReadyLeaderId == leaderId.get() ||
unReadyRegionPriority.equals(regionPriority.get());
+ }
+
public boolean isRegionGroupUnready() {
return unReadyLeaderId == leaderId.get() ||
unReadyRegionPriority.equals(regionPriority.get());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e919f127d7f..148677fea14 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -575,6 +575,8 @@ public class ConfigNodeProcedureEnv {
heartbeatSampleMap.put(
dataNodeId, new RegionHeartbeatSample(currentTime,
currentTime, regionStatus)));
getLoadManager().forceUpdateRegionGroupCache(regionGroupId,
heartbeatSampleMap);
+ // force balance region leader to skip waiting for leader election
+ getLoadManager().forceBalanceRegionLeader();
// Wait for leader election
getLoadManager().waitForLeaderElection(Collections.singletonList(regionGroupId));
}
diff --git
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
index 8981b21285b..a45651ad9de 100644
---
a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
+++
b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -18,10 +18,10 @@
#
timestamp_precision=ms
-data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
-schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
-schema_replication_factor=3
-data_replication_factor=3
+data_region_consensus_protocol_class=org.apache.iotdb.consensus.simple.SimpleConsensus
+schema_region_consensus_protocol_class=org.apache.iotdb.consensus.simple.SimpleConsensus
+schema_replication_factor=1
+data_replication_factor=1
udf_lib_dir=target/confignode1/ext/udf
trigger_lib_dir=target/confignode1/ext/trigger
pipe_lib_dir=target/confignode1/ext/pipe
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java
index feda7e54e3c..5d47acfda5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
@@ -105,6 +106,19 @@ public class SchemaRegionLoader {
currentMode = schemaEngineMode;
currentConstructor = constructor;
}
+ if (currentMode.equals(SchemaConstant.DEFAULT_SCHEMA_ENGINE_MODE)) {
+ MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();
+ logger.info(
+ "[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}",
+ schemaEngineMode,
+ currentMode);
+ } else {
+ MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
+ logger.info(
+ "[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}",
+ schemaEngineMode,
+ currentMode);
+ }
}
public ISchemaRegion createSchemaRegion(ISchemaRegionParams
schemaRegionParams)