This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d7aad66ac [#2315] feat(test): Introduce a client simulator tool to 
simulate send shuffle data request for testing server performance (#2316)
d7aad66ac is described below

commit d7aad66ac486250bc07992fceb76762ae532cce9
Author: maobaolong <[email protected]>
AuthorDate: Fri Jan 3 11:10:21 2025 +0800

    [#2315] feat(test): Introduce a client simulator tool to simulate send 
shuffle data request for testing server performance (#2316)
    
    ### What changes were proposed in this pull request?
    
    Introduce a client simulator tool to simulate send shuffle data request for 
testing server performance.
    
    ### Why are the changes needed?
    
    Fix: #2315
    
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This has been used on our company.
---
 pom.xml                                            |   1 +
 tools/client-simulation-yarn/README.md             |  79 ++++++
 tools/client-simulation-yarn/pom.xml               |  81 ++++++
 .../apache/uniffle/client/simulator/Constants.java |  43 +++
 .../uniffle/client/simulator/HadoopConfigApp.java  | 142 ++++++++++
 .../client/simulator/NMCallBackHandler.java        |  77 ++++++
 .../simulator/UniffleClientSimOnYarnAppMaster.java | 304 +++++++++++++++++++++
 .../simulator/UniffleClientSimOnYarnClient.java    | 221 +++++++++++++++
 .../uniffle/client/simulator/UniffleTask.java      | 187 +++++++++++++
 .../org/apache/uniffle/client/simulator/Utils.java |  94 +++++++
 10 files changed, 1229 insertions(+)

diff --git a/pom.xml b/pom.xml
index 11c2d1296..08102056d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
     <module>integration-test/common</module>
     <module>cli</module>
     <module>server-common</module>
+    <module>tools/client-simulation-yarn</module>
   </modules>
 
   <dependencies>
diff --git a/tools/client-simulation-yarn/README.md 
b/tools/client-simulation-yarn/README.md
new file mode 100644
index 000000000..27758c051
--- /dev/null
+++ b/tools/client-simulation-yarn/README.md
@@ -0,0 +1,79 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+# Uniffle Client Simulation On Yarn - Usage Guide
+
+Currently, we have evaluated the performance of the flush operation using the 
Uniffle server's flush event recording and flush benchmark feature.
+This allows us to assess the server's maximum capability to handle flush block 
requests for small blocks (e.g., 1 KiB) and the write throughput limit for 
large blocks (e.g., 1 MiB).
+
+However, there may also be performance bottlenecks between the server 
receiving requests and the actual flush operation. Therefore, we need a 
simulated client that continuously sends data to the server.
+
+## Parameter Description
+
+| Parameter Name                               | Default Value | Description |
+|----------------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `uniffle.client.sim.serverId`                | None          | Uniffle server ID |
+| `uniffle.client.sim.container.num`           | 3             | Number of containers to start in the Yarn application, which corresponds to the number of concurrent client processes |
+| `uniffle.client.sim.threadCount`             | 1             | Number of concurrent threads running in each container process. The actual number of working threads is `threadCount + 1` when each thread is concurrent. |
+| `uniffle.client.sim.queueName`               | default       | Yarn resource queue name |
+| `uniffle.client.sim.jarPath.list`            | None          | HDFS addresses of additional JARs or other resources to download to AM or Task local, separated by commas (e.g., HDFS address of RSS shaded JAR) |
+| `uniffle.client.sim.tmp.hdfs.path`           | None          | A writable HDFS address for uploading temporary application resources |
+| `uniffle.client.sim.shuffleCount`            | 1             | Number of shuffles included in a single `sendShuffleData` request |
+| `uniffle.client.sim.partitionCount`          | 1             | Number of partitions included in each shuffle of a single `sendShuffleData` request |
+| `uniffle.client.sim.blockCount`              | 1             | Number of blocks included in each partition of a single `sendShuffleData` request |
+| `uniffle.client.sim.blockSize`               | 1024          | Size of each block in a single `sendShuffleData` request |
+| `uniffle.client.sim.am.vCores`               | 8             | Number of virtual cores specified when requesting the Application Master (AM) |
+| `uniffle.client.sim.am.memory`               | 4096          | Memory size (in MB) specified when requesting the AM |
+| `uniffle.client.sim.container.vCores`        | 2             | Number of virtual cores specified when requesting task containers |
+| `uniffle.client.sim.container.memory`        | 2048          | Memory size (in MB) specified when requesting task containers |
+| `uniffle.client.sim.am.jvm.opts`             | None          | Additional JVM options for debugging when the execution result is abnormal |
+| `uniffle.client.sim.container.jvm.opts`      | None          | Additional JVM options for debugging when the execution result is abnormal |
+
+## Running Example
+
+1. Change to the Hadoop installation directory, from which the simulator is submitted to Yarn:
+
+```bash
+cd $HADOOP_HOME
+```
+
+2. Execute the example command:
+
+```bash
+$ bin/yarn jar rss-client-simulation-yarn-0.11.0-SNAPSHOT.jar \
+-Duniffle.client.sim.serverId=<UNIFFLE_SERVER_ID> \
+-Duniffle.client.sim.container.num=1000 \
+-Duniffle.client.sim.queueName=<YOUR_QUEUE_NAME>  \
+-Duniffle.client.sim.jarPath.list=hdfs://ns1/tmp/rss-client-spark3-shaded.jar  \
+-Duniffle.client.sim.tmp.hdfs.path=hdfs://ns1/user/xx/tmp/uniffle-client-sim/  \
+-Duniffle.client.sim.shuffleCount=5 \
+-Duniffle.client.sim.partitionCount=50 \
+-Duniffle.client.sim.blockCount=100 \
+-Duniffle.client.sim.blockSize=10240 \
+-Duniffle.client.sim.threadCount=10
+```
+
+3. Example Output:
+
+```plaintext
+24/12/30 15:03:47 INFO simulator.UniffleClientSimOnYarnClient: appId: 
application_1729845342052_5295913
+...
+Application killed: application_1729845342052_5295913
+Application status: KILLED
+```
+
+This guide provides a comprehensive overview of how to run the Uniffle Client 
simulator on Yarn, including parameter descriptions and a step-by-step example. 
Adjust the parameters as needed for your specific use case.
diff --git a/tools/client-simulation-yarn/pom.xml 
b/tools/client-simulation-yarn/pom.xml
new file mode 100644
index 000000000..706f722c6
--- /dev/null
+++ b/tools/client-simulation-yarn/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.uniffle</groupId>
+    <artifactId>uniffle-parent</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>rss-client-simulation-yarn</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Uniffle Client Simulation On Yarn</name>
+  <properties>
+    <uniffle.version>0.9.1</uniffle.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-client-spark3-shaded</artifactId>
+      <version>${uniffle.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>rss-client-spark3</artifactId>
+          <groupId>org.apache.uniffle</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>protobuf-java</artifactId>
+          <groupId>com.google.protobuf</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.uniffle.client.simulator.UniffleClientSimOnYarnClient</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git 
a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java
 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java
new file mode 100644
index 000000000..251bae546
--- /dev/null
+++ 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Constants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+/**
+ * Configuration keys and default values used by the client simulator.
+ *
+ * <p>Every key is derived from {@link #KEY_PREFIX}. The prefix already ends with a dot, so the
+ * suffix appended to it must not start with one.
+ */
+public class Constants {
+  public static final String KEY_PREFIX = "uniffle.client.sim.";
+  public static final String KEY_AM_MEMORY = KEY_PREFIX + "am.memory";
+  public static final String KEY_AM_VCORES = KEY_PREFIX + "am.vCores";
+  public static final String KEY_CONTAINER_MEMORY = KEY_PREFIX + "container.memory";
+  public static final String KEY_CONTAINER_VCORES = KEY_PREFIX + "container.vCores";
+  public static final String KEY_QUEUE_NAME = KEY_PREFIX + "queueName";
+  public static final String KEY_CONTAINER_NUM = KEY_PREFIX + "container.num";
+  public static final int CONTAINER_NUM_DEFAULT = 3;
+  public static final String KEY_SERVER_ID = KEY_PREFIX + "serverId";
+  public static final String KEY_SHUFFLE_COUNT = KEY_PREFIX + "shuffleCount";
+  public static final String KEY_PARTITION_COUNT = KEY_PREFIX + "partitionCount";
+  public static final String KEY_BLOCK_COUNT = KEY_PREFIX + "blockCount";
+  public static final String KEY_BLOCK_SIZE = KEY_PREFIX + "blockSize";
+  public static final String KEY_EXTRA_JAR_PATH_LIST = KEY_PREFIX + "jarPath.list";
+  public static final String KEY_YARN_APP_ID = KEY_PREFIX + "appId";
+  public static final String KEY_CONTAINER_INDEX = KEY_PREFIX + "containerIndex";
+  public static final String KEY_AM_EXTRA_JVM_OPTS = KEY_PREFIX + "am.jvm.opts";
+  public static final String KEY_CONTAINER_EXTRA_JVM_OPTS = KEY_PREFIX + "container.jvm.opts";
+  public static final String KEY_TMP_HDFS_PATH = KEY_PREFIX + "tmp.hdfs.path";
+  public static final String TMP_HDFS_PATH_DEFAULT = "./tmp/uniffle-client-sim/";
+  // Name of the configuration file the application master reads from its working directory.
+  public static final String JOB_CONF_NAME = "job.conf";
+  // No leading dot here: KEY_PREFIX already ends with '.', and the documented key is
+  // "uniffle.client.sim.threadCount". A leading dot would produce "uniffle.client.sim..threadCount",
+  // so the documented -D flag would never be picked up.
+  public static final String KEY_THREAD_COUNT = KEY_PREFIX + "threadCount";
+}
diff --git 
a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java
 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java
new file mode 100644
index 000000000..ed6b2497b
--- /dev/null
+++ 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/HadoopConfigApp.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+
+public class HadoopConfigApp {
+
+  private final Configuration conf;
+  private Map<String, String> localConf = new HashMap<>();
+
+  public HadoopConfigApp(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public int run(String[] args) {
+    Options options = new Options();
+
+    Option confOption =
+        OptionBuilder.withArgName("conf")
+            .withLongOpt("conf")
+            .hasArg()
+            .withDescription("Path to the configuration file with key=value 
pairs")
+            .create("c");
+    options.addOption(confOption);
+
+    Option defineOption =
+        OptionBuilder.withArgName("key=value")
+            .withLongOpt("define")
+            .hasArgs(2)
+            .withValueSeparator()
+            .withDescription("Define a key-value pair configuration")
+            .create("D");
+    options.addOption(defineOption);
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine cmd;
+
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      System.err.println("Failed to parse command line arguments: " + 
e.getMessage());
+      return 1;
+    }
+
+    // handle --conf option
+    if (cmd.hasOption("conf")) {
+      String confFile = cmd.getOptionValue("conf");
+      try {
+        loadConfigurationFromFile(localConf, confFile);
+      } catch (IOException e) {
+        System.err.println("Error loading configuration from file: " + 
e.getMessage());
+        return 1;
+      }
+    }
+
+    // handle -D option
+    if (cmd.hasOption("define")) {
+      Properties properties = cmd.getOptionProperties("D");
+      for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+        localConf.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+    }
+
+    for (Map.Entry<String, String> entry : localConf.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+    return 0;
+  }
+
+  protected void loadConfigurationFromFile(Map<String, String> map, String 
filePath)
+      throws IOException {
+    try (BufferedReader reader =
+        new BufferedReader(
+            new InputStreamReader(new FileInputStream(filePath), 
StandardCharsets.UTF_8))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        line = line.trim();
+        if (line.isEmpty() || line.startsWith("#")) {
+          continue; // Skip empty lines and comments
+        }
+        int eq = line.indexOf('=');
+        if (eq > 0) {
+          String key = line.substring(0, eq).trim();
+          String value = line.substring(eq + 1).trim();
+          map.put(key, value);
+        } else {
+          System.err.println("Invalid configuration line: " + line);
+        }
+      }
+    }
+  }
+
+  public Map<String, String> getLocalConf() {
+    return localConf;
+  }
+
+  protected static String writeConfigurationToString(Map<String, String> map) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+    }
+    return stringBuilder.toString().trim();
+  }
+
+  public static void main(String[] args) {
+    HadoopConfigApp app = new HadoopConfigApp(new Configuration());
+    int exitCode = app.run(args);
+    System.exit(exitCode);
+  }
+}
diff --git 
a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java
 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java
new file mode 100644
index 000000000..572f9a1b4
--- /dev/null
+++ 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/NMCallBackHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+
+public class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
+  @Override
+  public void onContainerStarted(
+      ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+    System.out.println("NM: container started, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
+    System.out.println("NM: container status received, id=" + 
containerId.toString());
+  }
+
+  @Override
+  public void onContainerStopped(ContainerId containerId) {
+    System.out.println("NM: container stopped, id=" + containerId.toString());
+  }
+
+  @Override
+  public void onStartContainerError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: start container error, id=" + 
containerId.toString());
+  }
+
+  @Override
+  public void onContainerResourceIncreased(ContainerId containerId, Resource 
resource) {
+    System.out.println("NM: container resource increased, id=" + 
containerId.toString());
+  }
+
+  public void onContainerResourceUpdated(ContainerId containerId, Resource 
resource) {
+    System.out.println("NM: container resource updated, id=" + 
containerId.toString());
+  }
+
+  @Override
+  public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: get container status error, id=" + 
containerId.toString());
+  }
+
+  @Override
+  public void onIncreaseContainerResourceError(ContainerId containerId, 
Throwable t) {
+    System.out.println("NM: increase container resource error, id=" + 
containerId.toString());
+  }
+
+  public void onUpdateContainerResourceError(ContainerId containerId, 
Throwable t) {
+    System.out.println("NM: update container resource error, id=" + 
containerId.toString());
+  }
+
+  @Override
+  public void onStopContainerError(ContainerId containerId, Throwable t) {
+    System.out.println("NM: stop container error, id=" + 
containerId.toString());
+  }
+}
diff --git 
a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java
 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java
