This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 01f6e2557 [#2378] test(client): Fix flaky test QuorumTest#case10
(#2379)
01f6e2557 is described below
commit 01f6e2557a4aff68feae298552de90b02ff9a67d
Author: summaryzb <[email protected]>
AuthorDate: Thu Mar 6 10:25:19 2025 +0800
[#2378] test(client): Fix flaky test QuorumTest#case10 (#2379)
### What changes were proposed in this pull request?
1. Fake ports are now unique and limited to the valid range 1-65535, so
the client RPC fails fast when the corresponding shuffle server does not
exist
2. The app expiration time should be large enough that RPC retries and the
secondary round of sending do not run into app expiration
### Why are the changes needed?
Fix: #2378
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
---
.../java/org/apache/uniffle/test/QuorumTest.java | 52 +++++++++++++---------
1 file changed, 32 insertions(+), 20 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 88a5c1d26..9be5ef78d 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -18,9 +18,12 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -89,8 +92,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
throws Exception {
ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
- shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
- shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+ // app expires time should be greater than the client send data time which
is 180s by default.
+ // (rpcTimeout*retryTimes)
+ // this can avoid get NO_REGISTER when second data to secondaryServer
+ shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 240 *
1000L);
+ shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000L);
File dataDir1 = new File(tmpDir, id + "_1");
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
@@ -101,6 +107,21 @@ public class QuorumTest extends ShuffleReadWriteBase {
return new MockedShuffleServer(shuffleServerConf);
}
+ private List<Integer> generateFakePort(int num) {
+ Set<Integer> portExistsSet =
+
grpcShuffleServers.stream().map(ShuffleServer::getGrpcPort).collect(Collectors.toSet());
+ int i = 0;
+ List<Integer> fakePorts = new ArrayList<>(num);
+ while (i < num) {
+ int port = ThreadLocalRandom.current().nextInt(1, 65535);
+ if (portExistsSet.add(port)) {
+ fakePorts.add(port);
+ i++;
+ }
+ }
+ return fakePorts;
+ }
+
@BeforeEach
public void initCluster(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
@@ -152,31 +173,22 @@ public class QuorumTest extends ShuffleReadWriteBase {
grpcShuffleServers.get(4).getGrpcPort());
// simulator of failed servers
+ List<Integer> fakePortList = generateFakePort(5);
fakedShuffleServerInfo0 =
new ShuffleServerInfo(
- "127.0.0.1-20001",
- grpcShuffleServers.get(0).getIp(),
- grpcShuffleServers.get(0).getGrpcPort() + 100);
+ "127.0.0.1-20001", grpcShuffleServers.get(0).getIp(),
fakePortList.get(0));
fakedShuffleServerInfo1 =
new ShuffleServerInfo(
- "127.0.0.1-20002",
- grpcShuffleServers.get(1).getIp(),
- grpcShuffleServers.get(0).getGrpcPort() + 200);
+ "127.0.0.1-20002", grpcShuffleServers.get(1).getIp(),
fakePortList.get(1));
fakedShuffleServerInfo2 =
new ShuffleServerInfo(
- "127.0.0.1-20003",
- grpcShuffleServers.get(2).getIp(),
- grpcShuffleServers.get(0).getGrpcPort() + 300);
+ "127.0.0.1-20003", grpcShuffleServers.get(2).getIp(),
fakePortList.get(2));
fakedShuffleServerInfo3 =
new ShuffleServerInfo(
- "127.0.0.1-20004",
- grpcShuffleServers.get(2).getIp(),
- grpcShuffleServers.get(0).getGrpcPort() + 400);
+ "127.0.0.1-20004", grpcShuffleServers.get(2).getIp(),
fakePortList.get(3));
fakedShuffleServerInfo4 =
new ShuffleServerInfo(
- "127.0.0.1-20005",
- grpcShuffleServers.get(2).getIp(),
- grpcShuffleServers.get(0).getGrpcPort() + 500);
+ "127.0.0.1-20005", grpcShuffleServers.get(2).getIp(),
fakePortList.get(4));
// spark.rss.data.replica=3
// spark.rss.data.replica.write=2
@@ -339,7 +351,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
new MockedShuffleWriteClientImpl(
ShuffleClientFactory.newWriteBuilder()
.clientType(ClientType.GRPC.name())
- .retryMax(3)
+ .retryMax(2)
.retryIntervalMax(1000)
.heartBeatThreadNum(1)
.replica(replica)
@@ -945,8 +957,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
shuffleServerInfo3,
shuffleServerInfo4));
SendShuffleDataResult result =
shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- assertTrue(result.getSuccessBlockIds().size() == 3);
- assertTrue(result.getFailedBlockIds().size() == 0);
+ assertEquals(3, result.getSuccessBlockIds().size());
+ assertEquals(0, result.getFailedBlockIds().size());
}
// we cannot read any blocks from server 1 due to failures