This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new c8455e5 Fix issues of blocking heartbeat broadcast and vote
requesting caused by client reconnection (#3751) (#3757)
c8455e5 is described below
commit c8455e518908d027d93fe9362e74dc2c10a36d8f
Author: BaiJian <[email protected]>
AuthorDate: Mon Aug 16 22:32:42 2021 +0800
Fix issues of blocking heartbeat broadcast and vote requesting caused by
client reconnection (#3751) (#3757)
---
.../cluster/server/heartbeat/HeartbeatThread.java | 34 +++++++++++-----------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 67acc5f..d113e5d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -195,7 +195,6 @@ public class HeartbeatThread implements Runnable {
}
void sendHeartbeatSync(Node node) {
- Client client = localMember.getSyncHeartbeatClient(node);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(localMember,
node);
HeartBeatRequest req = new HeartBeatRequest();
req.setCommitLogTerm(request.commitLogTerm);
@@ -211,11 +210,12 @@ public class HeartbeatThread implements Runnable {
req.partitionTableBytes = request.partitionTableBytes;
req.setPartitionTableBytesIsSet(true);
}
- if (client != null) {
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
try {
logger.debug("{}: Sending heartbeat to {}", memberName,
node);
HeartBeatResponse heartBeatResponse =
client.sendHeartbeat(req);
@@ -230,8 +230,8 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
/**
@@ -393,13 +393,13 @@ public class HeartbeatThread implements Runnable {
}
private void requestVoteSync(Node node, ElectionHandler handler,
ElectionRequest request) {
- Client client = localMember.getSyncHeartbeatClient(node);
- if (client != null) {
- logger.info("{}: Requesting a vote from {}", memberName, node);
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
+ logger.info("{}: Requesting a vote from {}", memberName, node);
try {
long result = client.startElection(request);
handler.onComplete(result);
@@ -413,7 +413,7 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
}