new file mode 100644
index 000000000..bc58a9cc3
--- /dev/null
+++ 
b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnAppMaster.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class UniffleClientSimOnYarnAppMaster {
+  private static final Logger LOG = 
LoggerFactory.getLogger(UniffleClientSimOnYarnAppMaster.class);
+  private final Configuration conf;
+  private final String appId;
+  // object lock
+  private Object lock = new Object();
+  // child tasks num
+  private int childTaskNum;
+  // completed tasks num
+  private int childTaskCompletedNum = 0;
+  private AtomicInteger allocatedContainerNum = new AtomicInteger(0);
+  NMClientAsyncImpl nmClientAsync;
+
+  /** Entry point of the application master process; {@code args} is not read. */
+  public static void main(String[] args) {
+    new UniffleClientSimOnYarnAppMaster().run();
+  }
+
+  /**
+   * Loads the job configuration from {@code ./job.conf} in the working directory and reads the
+   * application id and the number of task containers from it.
+   *
+   * @throws IllegalStateException if the job configuration cannot be loaded
+   */
+  public UniffleClientSimOnYarnAppMaster() {
+    conf = new Configuration();
+    String jobConfPath = "./" + Constants.JOB_CONF_NAME;
+    int exitCode = new HadoopConfigApp(conf).run(new String[] {"--conf", jobConfPath});
+    if (exitCode != 0) {
+      // Without the job configuration every setting silently falls back to its default
+      // (e.g. appId "unknown"), so fail fast instead of running a meaningless simulation.
+      throw new IllegalStateException("Failed to load job configuration from " + jobConfPath);
+    }
+    appId = conf.get(Constants.KEY_YARN_APP_ID, "unknown");
+    childTaskNum = conf.getInt(Constants.KEY_CONTAINER_NUM, Constants.CONTAINER_NUM_DEFAULT);
+  }
+
+  private void run() {
+    try {
+      String serverId = conf.get(Constants.KEY_SERVER_ID);
+      System.out.println("serverId:" + serverId);
+      // start am-rm client,build rm-am connection to register AM, 
allocListener responsible for
+      // handle AM's status
+      AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient =
+          AMRMClientAsync.createAMRMClientAsync(1000, new RMCallBackHandler());
+      amRmClient.init(new Configuration());
+      amRmClient.start();
+      String hostName = NetUtils.getHostname();
+      // register to RM
+      amRmClient.registerApplicationMaster(hostName, -1, null);
+      // init nmClient
+      nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
+      nmClientAsync.init(new Configuration());
+      nmClientAsync.start();
+      // run
+      doRun(amRmClient);
+      // unregister AM
+      amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
"SUCCESS", null);
+      // stop am-rm
+      amRmClient.stop();
+      // stop nm client
+      nmClientAsync.stop();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Runs the main logic: registers the shuffles, starts the heartbeat, requests one container per
+   * child task and then blocks until every child task has been reported as completed.
+   *
+   * @param amRmClient started and registered AM-RM client used to request containers
+   * @throws InterruptedException if the thread is interrupted while waiting for the child tasks
+   */
+  private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient)
+      throws InterruptedException {
+    int shuffleCount = conf.getInt(Constants.KEY_SHUFFLE_COUNT, 1);
+    int partitionCount = conf.getInt(Constants.KEY_PARTITION_COUNT, 1);
+    String serverId = conf.get(Constants.KEY_SERVER_ID, "");
+    registerShuffleAndSetupHeartbeat(shuffleCount, partitionCount, serverId);
+
+    // Every child task asks for the same resources, so read the settings once.
+    int memory = conf.getInt(Constants.KEY_CONTAINER_MEMORY, 2048);
+    int vCores = conf.getInt(Constants.KEY_CONTAINER_VCORES, 2);
+    // apply child tasks
+    for (int i = 0; i < childTaskNum; i++) {
+      // apply container
+      AMRMClient.ContainerRequest containerRequest =
+          new AMRMClient.ContainerRequest(
+              Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
+      amRmClient.addContainerRequest(containerRequest);
+    }
+
+    // The completion callback increments childTaskCompletedNum under the same lock; the timed
+    // wait re-checks the counter at least once per second.
+    synchronized (lock) {
+      while (childTaskCompletedNum < childTaskNum) {
+        lock.wait(1000);
+      }
+    }
+    System.out.println("AM: Finish main logic");
+  }
+
+  /**
+   * Registers every simulated shuffle on the target shuffle server and schedules a periodic
+   * application heartbeat on a daemon thread.
+   *
+   * @param shuffleCount number of shuffles to register
+   * @param partitionCount number of partitions registered for each shuffle
+   * @param serverId shuffle server id in the form {@code host-port0-port1}; the two ports are
+   *     located from the right, so the host part may itself contain {@code '-'}
+   * @throws IllegalArgumentException if {@code serverId} does not consist of a host and two
+   *     ports, or if a port is not an integer
+   */
+  private void registerShuffleAndSetupHeartbeat(
+      int shuffleCount, int partitionCount, String serverId) {
+    // Find the two trailing "-port" segments from the right; splitting on every '-' would break
+    // host names that contain hyphens.
+    int lastSep = serverId == null ? -1 : serverId.lastIndexOf('-');
+    int firstSep = lastSep > 0 ? serverId.lastIndexOf('-', lastSep - 1) : -1;
+    if (firstSep <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid "
+              + Constants.KEY_SERVER_ID
+              + " '"
+              + serverId
+              + "', expected the form host-port0-port1");
+    }
+    String host = serverId.substring(0, firstSep);
+    int port0 = Integer.parseInt(serverId.substring(firstSep + 1, lastSep));
+    int port1 = Integer.parseInt(serverId.substring(lastSep + 1));
+
+    ShuffleWriteClientImpl shuffleWriteClientImpl =
+        ShuffleClientFactory.newWriteBuilder()
+            .clientType(ClientType.GRPC.name())
+            .retryMax(3)
+            .retryIntervalMax(1000)
+            .heartBeatThreadNum(4)
+            .replica(1)
+            .replicaWrite(1)
+            .replicaRead(1)
+            .replicaSkipEnabled(true)
+            .dataTransferPoolSize(10)
+            .dataCommitPoolSize(10)
+            .unregisterThreadPoolSize(10)
+            .unregisterRequestTimeSec(10)
+            .build();
+
+    ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo(host, port0, port1);
+    registerShuffle(shuffleCount, partitionCount, shuffleWriteClientImpl, shuffleServerInfo);
+
+    // The first heartbeat is sent after half an interval, then once per interval.
+    long heartbeatInterval = 10_000;
+    ScheduledExecutorService heartBeatScheduledExecutorService =
+        ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
+    heartBeatScheduledExecutorService.scheduleAtFixedRate(
+        () -> {
+          try {
+            shuffleWriteClientImpl.sendAppHeartbeat(appId, 10_000);
+            LOG.info("Finish send heartbeat to coordinator and servers");
+          } catch (Exception e) {
+            // A failed heartbeat must not cancel the schedule, so log and carry on.
+            LOG.warn("Fail to send heartbeat to coordinator and servers", e);
+          }
+        },
+        heartbeatInterval / 2,
+        heartbeatInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Registers {@code shuffleCount} shuffles for this application on one shuffle server. Each
+   * shuffle is registered with one single-partition {@link PartitionRange} per partition id.
+   *
+   * @param shuffleCount number of shuffle ids, starting at 0, to register
+   * @param partitionCount number of partition ids, starting at 0, covered by each shuffle
+   * @param shuffleWriteClientImpl client used to issue the register requests
+   * @param shuffleServerInfo shuffle server that receives the registrations
+   */
+  private void registerShuffle(
+      int shuffleCount,
+      int partitionCount,
+      ShuffleWriteClientImpl shuffleWriteClientImpl,
+      ShuffleServerInfo shuffleServerInfo) {
+    // Build the range [p, p] for every partition id once; the list is shared by all shuffles.
+    List<PartitionRange> ranges = new ArrayList<>(partitionCount);
+    int partition = 0;
+    while (partition < partitionCount) {
+      ranges.add(new PartitionRange(partition, partition));
+      partition++;
+    }
+    int shuffle = 0;
+    while (shuffle < shuffleCount) {
+      RemoteStorageInfo emptyRemoteStorage = new RemoteStorageInfo("");
+      shuffleWriteClientImpl.registerShuffle(
+          shuffleServerInfo,
+          appId.toString(),
+          shuffle,
+          ranges,
+          emptyRemoteStorage,
+          ShuffleDataDistributionType.NORMAL,
+          1);
+      shuffle++;
+    }
+  }
+
+  /**
+   * Callback handler for asynchronous ResourceManager events. It starts a {@link UniffleTask} JVM
+   * in every allocated container and counts completed containers so that the thread waiting on
+   * {@code lock} can finish once all child tasks are done.
+   */
+  class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {
+
+    /**
+     * Counts each completed container and notifies the thread waiting on {@code lock} when the
+     * number of completed containers equals {@code childTaskNum}.
+     *
+     * @param statuses statuses of the containers that completed
+     */
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      for (ContainerStatus status : statuses) {
+        synchronized (lock) {
+          System.out.println(++childTaskCompletedNum + " container completed");
+          // NOTE(review): this sleeps 10s per completed container while holding the lock, which
+          // blocks the callback thread and any thread trying to re-acquire the lock. The reason
+          // for the delay is not visible here - confirm before changing it.
+          try {
+            Thread.sleep(10_000);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+          }
+          // notify main thread when all child tasks are completed
+          if (childTaskCompletedNum == childTaskNum) {
+            lock.notify();
+          }
+        }
+      }
+    }
+
+    /**
+     * Builds the launch context (classpath, java command line, localized resources) for each
+     * allocated container and asks the NodeManager to start it.
+     *
+     * <p>Failures are handled per container: an error while preparing or starting one container
+     * is reported and the remaining containers of the batch are still started.
+     *
+     * @param containers containers granted by the ResourceManager
+     */
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      for (Container container : containers) {
+        try {
+          System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
+          // build AM<->NM client and start container
+          Map<String, String> env = new HashMap<>();
+          StringBuilder classPathEnv =
+              new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
+                  .append(ApplicationConstants.CLASS_PATH_SEPARATOR)
+                  .append("./*");
+          for (String c :
+              conf.getStrings(
+                  YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                  YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+            classPathEnv.append(c.trim());
+          }
+          env.put("CLASSPATH", classPathEnv.toString());
+          List<String> commands = new ArrayList<>();
+          // Every launched task gets its own increasing index, passed on the command line.
+          int index = allocatedContainerNum.getAndIncrement();
+          String extraJvmOpts = conf.get(Constants.KEY_CONTAINER_EXTRA_JVM_OPTS, "");
+          commands.add(
+              ApplicationConstants.Environment.JAVA_HOME.$$()
+                  + "/bin/java  "
+                  + " -Djava.specification.version="
+                  + System.getProperty("java.specification.version")
+                  + " "
+                  + extraJvmOpts
+                  + " "
+                  + UniffleTask.class.getName()
+                  + " -D"
+                  + Constants.KEY_CONTAINER_INDEX
+                  + "="
+                  + index);
+
+          // Localize every configured extra path under its file name. A plain HashMap is used
+          // instead of double-brace initialization, which would create an anonymous subclass
+          // holding a reference to this handler.
+          Map<String, LocalResource> localResources = new HashMap<>();
+          String[] extraJarPathList = conf.getStrings(Constants.KEY_EXTRA_JAR_PATH_LIST);
+          if (extraJarPathList != null) {
+            for (String extraJarPath : extraJarPathList) {
+              Path path = new Path(extraJarPath);
+              localResources.put(path.getName(), Utils.addHdfsToResource(conf, path));
+            }
+          }
+          ContainerLaunchContext containerLaunchContext =
+              ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
+          // request nm to start container
+          nmClientAsync.startContainerAsync(container, containerLaunchContext);
+          System.out.println(index + ": container started, Node=" + container.getNodeHttpAddress());
+          System.out.println("containerLaunchContext: " + containerLaunchContext);
+        } catch (Exception e) {
+          // Keep going: the other containers of this batch must still be started.
+          System.err.println("Failed to start container " + container.getId());
+          e.printStackTrace();
+        }
+      }
+    }
+
+    /** Container updates are ignored. */
+    @Override
+    public void onContainersUpdated(List<UpdatedContainer> containers) {}
+
+    /** Only prints a message when the ResourceManager requests a shutdown. */
+    @Override
+    public void onShutdownRequest() {
+      System.out.println("RM: shutdown request");
+    }
+
+    /** Node updates are ignored. */
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    /** Progress is not tracked; always reports 0. */
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    /** Prints the stack trace of an error reported by the asynchronous client. */
+    @Override
+    public void onError(Throwable e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java
new file mode 100644
index 000000000..326ee7093
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command line client that submits the Uniffle client simulator to YARN: it uploads the tool jar
+ * and the job configuration to HDFS, submits the application master and then polls the
+ * application state until it is finished, failed or killed.
+ */
+public class UniffleClientSimOnYarnClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UniffleClientSimOnYarnClient.class);
+  private final Configuration conf;
+
+  /**
+   * @param conf Hadoop configuration used to talk to YARN and HDFS; it is also updated with the
+   *     list of resources to localize
+   */
+  public UniffleClientSimOnYarnClient(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Parses the command line into the configuration and runs the client. Exits with the parser's
+   * exit code if parsing fails and with status 1 if the run throws.
+   *
+   * @param args command line arguments handed to {@link HadoopConfigApp}
+   */
+  public static void main(String[] args) {
+    Configuration conf = new Configuration();
+    // Load configuration from parameters
+    HadoopConfigApp hadoopConfigApp = new HadoopConfigApp(conf);
+    int exitCode = hadoopConfigApp.run(args);
+    if (exitCode != 0) {
+      System.exit(exitCode);
+    }
+    UniffleClientSimOnYarnClient client = new UniffleClientSimOnYarnClient(conf);
+    try {
+      client.run(hadoopConfigApp.getLocalConf());
+    } catch (Exception e) {
+      LOG.error("client run exception , please check log file.", e);
+      // Report the failure to the calling shell instead of exiting with status 0.
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Submits the simulator application and blocks until it reaches a terminal state.
+   *
+   * <p>While the application is running, a JVM shutdown hook kills it when this client is
+   * terminated. Once a terminal state has been observed the hook is removed and the YARN client
+   * is stopped.
+   *
+   * @param localConf job configuration; the application id and the extra resource list are added
+   *     to it before it is written to HDFS
+   * @throws Exception if talking to YARN or HDFS fails, or if the local jar cannot be located
+   */
+  public void run(Map<String, String> localConf) throws Exception {
+    // YarnClient to talk to ResourceManager
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+    // request RM to create an application
+    YarnClientApplication application = yarnClient.createApplication();
+    // prepare app submission context
+    ApplicationSubmissionContext applicationSubmissionContext =
+        application.getApplicationSubmissionContext();
+    // get application id and store to conf for am and sub-tasks to use
+    ApplicationId appId = applicationSubmissionContext.getApplicationId();
+    LOG.info("appId: {}", appId);
+    localConf.put(Constants.KEY_YARN_APP_ID, appId.toString());
+    applicationSubmissionContext.setApplicationName("UniffleClientSimOnYarn");
+
+    // Kill the YARN application if this client JVM goes away while the application is running.
+    Thread killOnExit =
+        new Thread(
+            () -> {
+              System.out.println("Received Ctrl+C, terminating application...");
+
+              try {
+                yarnClient.killApplication(appId);
+                System.out.println("Application killed: " + appId);
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+
+              try {
+                ApplicationReport report = yarnClient.getApplicationReport(appId);
+                System.out.println("Application status: " + report.getYarnApplicationState());
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            });
+    Runtime.getRuntime().addShutdownHook(killOnExit);
+
+    // copy jar to hdfs
+    String jarLocalPathStr = Utils.getCurrentJarPath(UniffleClientSimOnYarnClient.class);
+    System.out.println("jarLocalPath: " + jarLocalPathStr);
+    if (jarLocalPathStr == null || !jarLocalPathStr.endsWith(".jar")) {
+      // Fall back to the second token of the java command line when it names a jar.
+      String cmdline = System.getProperty("sun.java.command");
+      if (cmdline != null) {
+        String[] parts = cmdline.split("\\s+");
+        if (parts.length > 1 && parts[1].endsWith(".jar")) {
+          jarLocalPathStr = parts[1];
+          System.out.println(
+              "jarLocalPath: " + jarLocalPathStr + " updated it from command line: " + cmdline);
+        }
+      }
+    }
+    if (jarLocalPathStr == null) {
+      // Fail with a clear message rather than with the generic error from new Path(null).
+      throw new IllegalStateException(
+          "Cannot determine the local jar path of "
+              + UniffleClientSimOnYarnClient.class.getName());
+    }
+    Path jarLocalPath = new Path(jarLocalPathStr);
+    Path jarHdfsPath =
+        Utils.copyToHdfs(conf, jarLocalPath, Utils.getHdfsDestPath(conf, jarLocalPath.getName()));
+    System.out.println("jarHdfsPath: " + jarHdfsPath);
+    registerExtraResource(localConf, jarHdfsPath);
+
+    // write job configuration to hdfs
+    // The configuration file path is added to the resource list first so that the written
+    // configuration already contains it.
+    Path jobConfHdfsPath = Utils.getHdfsDestPath(conf, Constants.JOB_CONF_NAME);
+    registerExtraResource(localConf, jobConfHdfsPath);
+    Utils.writeStringToHdfs(
+        conf, HadoopConfigApp.writeConfigurationToString(localConf), jobConfHdfsPath);
+
+    // prepare local resources
+    Map<String, LocalResource> localResources = new HashMap<>();
+    String[] extraJarPathList = conf.getStrings(Constants.KEY_EXTRA_JAR_PATH_LIST);
+    if (extraJarPathList != null) {
+      for (String extraJarPath : extraJarPathList) {
+        Path path = new Path(extraJarPath);
+        String name = path.getName();
+        localResources.put(name, Utils.addHdfsToResource(conf, path));
+      }
+    }
+    // prepare environment classpath
+    Map<String, String> env = new HashMap<>();
+    // the dependencies of am
+    StringBuilder classPathEnv =
+        new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
+            .append(ApplicationConstants.CLASS_PATH_SEPARATOR)
+            .append("./*");
+    // yarn dependencies
+    for (String c :
+        conf.getStrings(
+            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+            YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+      classPathEnv.append(c.trim());
+    }
+    env.put("CLASSPATH", classPathEnv.toString());
+
+    String extraJvmOpts = conf.get(Constants.KEY_AM_EXTRA_JVM_OPTS, "");
+    // prepare launch command to run am
+    List<String> commands = new ArrayList<>();
+    commands.add(
+        ApplicationConstants.Environment.JAVA_HOME.$$()
+            + "/bin/java "
+            + " -Djava.specification.version="
+            + System.getProperty("java.specification.version")
+            + " "
+            + extraJvmOpts
+            + " "
+            + UniffleClientSimOnYarnAppMaster.class.getName());
+
+    // create am container and submit the application
+    ContainerLaunchContext amContainer =
+        ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
+    // prepare the am container context
+    applicationSubmissionContext.setAMContainerSpec(amContainer);
+    int memory = conf.getInt(Constants.KEY_AM_MEMORY, 4096);
+    int vCores = conf.getInt(Constants.KEY_AM_VCORES, 8);
+    applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
+    String queueName = conf.get(Constants.KEY_QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME);
+    if (!YarnConfiguration.DEFAULT_QUEUE_NAME.equals(queueName)) {
+      applicationSubmissionContext.setQueue(queueName);
+    }
+    System.out.println(applicationSubmissionContext);
+    yarnClient.submitApplication(applicationSubmissionContext);
+
+    // monitor application progress
+    for (; ; ) {
+      ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
+      YarnApplicationState state = applicationReport.getYarnApplicationState();
+      FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
+      if (state.equals(YarnApplicationState.FINISHED)) {
+        if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
+          LOG.info("SUCCESSFUL FINISH.");
+          break;
+        } else {
+          LOG.error("FINISHED WITH ERROR.");
+          break;
+        }
+      } else if (state.equals(YarnApplicationState.FAILED)
+          || state.equals(YarnApplicationState.KILLED)) {
+        LOG.error("Application failed with state: {}", state);
+        break;
+      }
+      LOG.info("running...");
+      Thread.sleep(5000);
+    }
+
+    // The application reached a terminal state, so there is nothing left to kill: drop the hook
+    // (otherwise every normal exit would report "Received Ctrl+C") and release the client.
+    try {
+      Runtime.getRuntime().removeShutdownHook(killOnExit);
+      yarnClient.stop();
+    } catch (IllegalStateException ignored) {
+      // JVM shutdown is already in progress; the hook may be running and still needs the client.
+    }
+  }
+
+  /**
+   * Appends {@code hdfsPath} to the comma separated extra resource list and stores the new list
+   * in both {@code localConf} and the Hadoop configuration.
+   */
+  private void registerExtraResource(Map<String, String> localConf, Path hdfsPath) {
+    String existing = localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST);
+    String updated = Utils.isBlank(existing) ? hdfsPath.toString() : existing + "," + hdfsPath;
+    localConf.put(Constants.KEY_EXTRA_JAR_PATH_LIST, updated);
+    conf.set(Constants.KEY_EXTRA_JAR_PATH_LIST, updated);
+  }
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java
new file mode 100644
index 000000000..72e7537a7
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleTask.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.shaded.io.netty.buffer.Unpooled;
+
+/**
+ * Entry point of a simulator container: it reads the job configuration and then keeps sending
+ * synthetic shuffle blocks to one shuffle server from a fixed-size thread pool.
+ */
+public class UniffleTask {
+  // Payload shared by every generated block; allocated once in main with the block size.
+  private static byte[] data;
+  // Sequence used for the low bits of generated block ids.
+  private static final AtomicLong sId = new AtomicLong(0);
+
+  /**
+   * Loads the configuration (command line arguments plus the localized job configuration file)
+   * and submits send requests in an endless loop. Exits with status 1 when no argument is given
+   * and with the parser's exit code when the configuration cannot be loaded.
+   *
+   * @param args command line arguments, at least one is required
+   */
+  public static void main(String[] args) {
+    System.out.println(
+        "Start " + UniffleTask.class.getSimpleName() + " with " + Arrays.toString(args));
+    if (args.length < 1) {
+      System.err.println(
+          "Usage: "
+              + UniffleTask.class.getSimpleName()
+              + " [--conf <CONFIG_FILE>] [-D<KEY>=<VALUE>]");
+      System.exit(1);
+    }
+
+    Configuration conf = new Configuration();
+    List<String> combinedArgs = new ArrayList<>(args.length + 2);
+    combinedArgs.addAll(Arrays.asList(args));
+    combinedArgs.add("--conf");
+    combinedArgs.add("./" + Constants.JOB_CONF_NAME);
+    HadoopConfigApp hadoopConfigApp = new HadoopConfigApp(conf);
+    // Same handling as the submitting client: a non-zero code means the configuration was not
+    // loaded, and running with defaults (empty server id) would only produce endless errors.
+    int exitCode = hadoopConfigApp.run(combinedArgs.toArray(new String[0]));
+    if (exitCode != 0) {
+      System.exit(exitCode);
+    }
+    System.out.println(hadoopConfigApp.getLocalConf());
+    String appId = conf.get(Constants.KEY_YARN_APP_ID);
+    int taskIndex = conf.getInt(Constants.KEY_CONTAINER_INDEX, 0);
+    String serverId = conf.get(Constants.KEY_SERVER_ID, "");
+    int shuffleCount = conf.getInt(Constants.KEY_SHUFFLE_COUNT, 1);
+    int partitionCount = conf.getInt(Constants.KEY_PARTITION_COUNT, 1);
+    int blockCount = conf.getInt(Constants.KEY_BLOCK_COUNT, 1000);
+    int blockSize = conf.getInt(Constants.KEY_BLOCK_SIZE, 1024);
+    int threadCount = conf.getInt(Constants.KEY_THREAD_COUNT, 1);
+    data = new byte[blockSize];
+    System.out.println(taskIndex + ": start to send shuffle data to " + serverId);
+
+    // No queue: when all workers are busy the submitting thread runs the request itself, which
+    // throttles the endless submit loop below.
+    ExecutorService executorService =
+        new ThreadPoolExecutor(
+            threadCount,
+            threadCount,
+            60L,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            Executors.defaultThreadFactory(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    try {
+      // Runs until the container is stopped from outside or submitting throws.
+      while (true) {
+        executorService.submit(
+            () ->
+                sendData(
+                    taskIndex,
+                    appId,
+                    serverId,
+                    shuffleCount,
+                    partitionCount,
+                    blockCount,
+                    blockSize));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      executorService.shutdown();
+    }
+    System.out.println("end to send shuffle data...");
+  }
+
+  /**
+   * Builds one batch of blocks for every shuffle and partition and sends it to the shuffle
+   * server. Errors are printed and never propagated, so the calling loop keeps running.
+   */
+  private static void sendData(
+      int taskIndex,
+      String appId,
+      String serverId,
+      int shuffleCount,
+      int partitionCount,
+      int blockCount,
+      int blockSize) {
+    try {
+      ShuffleServerInfo shuffleServerInfo = parseServerInfo(serverId);
+      ShuffleServerGrpcClient client =
+          ((ShuffleServerGrpcClient)
+              ShuffleServerClientFactory.getInstance()
+                  .getShuffleServerClient("GRPC_NETTY", shuffleServerInfo));
+      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = new HashMap<>();
+      constructShuffleBlockInfo(
+          shuffleToBlocks, taskIndex, shuffleCount, partitionCount, blockCount, blockSize);
+
+      RssSendShuffleDataRequest sendShuffleDataRequest =
+          new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
+      RssSendShuffleDataResponse response = client.sendShuffleData(sendShuffleDataRequest);
+      if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
+        System.out.println(
+            "send shuffle data error with "
+                + response.getStatusCode()
+                + " message: "
+                + response.getMessage());
+      } else {
+        // One dot per request that did not report an error status.
+        System.out.print(".");
+      }
+    } catch (Exception e) {
+      System.err.printf(
+          "Error during task %d for sending shuffleCount=%d,partitionCount=%d,blockCount=%d,blockSize=%d: %s%n",
+          taskIndex, shuffleCount, partitionCount, blockCount, blockSize, e.getMessage());
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Parses a server id of the form {@code <host>-<port0>-<port1>}. The ports are taken from the
+   * end of the string so that host names containing {@code '-'} are handled.
+   *
+   * @throws IllegalArgumentException if the id is not a host followed by two integer ports
+   */
+  private static ShuffleServerInfo parseServerInfo(String serverId) {
+    int port1Separator = serverId.lastIndexOf('-');
+    int port0Separator = port1Separator > 0 ? serverId.lastIndexOf('-', port1Separator - 1) : -1;
+    if (port0Separator <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid server id '" + serverId + "', expected <host>-<port0>-<port1>");
+    }
+    String host = serverId.substring(0, port0Separator);
+    int port0 = Integer.parseInt(serverId.substring(port0Separator + 1, port1Separator));
+    int port1 = Integer.parseInt(serverId.substring(port1Separator + 1));
+    return new ShuffleServerInfo(host, port0, port1);
+  }
+
+  /**
+   * Fills {@code shuffleToBlocks} with {@code blockCount} blocks for every partition of every
+   * shuffle. All blocks wrap the same shared payload array.
+   *
+   * @param shuffleToBlocks output map: shuffle id to partition id to blocks
+   * @param taskIndex index of this task, encoded into the high bits of each block id
+   * @param shuffleCount number of shuffle ids, starting at 0
+   * @param partitionCount number of partition ids per shuffle, starting at 0
+   * @param blockCount number of blocks generated per partition
+   * @param blockSize length passed to each generated block
+   */
+  @VisibleForTesting
+  protected static void constructShuffleBlockInfo(
+      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks,
+      int taskIndex,
+      int shuffleCount,
+      int partitionCount,
+      int blockCount,
+      int blockSize) {
+    for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
+      Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = new HashMap<>();
+      for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+        List<ShuffleBlockInfo> blocks =
+            partitionToBlocks.computeIfAbsent(partitionId, ignored -> new ArrayList<>());
+        for (int blockIndex = 0; blockIndex < blockCount; blockIndex++) {
+          // NOTE(review): only the top 5 bits are left for (taskIndex + 1), so the value reaches
+          // the sign bit from taskIndex 15 and wraps for larger indexes - confirm the intended
+          // maximum number of tasks.
+          long blockId = (((long) taskIndex + 1) << (Long.SIZE - 5)) | sId.getAndIncrement();
+          blocks.add(
+              new ShuffleBlockInfo(
+                  shuffleId,
+                  partitionId,
+                  blockId,
+                  blockSize,
+                  0,
+                  Unpooled.wrappedBuffer(data).retain(),
+                  Collections.emptyList(),
+                  0,
+                  0,
+                  0));
+        }
+      }
+      shuffleToBlocks.put(shuffleId, partitionToBlocks);
+    }
+  }
+}
diff --git a/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java
new file mode 100644
index 000000000..45fef342a
--- /dev/null
+++ b/tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/Utils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.simulator;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.CodeSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+public final class Utils {
+  public static Path copyToHdfs(Configuration conf, Path srcPath, Path 
dstPath) throws IOException {
+    try (FileSystem fs = FileSystem.get(dstPath.toUri(), conf)) {
+      // upload
+      fs.copyFromLocalFile(srcPath, dstPath);
+      return dstPath;
+    }
+  }
+
+  public static LocalResource addHdfsToResource(Configuration conf, Path 
dstPath)
+      throws IOException {
+    try (FileSystem fs = FileSystem.get(dstPath.toUri(), conf)) {
+      FileStatus scFileStatus = fs.getFileStatus(dstPath);
+      LocalResource scRsrc =
+          LocalResource.newInstance(
+              URL.fromURI(dstPath.toUri()),
+              LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION,
+              scFileStatus.getLen(),
+              scFileStatus.getModificationTime());
+      return scRsrc;
+    }
+  }
+
+  public static Path getHdfsDestPath(Configuration conf, String name) {
+    return new Path(conf.get(Constants.KEY_TMP_HDFS_PATH, 
Constants.TMP_HDFS_PATH_DEFAULT), name);
+  }
+
+  public static void writeStringToHdfs(Configuration conf, String content, 
Path path)
+      throws IOException {
+    try (FileSystem fs = FileSystem.get(path.toUri(), conf)) {
+      // upload
+      try (FSDataOutputStream os = fs.create(path)) {
+        os.write(content.getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  public static String getCurrentJarPath(Class clazz) throws 
URISyntaxException {
+    CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
+    if (codeSource != null && codeSource.getLocation() != null) {
+      return new File(codeSource.getLocation().toURI()).getPath();
+    }
+    return null;
+  }
+
+  public static boolean isBlank(String str) {
+    int strLen;
+    if (str == null || (strLen = str.length()) == 0) {
+      return true;
+    }
+    for (int i = 0; i < strLen; i++) {
+      if ((Character.isWhitespace(str.charAt(i)) == false)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Reply via email to