This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new d75e1b8a73a [To rel/1.1][IOTDB-6061] Fix the instability failure
caused by initServer in IoTConsensus UT not binding to the corresponding port
(#10728)
d75e1b8a73a is described below
commit d75e1b8a73abff34f54fa8c9cb102caedcbf3815
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Fri Jul 28 20:10:13 2023 +0800
[To rel/1.1][IOTDB-6061] Fix the instability failure caused by initServer
in IoTConsensus UT not binding to the corresponding port (#10728)
---
.../apache/iotdb/consensus/iot/ReplicateTest.java | 55 ++++++++++++++++++----
1 file changed, 46 insertions(+), 9 deletions(-)
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 0242cbc4e42..af2dd786ac4 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.iot.util.TestEntry;
import org.apache.iotdb.consensus.iot.util.TestStateMachine;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -37,9 +38,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -53,11 +58,17 @@ public class ReplicateTest {
private static final long timeout = TimeUnit.SECONDS.toMillis(300);
+ private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+
+ private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
+
+ private int basePort = 9000;
+
private final List<Peer> peers =
Arrays.asList(
- new Peer(gid, 1, new TEndPoint("127.0.0.1", 6000)),
- new Peer(gid, 2, new TEndPoint("127.0.0.1", 6001)),
- new Peer(gid, 3, new TEndPoint("127.0.0.1", 6002)));
+ new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 2)),
+ new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort - 1)),
+ new Peer(gid, 3, new TEndPoint("127.0.0.1", basePort)));
private final List<File> peersStorage =
Arrays.asList(
@@ -86,12 +97,35 @@ public class ReplicateTest {
}
}
+ public void changeConfiguration(int i) {
+ try (PublicBAOS publicBAOS = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+ outputStream.writeInt(this.peers.size());
+ for (Peer peer : this.peers) {
+ peer.serialize(outputStream);
+ }
+ File storageDir = new File(IoTConsensus.buildPeerDir(peersStorage.get(i), gid));
+ Path tmpConfigurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+ Path configurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+ Files.write(tmpConfigurationPath, publicBAOS.getBuf());
+ if (Files.exists(configurationPath)) {
+ Files.delete(configurationPath);
+ }
+ Files.move(tmpConfigurationPath, configurationPath);
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when persisting configuration", e);
+ }
+ }
+
private void initServer() throws IOException {
- for (Peer peer : peers) {
- waitPortAvailable(peer.getEndpoint().port);
+ for (int i = 0; i < peers.size(); i++) {
+ findPortAvailable(i);
}
for (int i = 0; i < peers.size(); i++) {
int finalI = i;
+ changeConfiguration(i);
servers.add(
(IoTConsensus)
ConsensusFactory.getConsensusImpl(
@@ -252,20 +286,23 @@ public class ReplicateTest {
Assert.assertEquals(stateMachines.get(2).getData(), stateMachines.get(1).getData());
}
- private static void waitPortAvailable(int port) {
+ private void findPortAvailable(int i) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
- try (ServerSocket ignored = new ServerSocket(port)) {
+ try (ServerSocket ignored = new ServerSocket(this.peers.get(i).getEndpoint().port)) {
+ // success
return;
} catch (IOException e) {
// Port is already in use, wait and retry
+ this.peers.set(i, new Peer(gid, i + 1, new TEndPoint("127.0.0.1", this.basePort)));
+ logger.info("try port {} for node {}.", this.basePort++, i + 1);
try {
- Thread.sleep(1000); // Wait for 1 second before retrying
+ Thread.sleep(50); // Wait 50 ms before retrying
} catch (InterruptedException ex) {
// Handle the interruption if needed
}
}
}
- Assert.fail(String.format("can not bind port %d after 300s", port));
+ Assert.fail(String.format("can not find port for node %d after 300s", i + 1));
}
}