This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 89dba65 [To rel/0.12][IOTDB-1651]add reconnect to solve out of
sequence (#4087)
89dba65 is described below
commit 89dba65c9824d2957737a062604e65205d778fa4
Author: yschengzi <[email protected]>
AuthorDate: Sat Oct 9 10:53:23 2021 +0800
[To rel/0.12][IOTDB-1651]add reconnect to solve out of sequence (#4087)
---
.../iotdb/db/sync/sender/transfer/SyncClient.java | 41 ++++++++++++++++++++--
1 file changed, 39 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index b7f0540..6ff1143 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -95,7 +95,7 @@ public class SyncClient implements ISyncClient {
private static final IoTDBConfig ioTDBConfig =
IoTDBDescriptor.getInstance().getConfig();
- private static final int TIMEOUT_MS = 1000;
+ private static final int TIMEOUT_MS = 2000;
/**
* When transferring schema information, it is a better choice to transfer
only new schema
@@ -127,6 +127,8 @@ public class SyncClient implements ISyncClient {
/** Record sync progress in log. */
private ISyncSenderLogger syncLog;
+ private boolean isSyncConnect = false;
+
private ISyncFileManager syncFileManager = SyncFileManager.getInstance();
private ScheduledExecutorService executorService;
@@ -222,6 +224,11 @@ public class SyncClient implements ISyncClient {
syncAll();
} catch (Exception e) {
logger.error("Sync failed", e);
+ } finally {
+ if (transport != null && transport.isOpen()) {
+ transport.close();
+ }
+ isSyncConnect = false;
}
},
SyncConstant.SYNC_PROCESS_DELAY,
@@ -311,12 +318,28 @@ public class SyncClient implements ISyncClient {
if (!transport.isOpen()) {
transport.open();
}
+
+ isSyncConnect = true;
} catch (TTransportException e) {
logger.error("Cannot connect to the receiver.");
throw new SyncConnectionException(e);
}
}
+ private boolean reconnect() {
+ if (transport != null && transport.isOpen()) {
+ transport.close();
+ }
+
+ try {
+ establishConnection(config.getServerIp(), config.getServerPort());
+ } catch (SyncConnectionException e) {
+ logger.warn("Can not reconnect to receiver {}. Caused by ",
config.getSyncReceiverName(), e);
+ return false;
+ }
+ return true;
+ }
+
@Override
public void confirmIdentity() throws SyncConnectionException {
try (Socket socket = new Socket(config.getServerIp(),
config.getServerPort())) {
@@ -380,6 +403,10 @@ public class SyncClient implements ISyncClient {
String.format(
"Can not sync schema after %s retries.",
config.getMaxNumOfSyncFileRetry()));
}
+ if (!isSyncConnect && !reconnect()) {
+ retryCount++;
+ continue;
+ }
if (tryToSyncSchema()) {
writeSyncSchemaPos(getSchemaPosFile());
break;
@@ -417,7 +444,13 @@ public class SyncClient implements ISyncClient {
// check digest
return checkDigestForSchema(new BigInteger(1, md.digest()).toString(16));
- } catch (NoSuchAlgorithmException | IOException | TException e) {
+ } catch (TException e) {
+ logger.error(
+ "Can not finish transfer schema to receiver, thrift error happen {},
try to reconnect",
+ e);
+ isSyncConnect = false;
+ return false;
+ } catch (NoSuchAlgorithmException | IOException e) {
logger.error("Can not finish transfer schema to receiver", e);
return false;
}
@@ -550,6 +583,9 @@ public class SyncClient implements ISyncClient {
logger.info("Start to sync names of deleted files in storage group {}",
sgName);
for (File file : deletedFilesName) {
try {
+ if (!isSyncConnect && !reconnect()) {
+ continue;
+ }
if (serviceClient.syncDeletedFileName(getFileNameWithSG(file)).code ==
SUCCESS_CODE) {
logger.info(
"Receiver has received deleted file name {} successfully.",
getFileNameWithSG(file));
@@ -558,6 +594,7 @@ public class SyncClient implements ISyncClient {
}
} catch (TException e) {
logger.error("Can not sync deleted file name {}, skip it.", file);
+ isSyncConnect = false;
}
}
logger.info("Finish to sync names of deleted files in storage group {}",
sgName);