This is an automated email from the ASF dual-hosted git repository.
zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 43323bbdb [#2015] improvement(netty): Support Netty for MR integration
test. (#2016)
43323bbdb is described below
commit 43323bbdb1e60381ffb029716a742c750f9e8572
Author: QI Jiale <[email protected]>
AuthorDate: Mon Oct 21 14:33:33 2024 +0800
[#2015] improvement(netty): Support Netty for MR integration test. (#2016)
### What changes were proposed in this pull request?
Support Netty for MR integration test.
### Why are the changes needed?
Fix: #2015
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Integration tests themselves.
---
.../org/apache/uniffle/test/DynamicConfTest.java | 15 +++----
.../org/apache/uniffle/test/HadoopConfTest.java | 14 ++++---
.../org/apache/uniffle/test/LargeSorterTest.java | 14 ++++---
.../apache/uniffle/test/MRIntegrationTestBase.java | 47 +++++++++++++---------
.../org/apache/uniffle/test/RMWordCountTest.java | 11 +++--
.../org/apache/uniffle/test/SecondarySortTest.java | 12 ++++--
.../org/apache/uniffle/test/WordCountTest.java | 12 ++++--
7 files changed, 76 insertions(+), 49 deletions(-)
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
index c5421d113..aad036794 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.mapreduce.LargeSorter;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
@@ -41,18 +42,18 @@ public class DynamicConfTest extends MRIntegrationTestBase {
Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI +
"rss/test");
dynamicConf.put(RssMRConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
- dynamicConf.put(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
return dynamicConf;
}
- @Test
- public void dynamicConfTest() throws Exception {
- run();
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void dynamicConfTest(ClientType clientType) throws Exception {
+ run(clientType);
}
@Override
- protected void updateRssConfiguration(Configuration jobConf) {
- jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ protected void updateRssConfiguration(Configuration jobConf, ClientType
clientType) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
}
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
index 892b9e19a..be8227bdc 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/HadoopConfTest.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.mapreduce.LargeSorter;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.storage.util.StorageType;
@@ -41,14 +42,15 @@ public class HadoopConfTest extends MRIntegrationTestBase {
return new HashMap<>();
}
- @Test
- public void hadoopConfTest() throws Exception {
- run();
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void hadoopConfTest(ClientType clientType) throws Exception {
+ run(clientType);
}
@Override
- protected void updateRssConfiguration(Configuration jobConf) {
- jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ protected void updateRssConfiguration(Configuration jobConf, ClientType
clientType) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.set(RssMRConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
jobConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, HDFS_URI + "rss/test");
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
index be7ec8375..a1546b762 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/LargeSorterTest.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RssMRConfig;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.common.ClientType;
@@ -34,14 +35,15 @@ public class LargeSorterTest extends MRIntegrationTestBase {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
- @Test
- public void largeSorterTest() throws Exception {
- run();
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void largeSorterTest(ClientType clientType) throws Exception {
+ run(clientType);
}
@Override
- protected void updateRssConfiguration(Configuration jobConf) {
- jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ protected void updateRssConfiguration(Configuration jobConf, ClientType
clientType) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
jobConf.setInt(LargeSorter.MBS_PER_MAP, 256);
jobConf.set(
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 0387eb9cb..0c6e32652 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -24,6 +24,7 @@ import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.provider.Arguments;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
@@ -75,6 +77,10 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
private static final String OUTPUT_ROOT_DIR = "/tmp/" +
TestMRJobs.class.getSimpleName();
private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR,
"localizedResources");
+ static Stream<Arguments> clientTypeProvider() {
+ return Stream.of(Arguments.of(ClientType.GRPC),
Arguments.of(ClientType.GRPC_NETTY));
+ }
+
@BeforeAll
public static void setUpMRYarn() throws IOException {
mrYarnCluster = new MiniMRYarnCluster("test");
@@ -99,29 +105,29 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
}
}
- public void run() throws Exception {
+ public void run(ClientType clientType) throws Exception {
JobConf appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runOriginApp(appConf);
final String originPath =
appConf.get("mapreduce.output.fileoutputformat.outputdir");
appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
- runRssApp(appConf);
+ runRssApp(appConf, clientType);
String rssPath =
appConf.get("mapreduce.output.fileoutputformat.outputdir");
verifyResults(originPath, rssPath);
appConf = new JobConf(mrYarnCluster.getConfig());
appConf.set("mapreduce.rss.reduce.remote.spill.enable", "true");
- runRssApp(appConf);
+ runRssApp(appConf, clientType);
String rssRemoteSpillPath =
appConf.get("mapreduce.output.fileoutputformat.outputdir");
verifyResults(originPath, rssRemoteSpillPath);
}
- public void runWithRemoteMerge() throws Exception {
+ public void runWithRemoteMerge(ClientType clientType) throws Exception {
// 1 run application when remote merge is enable
JobConf appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
- runRssApp(appConf, true);
+ runRssApp(appConf, true, clientType);
final String rssPath1 =
appConf.get("mapreduce.output.fileoutputformat.outputdir");
// 2 run original application
@@ -142,11 +148,12 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
runMRApp(jobConf, getTestTool(), getTestArgs());
}
- private void runRssApp(Configuration jobConf) throws Exception {
- runRssApp(jobConf, false);
+ private void runRssApp(Configuration jobConf, ClientType clientType) throws
Exception {
+ runRssApp(jobConf, false, clientType);
}
- private void runRssApp(Configuration jobConf, boolean remoteMerge) throws
Exception {
+ private void runRssApp(Configuration jobConf, boolean remoteMerge,
ClientType clientType)
+ throws Exception {
URL url = MRIntegrationTestBase.class.getResource("/");
final String parentPath =
new
Path(url.getPath()).getParent().getParent().getParent().getParent().toString();
@@ -185,19 +192,19 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
}
assertNotNull(localFile);
String props = System.getProperty("java.class.path");
- String newProps = "";
+ StringBuilder newProps = new StringBuilder();
String[] splittedProps = props.split(":");
for (String prop : splittedProps) {
if (!prop.contains("classes")
&& !prop.contains("grpc")
&& !prop.contains("rss-")
&& !prop.contains("shuffle-storage")) {
- newProps = newProps + ":" + prop;
+ newProps.append(":").append(prop);
} else if (prop.contains("mr") && prop.contains("integration-test")) {
- newProps = newProps + ":" + prop;
+ newProps.append(":").append(prop);
}
}
- System.setProperty("java.class.path", newProps);
+ System.setProperty("java.class.path", newProps.toString());
Path newPath = new Path(HDFS_URI + "/rss.jar");
FileUtil.copy(file, fs, newPath, false, jobConf);
DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()),
jobConf, fs);
@@ -208,8 +215,9 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
+ ","
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
- updateRssConfiguration(jobConf);
+ updateRssConfiguration(jobConf, clientType);
runMRApp(jobConf, getTestTool(), getTestArgs());
+ fs.delete(newPath, true);
}
protected String[] getTestArgs() {
@@ -225,11 +233,14 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
CoordinatorConf coordinatorConf = getCoordinatorConf();
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
if (serverConf != null) {
- shuffleServerConf.addAll(serverConf);
+ grpcShuffleServerConf.addAll(serverConf);
+ nettyShuffleServerConf.addAll(serverConf);
}
- createShuffleServer(shuffleServerConf);
+ createShuffleServer(grpcShuffleServerConf);
+ createShuffleServer(nettyShuffleServerConf);
startServers();
}
@@ -240,8 +251,8 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
return dynamicConf;
}
- protected void updateRssConfiguration(Configuration jobConf) {
- jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name());
+ protected void updateRssConfiguration(Configuration jobConf, ClientType
clientType) {
+ jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, clientType.name());
}
private void runMRApp(Configuration conf, Tool tool, String[] args) throws
Exception {
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
index 4ac8fc589..9c2252920 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
@@ -51,10 +53,11 @@ public class RMWordCountTest extends MRIntegrationTestBase {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf(),
serverConf);
}
- @Test
- public void wordCountTest() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void wordCountTest(ClientType clientType) throws Exception {
generateInputFile();
- runWithRemoteMerge();
+ runWithRemoteMerge(clientType);
}
@Override
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
index 6e0bf5dcd..4882ec5d6 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/SecondarySortTest.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.uniffle.common.ClientType;
public class SecondarySortTest extends MRIntegrationTestBase {
@@ -43,10 +46,11 @@ public class SecondarySortTest extends
MRIntegrationTestBase {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
- @Test
- public void secondarySortTest() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void secondarySortTest(ClientType clientType) throws Exception {
generateInputFile();
- run();
+ run(clientType);
}
private void generateInputFile() throws Exception {
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
index 2ba76f5a9..47aef051e 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.uniffle.common.ClientType;
public class WordCountTest extends MRIntegrationTestBase {
@@ -46,10 +49,11 @@ public class WordCountTest extends MRIntegrationTestBase {
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf());
}
- @Test
- public void wordCountTest() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientTypeProvider")
+ public void wordCountTest(ClientType clientType) throws Exception {
generateInputFile();
- run();
+ run(clientType);
}
@Override