This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new d7aad66ac [#2315] feat(test): Introduce a client simulator tool to simulate sending shuffle data requests for testing server performance (#2316)
d7aad66ac is described below
commit d7aad66ac486250bc07992fceb76762ae532cce9
Author: maobaolong <[email protected]>
AuthorDate: Fri Jan 3 11:10:21 2025 +0800
    [#2315] feat(test): Introduce a client simulator tool to simulate sending shuffle data requests for testing server performance (#2316)
### What changes were proposed in this pull request?
    Introduce a client simulator tool to simulate sending shuffle data requests for testing server performance.
### Why are the changes needed?
Fix: #2315
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
    This has been used at our company.
---
pom.xml | 1 +
tools/client-simulation-yarn/README.md | 79 ++++++
tools/client-simulation-yarn/pom.xml | 81 ++++++
.../apache/uniffle/client/simulator/Constants.java | 43 +++
.../uniffle/client/simulator/HadoopConfigApp.java | 142 ++++++++++
.../client/simulator/NMCallBackHandler.java | 77 ++++++
.../simulator/UniffleClientSimOnYarnAppMaster.java | 304 +++++++++++++++++++++
.../simulator/UniffleClientSimOnYarnClient.java | 221 +++++++++++++++
.../uniffle/client/simulator/UniffleTask.java | 187 +++++++++++++
.../org/apache/uniffle/client/simulator/Utils.java | 94 +++++++
10 files changed, 1229 insertions(+)
diff --git a/pom.xml b/pom.xml
index 11c2d1296..08102056d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
<module>integration-test/common</module>
<module>cli</module>
<module>server-common</module>
+ <module>tools/client-simulation-yarn</module>
</modules>
<dependencies>
diff --git a/tools/client-simulation-yarn/README.md b/tools/client-simulation-yarn/README.md
new file mode 100644
index 000000000..27758c051
--- /dev/null
+++ b/tools/client-simulation-yarn/README.md
@@ -0,0 +1,79 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+# Uniffle Client Simulation On Yarn - Usage Guide
+
+Currently, we have evaluated the performance of the flush operation using the Uniffle server's flush event recording and flush benchmark feature.
+This allows us to assess the server's maximum capability to handle flush block requests for small blocks (e.g., 1 KiB) and the write throughput limit for large blocks (e.g., 1 MiB).
+
+However, there may also be performance bottlenecks between the server receiving requests and the actual flush operation. Therefore, we need a simulated client that continuously sends data to the server.
+
+## Parameter Description
+
+| Parameter Name                          | Default Value | Description                                                                                                     |
+|-----------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------|
+| `uniffle.client.sim.serverId`           | None          | Uniffle server ID                                                                                               |
+| `uniffle.client.sim.container.num`      | 3             | Number of containers to start in the Yarn application; corresponds to the number of concurrent client processes |
+| `uniffle.client.sim.threadCount`        | 1             | Number of concurrent threads running in each container process (the actual number of working threads is `threadCount + 1`) |
+| `uniffle.client.sim.queueName`          | default       | Yarn resource queue name                                                                                        |
+| `uniffle.client.sim.jarPath.list`       | None          | Comma-separated HDFS addresses of additional JARs or other resources to download to the AM or task containers (e.g., the HDFS address of the RSS shaded JAR) |
+| `uniffle.client.sim.tmp.hdfs.path`      | None          | A writable HDFS path for uploading temporary application resources                                              |
+| `uniffle.client.sim.shuffleCount`       | 1             | Number of shuffles included in a single `sendShuffleData` request                                               |
+| `uniffle.client.sim.partitionCount`     | 1             | Number of partitions included in each shuffle of a single `sendShuffleData` request                             |
+| `uniffle.client.sim.blockCount`         | 1             | Number of blocks included in each partition of a single `sendShuffleData` request                               |
+| `uniffle.client.sim.blockSize`          | 1024          | Size of each block in a single `sendShuffleData` request                                                        |
+| `uniffle.client.sim.am.vCores`          | 8             | Number of virtual cores requested for the Application Master (AM)                                               |
+| `uniffle.client.sim.am.memory`          | 4096          | Memory size (in MB) requested for the AM                                                                        |
+| `uniffle.client.sim.container.vCores`   | 2             | Number of virtual cores requested for each task container                                                       |
+| `uniffle.client.sim.container.memory`   | 2048          | Memory size (in MB) requested for each task container                                                           |
+| `uniffle.client.sim.am.jvm.opts`        | None          | Additional JVM options for the AM, for debugging when the execution result is abnormal                          |
+| `uniffle.client.sim.container.jvm.opts` | None          | Additional JVM options for task containers, for debugging when the execution result is abnormal                 |
+
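The four payload parameters above multiply together: one simulated `sendShuffleData` request carries roughly `shuffleCount × partitionCount × blockCount × blockSize` of block data. A minimal sketch of that arithmetic (editorial illustration only; `PayloadEstimate` is not part of the tool, and it assumes `blockSize` is in bytes):

```java
// Rough payload size of one simulated sendShuffleData request.
// Illustrative helper only; not part of the simulator's API.
public class PayloadEstimate {
  static long estimatePayloadBytes(int shuffleCount, int partitionCount, int blockCount, long blockSize) {
    // Every shuffle contains partitionCount partitions, each with blockCount blocks.
    return (long) shuffleCount * partitionCount * blockCount * blockSize;
  }

  public static void main(String[] args) {
    // Values from the running example: 5 shuffles, 50 partitions,
    // 100 blocks of 10240 bytes each -> 256000000 bytes (~244 MiB) per request.
    System.out.println(estimatePayloadBytes(5, 50, 100, 10240));
  }
}
```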
+## Running Example
+
+1. Change to the Hadoop home directory:
+
+```bash
+cd $HADOOP_HOME
+```
+
+2. Execute the example command:
+
+```bash
+$ bin/yarn jar rss-client-simulation-yarn-0.11.0-SNAPSHOT.jar \
+  -Duniffle.client.sim.serverId=<UNIFFLE_SERVER_ID> \
+  -Duniffle.client.sim.container.num=1000 \
+  -Duniffle.client.sim.queueName=<YOUR_QUEUE_NAME> \
+  -Duniffle.client.sim.jarPath.list=hdfs://ns1/tmp/rss-client-spark3-shaded.jar \
+  -Duniffle.client.sim.tmp.hdfs.path=hdfs://ns1/user/xx/tmp/uniffle-client-sim/ \
+  -Duniffle.client.sim.shuffleCount=5 \
+  -Duniffle.client.sim.partitionCount=50 \
+  -Duniffle.client.sim.blockCount=100 \
+  -Duniffle.client.sim.blockSize=10240 \
+  -Duniffle.client.sim.threadCount=10
+```
+
+3. Example Output:
+
+```plaintext
+24/12/30 15:03:47 INFO simulator.UniffleClientSimOnYarnClient: appId: application_1729845342052_5295913
+...
+Application killed: application_1729845342052_5295913
+Application status: KILLED
+```
+
+This guide provides a comprehensive overview of how to run the Uniffle client simulator on Yarn, including parameter descriptions and a step-by-step example. Adjust the parameters as needed for your specific use case.
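The AM reads its settings from a plain `key=value` file (`job.conf`) in which blank lines and `#` comments are skipped and the first `=` splits key from value. A stand-alone sketch of that parsing logic (editorial illustration; `JobConfParser` is not the tool's own class):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the key=value parsing applied to job.conf:
// blank lines and '#' comments are skipped; the first '=' splits key from value.
public class JobConfParser {
  static Map<String, String> parse(String content) {
    Map<String, String> map = new HashMap<>();
    for (String raw : content.split("\n")) {
      String line = raw.trim();
      if (line.isEmpty() || line.startsWith("#")) {
        continue; // skip empty lines and comments
      }
      int eq = line.indexOf('=');
      if (eq > 0) {
        map.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
      }
    }
    return map;
  }

  public static void main(String[] args) {
    String conf = "# simulator settings\nuniffle.client.sim.blockSize=10240\n";
    System.out.println(parse(conf));
  }
}
```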
diff --git a/tools/client-simulation-yarn/pom.xml b/tools/client-simulation-yarn/pom.xml
new file mode 100644
index 000000000..706f722c6
--- /dev/null
+++ b/tools/client-simulation-yarn/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>uniffle-parent</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>rss-client-simulation-yarn</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Uniffle Client Simulation On Yarn</name>
+ <properties>
+ <uniffle.version>0.9.1</uniffle.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark3-shaded</artifactId>
+ <version>${uniffle.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>rss-client-spark3</artifactId>
+ <groupId>org.apache.uniffle</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>protobuf-java</artifactId>
+ <groupId>com.google.protobuf</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+            <mainClass>org.apache.uniffle.client.simulator.UniffleClientSimOnYarnClient</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java
new file mode 100644
index 000000000..251bae546
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+public class Constants {
+ public static final String KEY_PREFIX = "uniffle.client.sim.";
+ public static final String KEY_AM_MEMORY = KEY_PREFIX + "am.memory";
+ public static final String KEY_AM_VCORES = KEY_PREFIX + "am.vCores";
+  public static final String KEY_CONTAINER_MEMORY = KEY_PREFIX + "container.memory";
+  public static final String KEY_CONTAINER_VCORES = KEY_PREFIX + "container.vCores";
+  public static final String KEY_QUEUE_NAME = KEY_PREFIX + "queueName";
+  public static final String KEY_CONTAINER_NUM = KEY_PREFIX + "container.num";
+  public static final int CONTAINER_NUM_DEFAULT = 3;
+  public static final String KEY_SERVER_ID = KEY_PREFIX + "serverId";
+  public static final String KEY_SHUFFLE_COUNT = KEY_PREFIX + "shuffleCount";
+  public static final String KEY_PARTITION_COUNT = KEY_PREFIX + "partitionCount";
+  public static final String KEY_BLOCK_COUNT = KEY_PREFIX + "blockCount";
+  public static final String KEY_BLOCK_SIZE = KEY_PREFIX + "blockSize";
+  public static final String KEY_EXTRA_JAR_PATH_LIST = KEY_PREFIX + "jarPath.list";
+  public static final String KEY_YARN_APP_ID = KEY_PREFIX + "appId";
+  public static final String KEY_CONTAINER_INDEX = KEY_PREFIX + "containerIndex";
+  public static final String KEY_AM_EXTRA_JVM_OPTS = KEY_PREFIX + "am.jvm.opts";
+  public static final String KEY_CONTAINER_EXTRA_JVM_OPTS = KEY_PREFIX + "container.jvm.opts";
+  public static final String KEY_TMP_HDFS_PATH = KEY_PREFIX + "tmp.hdfs.path";
+  public static final String TMP_HDFS_PATH_DEFAULT = "./tmp/uniffle-client-sim/";
+  public static final String JOB_CONF_NAME = "job.conf";
+  // KEY_PREFIX already ends with '.', so no extra dot before "threadCount"
+  public static final String KEY_THREAD_COUNT = KEY_PREFIX + "threadCount";
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java
new file mode 100644
index 000000000..ed6b2497b
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+
+public class HadoopConfigApp {
+
+ private final Configuration conf;
+ private Map<String, String> localConf = new HashMap<>();
+
+ public HadoopConfigApp(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public int run(String[] args) {
+ Options options = new Options();
+
+ Option confOption =
+ OptionBuilder.withArgName("conf")
+ .withLongOpt("conf")
+ .hasArg()
+            .withDescription("Path to the configuration file with key=value pairs")
+            .create("c");
+ options.addOption(confOption);
+
+ Option defineOption =
+ OptionBuilder.withArgName("key=value")
+ .withLongOpt("define")
+ .hasArgs(2)
+ .withValueSeparator()
+ .withDescription("Define a key-value pair configuration")
+ .create("D");
+ options.addOption(defineOption);
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine cmd;
+
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+      System.err.println("Failed to parse command line arguments: " + e.getMessage());
+ return 1;
+ }
+
+ // handle --conf option
+ if (cmd.hasOption("conf")) {
+ String confFile = cmd.getOptionValue("conf");
+ try {
+ loadConfigurationFromFile(localConf, confFile);
+ } catch (IOException e) {
+        System.err.println("Error loading configuration from file: " + e.getMessage());
+ return 1;
+ }
+ }
+
+ // handle -D option
+ if (cmd.hasOption("define")) {
+ Properties properties = cmd.getOptionProperties("D");
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ localConf.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ }
+
+ for (Map.Entry<String, String> entry : localConf.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ return 0;
+ }
+
+  protected void loadConfigurationFromFile(Map<String, String> map, String filePath)
+ throws IOException {
+ try (BufferedReader reader =
+ new BufferedReader(
+            new InputStreamReader(new FileInputStream(filePath), StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (line.isEmpty() || line.startsWith("#")) {
+ continue; // Skip empty lines and comments
+ }
+ int eq = line.indexOf('=');
+ if (eq > 0) {
+ String key = line.substring(0, eq).trim();
+ String value = line.substring(eq + 1).trim();
+ map.put(key, value);
+ } else {
+ System.err.println("Invalid configuration line: " + line);
+ }
+ }
+ }
+ }
+
+ public Map<String, String> getLocalConf() {
+ return localConf;
+ }
+
+ protected static String writeConfigurationToString(Map<String, String> map) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+      stringBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+ }
+ return stringBuilder.toString().trim();
+ }
+
+ public static void main(String[] args) {
+ HadoopConfigApp app = new HadoopConfigApp(new Configuration());
+ int exitCode = app.run(args);
+ System.exit(exitCode);
+ }
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java
new file mode 100644
index 000000000..572f9a1b4
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+public class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
+ @Override
+ public void onContainerStarted(
+ ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ System.out.println("NM: container started, id=" + containerId.toString());
+ }
+
+  @Override
+  public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+    System.out.println("NM: container status received, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onContainerStopped(ContainerId containerId) {
+    System.out.println("NM: container stopped, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onStartContainerError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: start container error, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onContainerResourceIncreased(ContainerId containerId, Resource resource) {
+    System.out.println("NM: container resource increased, id=" + containerId.toString());
+  }
+
+  public void onContainerResourceUpdated(ContainerId containerId, Resource resource) {
+    System.out.println("NM: container resource updated, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: get container status error, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onIncreaseContainerResourceError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: increase container resource error, id=" + containerId.toString());
+  }
+
+  public void onUpdateContainerResourceError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: update container resource error, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onStopContainerError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: stop container error, id=" + containerId.toString());
+  }
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java
new file mode 100644
index 000000000..bc58a9cc3
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class UniffleClientSimOnYarnAppMaster {
+  private static final Logger LOG = LoggerFactory.getLogger(UniffleClientSimOnYarnAppMaster.class);
+ private final Configuration conf;
+ private final String appId;
+ // object lock
+ private Object lock = new Object();
+ // child tasks num
+ private int childTaskNum;
+ // completed tasks num
+ private int childTaskCompletedNum = 0;
+ private AtomicInteger allocatedContainerNum = new AtomicInteger(0);
+ NMClientAsyncImpl nmClientAsync;
+
+ public static void main(String[] args) {
+    UniffleClientSimOnYarnAppMaster master = new UniffleClientSimOnYarnAppMaster();
+ master.run();
+ }
+
+ public UniffleClientSimOnYarnAppMaster() {
+ conf = new Configuration();
+ HadoopConfigApp hadoopConfigApp = new HadoopConfigApp(conf);
+    hadoopConfigApp.run(new String[] {"--conf", "./" + Constants.JOB_CONF_NAME});
+ appId = conf.get(Constants.KEY_YARN_APP_ID, "unknown");
+    childTaskNum = conf.getInt(Constants.KEY_CONTAINER_NUM, Constants.CONTAINER_NUM_DEFAULT);
+ }
+
+ private void run() {
+ try {
+ String serverId = conf.get(Constants.KEY_SERVER_ID);
+ System.out.println("serverId:" + serverId);
+      // Start the AM-RM client and register the AM with the RM;
+      // the callback handler is responsible for handling the AM's status.
+ // handle AM's status
+ AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient =
+ AMRMClientAsync.createAMRMClientAsync(1000, new RMCallBackHandler());
+ amRmClient.init(new Configuration());
+ amRmClient.start();
+ String hostName = NetUtils.getHostname();
+ // register to RM
+ amRmClient.registerApplicationMaster(hostName, -1, null);
+ // init nmClient
+ nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
+ nmClientAsync.init(new Configuration());
+ nmClientAsync.start();
+ // run
+ doRun(amRmClient);
+ // unregister AM
+      amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
+ // stop am-rm
+ amRmClient.stop();
+ // stop nm client
+ nmClientAsync.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** run the main logic */
+ private void doRun(AMRMClientAsync amRmClient) throws InterruptedException {
+ int shuffleCount = conf.getInt(Constants.KEY_SHUFFLE_COUNT, 1);
+ int partitionCount = conf.getInt(Constants.KEY_PARTITION_COUNT, 1);
+ String serverId = conf.get(Constants.KEY_SERVER_ID, "");
+ registerShuffleAndSetupHeartbeat(shuffleCount, partitionCount, serverId);
+ // apply child tasks
+ for (int i = 0; i < childTaskNum; i++) {
+ int memory = conf.getInt(Constants.KEY_CONTAINER_MEMORY, 2048);
+ int vCores = conf.getInt(Constants.KEY_CONTAINER_VCORES, 2);
+ // apply container
+ AMRMClient.ContainerRequest containerRequest =
+ new AMRMClient.ContainerRequest(
+              Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
+ amRmClient.addContainerRequest(containerRequest);
+ }
+ synchronized (lock) {
+ while (childTaskCompletedNum < childTaskNum) {
+ lock.wait(1000);
+ }
+ }
+ System.out.println("AM: Finish main logic");
+ }
+
+ private void registerShuffleAndSetupHeartbeat(
+ int shuffleCount, int partitionCount, String serverId) {
+ String[] parts = serverId.split("-");
+ String host = parts[0];
+ int port0 = Integer.parseInt(parts[1]);
+ int port1 = Integer.parseInt(parts[2]);
+
+ ShuffleWriteClientImpl shuffleWriteClientImpl =
+ ShuffleClientFactory.newWriteBuilder()
+ .clientType(ClientType.GRPC.name())
+ .retryMax(3)
+ .retryIntervalMax(1000)
+ .heartBeatThreadNum(4)
+ .replica(1)
+ .replicaWrite(1)
+ .replicaRead(1)
+ .replicaSkipEnabled(true)
+ .dataTransferPoolSize(10)
+ .dataCommitPoolSize(10)
+ .unregisterThreadPoolSize(10)
+ .unregisterRequestTimeSec(10)
+ .build();
+
+    ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo(host, port0, port1);
+    registerShuffle(shuffleCount, partitionCount, shuffleWriteClientImpl, shuffleServerInfo);
+ long heartbeatInterval = 10_000;
+ ScheduledExecutorService heartBeatScheduledExecutorService =
+ ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
+ heartBeatScheduledExecutorService.scheduleAtFixedRate(
+ () -> {
+ try {
+ String appId = this.appId.toString();
+ shuffleWriteClientImpl.sendAppHeartbeat(appId, 10_000);
+ LOG.info("Finish send heartbeat to coordinator and servers");
+ } catch (Exception e) {
+ LOG.warn("Fail to send heartbeat to coordinator and servers", e);
+ }
+ },
+ heartbeatInterval / 2,
+ heartbeatInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void registerShuffle(
+ int shuffleCount,
+ int partitionCount,
+ ShuffleWriteClientImpl shuffleWriteClientImpl,
+ ShuffleServerInfo shuffleServerInfo) {
+ List<PartitionRange> partitionRanges = new ArrayList<>(partitionCount);
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ partitionRanges.add(new PartitionRange(partitionId, partitionId));
+ }
+ for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
+ shuffleWriteClientImpl.registerShuffle(
+ shuffleServerInfo,
+ appId.toString(),
+ shuffleId,
+ partitionRanges,
+ new RemoteStorageInfo(""),
+ ShuffleDataDistributionType.NORMAL,
+ 1);
+ }
+ }
+
+ class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ for (ContainerStatus status : statuses) {
+ synchronized (lock) {
+ System.out.println(++childTaskCompletedNum + " container completed");
+ try {
+ Thread.sleep(10_000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ // notify main thread when all child tasks are completed
+ if (childTaskCompletedNum == childTaskNum) {
+ lock.notify();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ try {
+ for (Container container : containers) {
+          System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
+ // build AM<->NM client and start container
+ Map<String, String> env = new HashMap<>();
+ StringBuilder classPathEnv =
+              new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
+ .append(ApplicationConstants.CLASS_PATH_SEPARATOR)
+ .append("./*");
+ for (String c :
+ conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                  YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+ classPathEnv.append(c.trim());
+ }
+ env.put("CLASSPATH", classPathEnv.toString());
+ List<String> commands = new ArrayList<>();
+ int index = allocatedContainerNum.getAndIncrement();
+          String extraJvmOpts = conf.get(Constants.KEY_CONTAINER_EXTRA_JVM_OPTS, "");
+ commands.add(
+ ApplicationConstants.Environment.JAVA_HOME.$$()
+ + "/bin/java "
+ + " -Djava.specification.version="
+ + System.getProperty("java.specification.version")
+ + " "
+ + extraJvmOpts
+ + " "
+ + UniffleTask.class.getName()
+ + " -D"
+ + Constants.KEY_CONTAINER_INDEX
+ + "="
+ + index);
+
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>() {
+ {
+                  String[] extraJarPathList = conf.getStrings(Constants.KEY_EXTRA_JAR_PATH_LIST);
+ if (extraJarPathList != null) {
+ for (String extraJarPath : extraJarPathList) {
+ Path path = new Path(extraJarPath);
+ String name = path.getName();
+ put(name, Utils.addHdfsToResource(conf, path));
+ }
+ }
+ }
+ };
+ ContainerLaunchContext containerLaunchContext =
+              ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
+ // request nm to start container
+ nmClientAsync.startContainerAsync(container, containerLaunchContext);
+          System.out.println(index + ": container started, Node=" + container.getNodeHttpAddress());
+          System.out.println("containerLaunchContext: " + containerLaunchContext);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onContainersUpdated(List<UpdatedContainer> containers) {}
+
+ @Override
+ public void onShutdownRequest() {
+ System.out.println("RM: shutdown request");
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ e.printStackTrace();
+ }
+ }
+}
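The AM above appends a `-D`-style argument (`-D` + Constants.KEY_CONTAINER_INDEX + `=` + index) after the main class name in each container's launch command, which the launched task later turns back into a config entry. As an illustration only, a minimal sketch of parsing such trailing `-DKEY=VALUE` arguments; the key name `container.index` here is hypothetical, and the real code delegates this to `HadoopConfigApp`:

```java
import java.util.HashMap;
import java.util.Map;

public class DArgParser {
  // Collect trailing -DKEY=VALUE arguments into a map, the way the
  // container launch command passes the container index to the task.
  static Map<String, String> parse(String[] args) {
    Map<String, String> props = new HashMap<>();
    for (String arg : args) {
      if (arg.startsWith("-D")) {
        int eq = arg.indexOf('=');
        if (eq > 2) {
          props.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
      }
    }
    return props;
  }

  public static void main(String[] args) {
    System.out.println(parse(new String[] {"-Dcontainer.index=3"}));
  }
}
```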
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java
new file mode 100644
index 000000000..326ee7093
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UniffleClientSimOnYarnClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UniffleClientSimOnYarnClient.class);
+ private final Configuration conf;
+
+ public UniffleClientSimOnYarnClient(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public static void main(String[] args) {
+ Configuration conf = new Configuration();
+ // Load configuration from parameters
+ HadoopConfigApp hadoopConfigApp = new HadoopConfigApp(conf);
+ int exitCode = hadoopConfigApp.run(args);
+ if (exitCode != 0) {
+ System.exit(exitCode);
+ }
+ UniffleClientSimOnYarnClient client = new UniffleClientSimOnYarnClient(conf);
+ try {
+ client.run(hadoopConfigApp.getLocalConf());
+ } catch (Exception e) {
+ LOG.error("Client run failed, please check the log file.", e);
+ }
+ }
+
+ public void run(Map<String, String> localConf) throws Exception {
+ // YarnClient to talk to ResourceManager
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ // request RM to create an application
+ YarnClientApplication application = yarnClient.createApplication();
+ // prepare app submission context
+ ApplicationSubmissionContext applicationSubmissionContext =
+ application.getApplicationSubmissionContext();
+ // get application id and store to conf for am and sub-tasks to use
+ ApplicationId appId = applicationSubmissionContext.getApplicationId();
+ LOG.info("appId: {}", appId);
+ localConf.put(Constants.KEY_YARN_APP_ID, appId.toString());
+ applicationSubmissionContext.setApplicationName("UniffleClientSimOnYarn");
+
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ System.out.println("Received Ctrl+C, terminating application...");
+
+ try {
+ yarnClient.killApplication(appId);
+ System.out.println("Application killed: " + appId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+ System.out.println("Application status: " + report.getYarnApplicationState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }));
+
+ // copy jar to hdfs
+ String jarLocalPathStr = Utils.getCurrentJarPath(UniffleClientSimOnYarnClient.class);
+ System.out.println("jarLocalPath: " + jarLocalPathStr);
+ if (jarLocalPathStr == null || !jarLocalPathStr.endsWith(".jar")) {
+ String cmdline = System.getProperty("sun.java.command");
+ if (cmdline != null) {
+ String[] parts = cmdline.split("\\s+");
+ if (parts.length > 1 && parts[1].endsWith(".jar")) {
+ jarLocalPathStr = parts[1];
+ System.out.println(
+ "jarLocalPath: " + jarLocalPathStr + " updated it from command line: " + cmdline);
+ }
+ }
+ }
+ Path jarLocalPath = new Path(jarLocalPathStr);
+ Path jarHdfsPath =
+ Utils.copyToHdfs(conf, jarLocalPath, Utils.getHdfsDestPath(conf, jarLocalPath.getName()));
+ System.out.println("jarHdfsPath: " + jarHdfsPath);
+ localConf.put(
+ Constants.KEY_EXTRA_JAR_PATH_LIST,
+ Utils.isBlank(localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST))
+ ? jarHdfsPath.toString()
+ : localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST) + "," + jarHdfsPath);
+ conf.set(Constants.KEY_EXTRA_JAR_PATH_LIST, localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST));
+
+ // write job configuration to hdfs
+ Path jobConfHdfsPath = Utils.getHdfsDestPath(conf, Constants.JOB_CONF_NAME);
+ localConf.put(
+ Constants.KEY_EXTRA_JAR_PATH_LIST,
+ Utils.isBlank(localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST))
+ ? jobConfHdfsPath.toString()
+ : localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST) + "," + jobConfHdfsPath);
+ conf.set(Constants.KEY_EXTRA_JAR_PATH_LIST, localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST));
+ Utils.writeStringToHdfs(
+ conf, HadoopConfigApp.writeConfigurationToString(localConf), jobConfHdfsPath);
+
+ // prepare local resources
+ Map<String, LocalResource> localResources = new HashMap<>();
+ String[] extraJarPathList = conf.getStrings(Constants.KEY_EXTRA_JAR_PATH_LIST);
+ if (extraJarPathList != null) {
+ for (String extraJarPath : extraJarPathList) {
+ Path path = new Path(extraJarPath);
+ String name = path.getName();
+ localResources.put(name, Utils.addHdfsToResource(conf, path));
+ }
+ }
+ // prepare environment classpath
+ Map<String, String> env = new HashMap<>();
+ // the dependencies of am
+ StringBuilder classPathEnv =
+ new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
+ .append(ApplicationConstants.CLASS_PATH_SEPARATOR)
+ .append("./*");
+ // yarn dependencies
+ for (String c :
+ conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+ classPathEnv.append(c.trim());
+ }
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ String extraJvmOpts = conf.get(Constants.KEY_AM_EXTRA_JVM_OPTS, "");
+ // prepare launch command to run am
+ List<String> commands =
+ new ArrayList<String>() {
+ {
+ add(
+ ApplicationConstants.Environment.JAVA_HOME.$$()
+ + "/bin/java "
+ + " -Djava.specification.version="
+ + System.getProperty("java.specification.version")
+ + " "
+ + extraJvmOpts
+ + " "
+ + UniffleClientSimOnYarnAppMaster.class.getName());
+ }
+ };
+
+ // create am container and submit the application
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
+ // prepare the am container context
+ applicationSubmissionContext.setAMContainerSpec(amContainer);
+ int memory = conf.getInt(Constants.KEY_AM_MEMORY, 4096);
+ int vCores = conf.getInt(Constants.KEY_AM_VCORES, 8);
+ applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
+ String queueName = conf.get(Constants.KEY_QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME);
+ if (!YarnConfiguration.DEFAULT_QUEUE_NAME.equals(queueName)) {
+ applicationSubmissionContext.setQueue(queueName);
+ }
+ System.out.println(applicationSubmissionContext);
+ yarnClient.submitApplication(applicationSubmissionContext);
+
+ // monitor application progress
+ for (; ; ) {
+ ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
+ YarnApplicationState state = applicationReport.getYarnApplicationState();
+ FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
+ if (state.equals(YarnApplicationState.FINISHED)) {
+ if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
+ LOG.info("SUCCESSFUL FINISH.");
+ break;
+ } else {
+ LOG.error("FINISHED WITH ERROR.");
+ break;
+ }
+ } else if (state.equals(YarnApplicationState.FAILED)
+ || state.equals(YarnApplicationState.KILLED)) {
+ LOG.error("Application failed with state: {}", state);
+ break;
+ }
+ LOG.info("running...");
+ Thread.sleep(5000);
+ }
+ }
+}
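The client above grows a single comma-separated path list under `Constants.KEY_EXTRA_JAR_PATH_LIST`: first the uploaded jar, then the job-conf file, each appended only with a comma when the list is already non-blank. A minimal standalone sketch of that append rule (the helper name `appendPath` is ours; the real code inlines the ternary with `Utils.isBlank`):

```java
public class JarPathList {
  // Append a path to a comma-separated list, starting a new list when
  // the existing value is null or blank (mirrors the client's ternary).
  static String appendPath(String existing, String newPath) {
    return (existing == null || existing.trim().isEmpty())
        ? newPath
        : existing + "," + newPath;
  }

  public static void main(String[] args) {
    String list = appendPath(null, "hdfs:/tmp/app.jar");
    list = appendPath(list, "hdfs:/tmp/job-conf.xml");
    System.out.println(list);
  }
}
```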
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java
new file mode 100644
index 000000000..72e7537a7
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shaded.io.netty.buffer.Unpooled;
+
+public class UniffleTask {
+ private static byte[] data;
+ private static AtomicLong sId = new AtomicLong(0);
+
+ public static void main(String[] args) {
+ System.out.println(
+ "Start " + UniffleTask.class.getSimpleName() + " with " + Arrays.toString(args));
+ if (args.length < 1) {
+ System.err.println(
+ "Usage: "
+ + UniffleTask.class.getSimpleName()
+ + " [--conf <CONFIG_FILE>] [-D<KEY>=<VALUE>]");
+ System.exit(1);
+ }
+
+ Configuration conf = new Configuration();
+ List<String> combinedArgs = new ArrayList<>(args.length);
+ combinedArgs.addAll(Arrays.asList(args));
+ combinedArgs.add("--conf");
+ combinedArgs.add("./" + Constants.JOB_CONF_NAME);
+ HadoopConfigApp hadoopConfigApp = new HadoopConfigApp(conf);
+ hadoopConfigApp.run(combinedArgs.toArray(new String[0]));
+ System.out.println(hadoopConfigApp.getLocalConf());
+ String appId = conf.get(Constants.KEY_YARN_APP_ID);
+ int taskIndex = conf.getInt(Constants.KEY_CONTAINER_INDEX, 0);
+ String serverId = conf.get(Constants.KEY_SERVER_ID, "");
+ int shuffleCount = conf.getInt(Constants.KEY_SHUFFLE_COUNT, 1);
+ int partitionCount = conf.getInt(Constants.KEY_PARTITION_COUNT, 1);
+ int blockCount = conf.getInt(Constants.KEY_BLOCK_COUNT, 1000);
+ int blockSize = conf.getInt(Constants.KEY_BLOCK_SIZE, 1024);
+ int threadCount = conf.getInt(Constants.KEY_THREAD_COUNT, 1);
+ data = new byte[blockSize];
+ System.out.println(taskIndex + ": start to send shuffle data to " + serverId);
+
+ ExecutorService executorService =
+ new ThreadPoolExecutor(
+ threadCount,
+ threadCount,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ Executors.defaultThreadFactory(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ try {
+ while (true) {
+ executorService.submit(
+ () ->
+ sendData(
+ taskIndex,
+ appId,
+ serverId,
+ shuffleCount,
+ partitionCount,
+ blockCount,
+ blockSize));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ executorService.shutdown();
+ }
+ System.out.println("end to send shuffle data...");
+ }
+
+ private static void sendData(
+ int taskIndex,
+ String appId,
+ String serverId,
+ int shuffleCount,
+ int partitionCount,
+ int blockCount,
+ int blockSize) {
+ try {
+ String[] parts = serverId.split("-");
+ String host = parts[0];
+ int port0 = Integer.parseInt(parts[1]);
+ int port1 = Integer.parseInt(parts[2]);
+ ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo(host, port0, port1);
+ ShuffleServerGrpcClient client =
+ ((ShuffleServerGrpcClient)
+ ShuffleServerClientFactory.getInstance()
+ .getShuffleServerClient("GRPC_NETTY", shuffleServerInfo));
+ Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = new HashMap<>();
+ constructShuffleBlockInfo(
+ shuffleToBlocks, taskIndex, shuffleCount, partitionCount, blockCount, blockSize);
+
+ RssSendShuffleDataRequest sendShuffleDataRequest =
+ new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
+ RssSendShuffleDataResponse response = client.sendShuffleData(sendShuffleDataRequest);
+ if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
+ System.out.println(
+ "send shuffle data error with "
+ + response.getStatusCode()
+ + " message: "
+ + response.getMessage());
+ } else {
+ System.out.printf(".");
+ }
+ } catch (Exception e) {
+ System.err.printf(
+ "Error during task %d for sending shuffleCount=%d,partitionCount=%d,blockCount=%d,blockSize=%d: %s%n",
+ taskIndex, shuffleCount, partitionCount, blockCount, blockSize, e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @VisibleForTesting
+ protected static void constructShuffleBlockInfo(
+ Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks,
+ int taskIndex,
+ int shuffleCount,
+ int partitionCount,
+ int blockCount,
+ int blockSize) {
+ for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
+ Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = new HashMap<>();
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ List<ShuffleBlockInfo> blocks = partitionToBlocks.get(partitionId);
+ if (blocks == null) {
+ blocks = new ArrayList<>();
+ partitionToBlocks.put(partitionId, blocks);
+ }
+ for (int blockIndex = 0; blockIndex < blockCount; blockIndex++) {
+ long blockId = (((long) taskIndex + 1) << (Long.SIZE - 5)) | sId.getAndIncrement();
+ blocks.add(
+ new ShuffleBlockInfo(
+ shuffleId,
+ partitionId,
+ blockId,
+ blockSize,
+ 0,
+ Unpooled.wrappedBuffer(data).retain(),
+ Collections.emptyList(),
+ 0,
+ 0,
+ 0));
+ }
+ }
+ shuffleToBlocks.put(shuffleId, partitionToBlocks);
+ }
+ }
+}
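The blockId expression in `constructShuffleBlockInfo` packs `taskIndex + 1` into the top 5 bits of a long and the per-task sequence counter into the low 59 bits. A standalone sketch of that layout, with the shift factored out for clarity (note that, as written, a task index above 30 would spill into the sign bit, so this sketch assumes small task counts):

```java
public class BlockIdLayout {
  // Mirror of UniffleTask's blockId scheme:
  // high 5 bits = taskIndex + 1, low 59 bits = sequence number.
  static long blockId(int taskIndex, long seq) {
    return (((long) taskIndex + 1) << (Long.SIZE - 5)) | seq;
  }

  public static void main(String[] args) {
    // task 0, first block: only bit 59 is set.
    System.out.println(Long.toHexString(blockId(0, 0)));
    System.out.println(Long.toHexString(blockId(3, 42)));
  }
}
```

Factoring the task index into the high bits keeps blockIds from different containers disjoint without any coordination, since each task only increments its own low-bit counter.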
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java
new file mode 100644
index 000000000..45fef342a
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.CodeSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+public final class Utils {
+ public static Path copyToHdfs(Configuration conf, Path srcPath, Path dstPath) throws IOException {
+ try (FileSystem fs = FileSystem.get(dstPath.toUri(), conf)) {
+ // upload
+ fs.copyFromLocalFile(srcPath, dstPath);
+ return dstPath;
+ }
+ }
+
+ public static LocalResource addHdfsToResource(Configuration conf, Path dstPath)
+ throws IOException {
+ try (FileSystem fs = FileSystem.get(dstPath.toUri(), conf)) {
+ FileStatus scFileStatus = fs.getFileStatus(dstPath);
+ LocalResource scRsrc =
+ LocalResource.newInstance(
+ URL.fromURI(dstPath.toUri()),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION,
+ scFileStatus.getLen(),
+ scFileStatus.getModificationTime());
+ return scRsrc;
+ }
+ }
+
+ public static Path getHdfsDestPath(Configuration conf, String name) {
+ return new Path(conf.get(Constants.KEY_TMP_HDFS_PATH, Constants.TMP_HDFS_PATH_DEFAULT), name);
+ }
+
+ public static void writeStringToHdfs(Configuration conf, String content, Path path)
+ throws IOException {
+ try (FileSystem fs = FileSystem.get(path.toUri(), conf)) {
+ // upload
+ try (FSDataOutputStream os = fs.create(path)) {
+ os.write(content.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ }
+
+ public static String getCurrentJarPath(Class clazz) throws URISyntaxException {
+ CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
+ if (codeSource != null && codeSource.getLocation() != null) {
+ return new File(codeSource.getLocation().toURI()).getPath();
+ }
+ return null;
+ }
+
+ public static boolean isBlank(String str) {
+ int strLen;
+ if (str == null || (strLen = str.length()) == 0) {
+ return true;
+ }
+ for (int i = 0; i < strLen; i++) {
+ if ((Character.isWhitespace(str.charAt(i)) == false)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
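For reference, `UniffleTask.sendData` expects the configured server id in the form `<host>-<grpcPort>-<nettyPort>` and splits it on `-`, which would misparse hostnames that themselves contain dashes. A hedged sketch of the same convention that splits from the right instead (the class and method names here are ours, not part of the commit):

```java
public class ServerIdParser {
  // Split "<host>-<port0>-<port1>" into its three parts, taking the two
  // trailing '-' separators so dashes inside the hostname survive.
  static String[] parse(String serverId) {
    int p2 = serverId.lastIndexOf('-');
    int p1 = serverId.lastIndexOf('-', p2 - 1);
    return new String[] {
      serverId.substring(0, p1), serverId.substring(p1 + 1, p2), serverId.substring(p2 + 1)
    };
  }

  public static void main(String[] args) {
    String[] parts = parse("my-host-19999-19997");
    System.out.println(parts[0] + " " + parts[1] + " " + parts[2]);
  }
}
```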