This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c1eb55dc51 [IOTDB-6208] Node error detection through broken thrift pipe (#11397)
9c1eb55dc51 is described below

commit 9c1eb55dc5125ecc0ea01e8746f711725379c224
Author: Yongzao <[email protected]>
AuthorDate: Fri Oct 27 18:58:51 2023 +0800

    [IOTDB-6208] Node error detection through broken thrift pipe (#11397)
---
 .../heartbeat/ConfigNodeHeartbeatHandler.java         | 19 ++++++++++++++-----
 .../handlers/heartbeat/DataNodeHeartbeatHandler.java  |  8 +++++++-
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 9d7e11b23b8..69f72df9702 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 
@@ -27,21 +30,27 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
 
   private final int nodeId;
-  private final LoadCache cache;
+  private final LoadCache loadCache;
 
-  public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) {
+  public ConfigNodeHeartbeatHandler(int nodeId, LoadCache loadCache) {
     this.nodeId = nodeId;
-    this.cache = cache;
+    this.loadCache = loadCache;
   }
 
   @Override
   public void onComplete(Long timestamp) {
     long receiveTime = System.currentTimeMillis();
-    cache.cacheConfigNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
+    loadCache.cacheConfigNodeHeartbeatSample(
+        nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
   }
 
   @Override
   public void onError(Exception e) {
-    // Do nothing
+    if (ThriftClient.isConnectionBroken(e)) {
+      loadCache.forceUpdateNodeCache(
+          NodeType.ConfigNode,
+          nodeId,
+          NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index d054478d877..33ae5ca3835 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
@@ -121,6 +124,9 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
 
   @Override
   public void onError(Exception e) {
-    // Do nothing
+    if (ThriftClient.isConnectionBroken(e)) {
+      loadCache.forceUpdateNodeCache(
+          NodeType.DataNode, nodeId, NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+    }
   }
 }

Reply via email to