This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9d21b20 Fix issues of blocking heartbeat broadcast and vote
requesting caused by client reconnection (#3751)
9d21b20 is described below
commit 9d21b207df77f0297457fe3fa137d0dd79cd8dd6
Author: BaiJian <[email protected]>
AuthorDate: Mon Aug 16 21:22:56 2021 +0800
Fix issues of blocking heartbeat broadcast and vote requesting caused by
client reconnection (#3751)
---
.../cluster/server/heartbeat/HeartbeatThread.java | 34 +++++++++++-----------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 98b0fa7..20ac505 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -203,7 +203,6 @@ public class HeartbeatThread implements Runnable {
}
void sendHeartbeatSync(Node node) {
- Client client = localMember.getSyncHeartbeatClient(node);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(localMember,
node);
HeartBeatRequest req = new HeartBeatRequest();
req.setCommitLogTerm(request.commitLogTerm);
@@ -219,11 +218,12 @@ public class HeartbeatThread implements Runnable {
req.partitionTableBytes = request.partitionTableBytes;
req.setPartitionTableBytesIsSet(true);
}
- if (client != null) {
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
try {
logger.debug("{}: Sending heartbeat to {}", memberName,
node);
HeartBeatResponse heartBeatResponse =
client.sendHeartbeat(req);
@@ -237,8 +237,8 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
/**
@@ -400,13 +400,13 @@ public class HeartbeatThread implements Runnable {
}
private void requestVoteSync(Node node, ElectionHandler handler,
ElectionRequest request) {
- Client client = localMember.getSyncHeartbeatClient(node);
- if (client != null) {
- logger.info("{}: Requesting a vote from {}", memberName, node);
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
+ logger.info("{}: Requesting a vote from {}", memberName, node);
try {
long result = client.startElection(request);
handler.onComplete(result);
@@ -420,7 +420,7 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
}