This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9f809556e7e Do not wait to retry when configLeader exists (#12075)
9f809556e7e is described below
commit 9f809556e7e5c06d5d59854817f3a0889c5897ff
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Feb 29 14:57:20 2024 +0800
Do not wait to retry when configLeader exists (#12075)
---
.../manager/consensus/ConsensusManager.java | 18 ++++++++++-
.../iotdb/confignode/manager/node/NodeManager.java | 5 ++-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 37 ++++++++--------------
3 files changed, 33 insertions(+), 27 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index f9ce496ea65..0e329fda460 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -65,6 +65,9 @@ public class ConsensusManager {
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
private static final CommonConfig COMMON_CONF =
CommonDescriptor.getInstance().getConfig();
private static final int SEED_CONFIG_NODE_ID = 0;
+ private static final long MAX_WAIT_READY_TIME_MS =
+ CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2;
+ private static final long RETRY_WAIT_TIME_MS = 100;
/** There is only one ConfigNodeGroup */
public static final ConsensusGroupId DEFAULT_CONSENSUS_GROUP_ID =
new ConfigRegionId(CONF.getConfigRegionId());
@@ -345,7 +348,7 @@ public class ConsensusManager {
return leaderPeer;
}
try {
- TimeUnit.MILLISECONDS.sleep(100);
+ TimeUnit.MILLISECONDS.sleep(RETRY_WAIT_TIME_MS);
} catch (InterruptedException e) {
LOGGER.warn("ConsensusManager getLeaderPeer been interrupted, ", e);
Thread.currentThread().interrupt();
@@ -384,6 +387,19 @@ public class ConsensusManager {
} else {
result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
if (isLeader()) {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) {
+ if (isLeaderReady()) {
+ result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return result;
+ }
+ try {
+ Thread.sleep(RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during waiting for configNode leader ready.");
+ }
+ }
result.setMessage(
"The current ConfigNode is leader but not ready yet, please try again later.");
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 307da61e91c..e212e2cc90d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -263,7 +262,7 @@ public class NodeManager {
configManager
.getClusterManager()
.getClusterIdWithRetry(
- IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+ CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
if (clusterId == null) {
resp.setStatus(
new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode())
@@ -318,7 +317,7 @@ public class NodeManager {
configManager
.getClusterManager()
.getClusterIdWithRetry(
- IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+ CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
TDataNodeRestartResp resp = new TDataNodeRestartResp();
resp.setConfigNodeList(getRegisteredConfigNodes());
if (clusterId == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1387e5f338f..c17af251d90 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -184,18 +184,10 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
this.configNodes = configNodes;
this.property = property;
this.clientManager = clientManager;
+ // Set the first configNode as configLeader for a tentative connection
+ this.configLeader = this.configNodes.get(0);
- init();
- }
-
- public void init() throws TException {
- try {
- tryToConnect();
- } catch (TException e) {
- // Can not connect to each config node
- syncLatestConfigNodeList();
- tryToConnect();
- }
+ connectAndSync();
}
public void connect(TEndPoint endpoint) throws TException {
@@ -215,16 +207,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
client = new
IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
}
- private void waitAndReconnect() throws TException {
- try {
- // Wait to start the next try
- Thread.sleep(RETRY_INTERVAL_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new TException(
- "Unexpected interruption when waiting to retry to connect to ConfigNode");
- }
-
+ private void connectAndSync() throws TException {
try {
tryToConnect();
} catch (TException e) {
@@ -243,6 +226,14 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
logger.warn("The current node may have been down {},try next node",
configLeader);
configLeader = null;
}
+ } else {
+ try {
+ // Wait to start the next try
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Unexpected interruption when waiting to try to connect to ConfigNode");
+ }
}
if (transport != null) {
@@ -338,7 +329,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
logger.warn(message, e);
configLeader = null;
}
- waitAndReconnect();
+ connectAndSync();
}
throw new TException(MSG_RECONNECTION_FAIL);
}
@@ -386,7 +377,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
logger.warn(message, e);
configLeader = null;
}
- waitAndReconnect();
+ connectAndSync();
}
throw new TException(MSG_RECONNECTION_FAIL);
}