This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new eec4e1d22b fix close order
eec4e1d22b is described below
commit eec4e1d22ba6d77592286a3802f1e60c0275c429
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Apr 25 15:21:13 2023 +0800
fix close order
---
.../org/apache/iotdb/consensus/natraft/RaftConsensus.java | 4 ++--
.../iotdb/consensus/natraft/protocol/RaftMember.java | 14 +++++---------
.../natraft/protocol/heartbeat/HeartbeatThread.java | 13 +++++++++----
3 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index e7bc64c42f..035ce5842d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -166,10 +166,10 @@ public class RaftConsensus implements IConsensus {
@Override
public void stop() throws IOException {
reportThread.shutdownNow();
- clientManager.close();
stateMachineMap.values().parallelStream().forEach(RaftMember::stop);
- registerManager.deregisterAll();
FlowMonitorManager.INSTANCE.close();
+ clientManager.close();
+ registerManager.deregisterAll();
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index cbac7ac20a..1c2711e184 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -373,22 +373,19 @@ public class RaftMember {
*/
public void stop() {
setStopped(true);
- closeLogManager();
- if (clientManager != null) {
- clientManager.close();
- }
- if (logSequencer != null) {
- logSequencer.close();
- }
-
if (heartbeatThread == null) {
return;
}
+ closeLogManager();
logDispatcher.stop();
heartbeatThread.stop();
catchUpManager.stop();
+ if (logSequencer != null) {
+ logSequencer.close();
+ }
+
if (commitLogPool != null) {
commitLogPool.shutdownNow();
try {
@@ -398,7 +395,6 @@ public class RaftMember {
logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
}
}
-
status.leader.set(null);
catchUpManager = null;
heartbeatThread = null;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index b73b812449..f243b4d203 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -103,7 +103,7 @@ public class HeartbeatThread implements Runnable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- while (!Thread.interrupted()) {
+ while (!Thread.interrupted() && !localMember.isStopped()) {
try {
switch (localMember.getRole()) {
case LEADER:
@@ -165,7 +165,9 @@ public class HeartbeatThread implements Runnable {
logger.info("{}: End elections", memberName);
}
- /** Send each node (except the local node) in the group of the member a heartbeat. */
+ /**
+ * Send each node (except the local node) in the group of the member a heartbeat.
+ */
protected void sendHeartbeats() {
try {
localMember.getLogManager().getLock().readLock().lock();
@@ -181,7 +183,9 @@ public class HeartbeatThread implements Runnable {
sendHeartbeats(localMember.getAllNodes());
}
- /** Send each node (except the local node) in list a heartbeat. */
+ /**
+ * Send each node (except the local node) in list a heartbeat.
+ */
@SuppressWarnings("java:S2445")
private void sendHeartbeats(Collection<Peer> nodes) {
logger.debug(
@@ -240,7 +244,8 @@ public class HeartbeatThread implements Runnable {
}
// the election goes on until this node becomes a follower or a leader
- while (localMember.getRole() == RaftRole.CANDIDATE && !Thread.interrupted()) {
+ while (localMember.getRole() == RaftRole.CANDIDATE && !Thread.interrupted()
+ && !localMember.isStopped()) {
startElection();
if (localMember.getRole() == RaftRole.CANDIDATE) {
// sleep random time to reduce election conflicts