This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch consensusManager_init in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d54bcc5e752c2f7ff5530123e1f60ab4cefd4149 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Feb 28 19:38:39 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../statemachine/ConfigRegionStateMachine.java | 6 ++-- .../iotdb/confignode/manager/ClusterManager.java | 10 ------ .../iotdb/confignode/manager/ConfigManager.java | 1 + .../manager/consensus/ConsensusManager.java | 40 ++++++++++++---------- .../iotdb/confignode/manager/cq/CQManager.java | 11 ------ .../pipe/coordinator/runtime/PipeMetaSyncer.java | 10 ------ 6 files changed, 24 insertions(+), 54 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b68370f9668..13d801d9be0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -243,11 +243,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev // Add Metric after leader ready configManager.addMetrics(); - // we do cq recovery async for two reasons: - // 1. For performance: cq recovery may be time-consuming, we use another thread to do it in + // we do cq recovery async for performance: + // cq recovery may be time-consuming, we use another thread to do it in // make notifyLeaderChanged not blocked by it - // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be - // initialized after notifyLeaderChanged finished threadPool.submit(() -> configManager.getCQManager().startCQScheduler()); threadPool.submit( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java index 6f77bbc5936..0dc662aa13a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.UUID; -import java.util.concurrent.TimeUnit; public class ClusterManager { @@ -72,15 +71,6 @@ public class ClusterManager { private void generateClusterId() { String clusterId = String.valueOf(UUID.randomUUID()); UpdateClusterIdPlan updateClusterIdPlan = new UpdateClusterIdPlan(clusterId); - while (configManager.getConsensusManager() == null) { - try { - LOGGER.info("consensus layer is not ready, sleep 100ms..."); - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("Unexpected interruption during waiting for consensus layer ready."); - } - } try { configManager.getConsensusManager().write(updateClusterIdPlan); } catch (ConsensusException e) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 760de526241..6bbcf44adbd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -311,6 +311,7 @@ public class ConfigManager implements IManager { public void initConsensusManager() throws IOException { this.consensusManager.set(new ConsensusManager(this, this.stateMachine)); + this.consensusManager.get().start(); } public void close() throws IOException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 69647bf7488..f9ce496ea65 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -78,13 +78,33 @@ public class ConsensusManager { setConsensusLayer(stateMachine); } + public void start() throws IOException { + consensusImpl.start(); + if (SystemPropertiesUtils.isRestarted()) { + LOGGER.info("Init ConsensusManager successfully when restarted"); + } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { + // Create ConsensusGroup that contains only itself + // if the current ConfigNode is Seed-ConfigNode + try { + createPeerForConsensusGroup( + Collections.singletonList( + new TConfigNodeLocation( + SEED_CONFIG_NODE_ID, + new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), + new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); + } catch (ConsensusException e) { + LOGGER.error( + "Something wrong happened while calling consensus layer's createLocalPeer API.", e); + } + } + } + public void close() throws IOException { consensusImpl.stop(); } /** ConsensusLayer local implementation. */ private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws IOException { - if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { upgrade(); consensusImpl = @@ -211,24 +231,6 @@ public class ConsensusManager { ConsensusFactory.CONSTRUCT_FAILED_MSG, CONF.getConfigNodeConsensusProtocolClass()))); } - consensusImpl.start(); - if (SystemPropertiesUtils.isRestarted()) { - LOGGER.info("Init ConsensusManager successfully when restarted"); - } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { - // Create ConsensusGroup that contains only itself - // if the current ConfigNode is Seed-ConfigNode - try { - createPeerForConsensusGroup( - Collections.singletonList( - new TConfigNodeLocation( - SEED_CONFIG_NODE_ID, - new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), - new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); - } catch (ConsensusException e) { - LOGGER.error( - "Something wrong happened while calling consensus layer's createLocalPeer API.", e); - } - } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index 02c6e8df242..ec27ec8cc92 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -134,16 +133,6 @@ public class CQManager { // 3. get all CQs List<CQInfo.CQEntry> allCQs = null; - // wait for consensus layer ready - while (configManager.getConsensusManager() == null) { - try { - LOGGER.info("consensus layer is not ready, sleep 1s..."); - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("Unexpected interruption during waiting for consensus layer ready."); - } - } // keep fetching until we get all CQEntries if this node is still leader while (needFetch(allCQs)) { try { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java index c80b1018ca3..7ea6aaf5cc9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java @@ -70,16 +70,6 @@ public class PipeMetaSyncer { } public synchronized void start() { - while (configManager.getConsensusManager() == null) { - try { - LOGGER.info("Consensus layer is not ready, sleep 1s..."); - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("Unexpected interruption during waiting for consensus layer ready."); - } - } - if (metaSyncFuture == null) { metaSyncFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
