This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c3616c2  Send commit concurrently in client side (#59)
c3616c2 is described below

commit c3616c27a545ffccf86c81d700346557bcaa9058
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Jul 17 10:53:21 2022 +0800

    Send commit concurrently in client side (#59)
    
    ### What changes were proposed in this pull request?
    Send commit concurrently in client side
    
    ### Why are the changes needed?
    I found when using the `LOCALFILE` storageType, waiting the commit will 
cost too much time. To speed up, it can be sent commit concurrently by using 
thread pool.
    
    **Performance Test Case**
    Using 1000 executors of Spark, single executor 1g/1core to run TeraSort 1TB.
    
    When using `LOCALFILE` storageType mode, it cost 7.3 min.
    And then after applying this PR, it cost 6.1 min
    
    ### Does this PR introduce _any_ user-facing change?
    1. Introducing the conf of `rss.client.data.commit.pool.size`, the default 
value is assigned shuffle server size.
    
    ### How was this patch tested?
    No need
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   |  5 ++
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |  4 +-
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  5 ++
 .../apache/spark/shuffle/RssShuffleManager.java    |  8 +-
 .../apache/spark/shuffle/RssShuffleManager.java    | 11 ++-
 .../client/factory/ShuffleClientFactory.java       |  5 +-
 .../client/impl/ShuffleWriteClientImpl.java        | 96 ++++++++++++++--------
 .../uniffle/client/util/RssClientConfig.java       |  2 +
 .../client/impl/ShuffleWriteClientImplTest.java    |  2 +-
 docs/client_guide.md                               |  1 +
 .../uniffle/test/AssignmentWithTagsTest.java       |  2 +-
 .../java/org/apache/uniffle/test/QuorumTest.java   |  2 +-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |  2 +-
 .../uniffle/test/ShuffleWithRssClientTest.java     |  2 +-
 14 files changed, 102 insertions(+), 45 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index b1b9507..ed1f90f 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -55,6 +55,11 @@ public class RssMRConfig {
           MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
           RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  public static final String RSS_DATA_COMMIT_POOL_SIZE =
+      MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+  public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
   public static final String RSS_CLIENT_SEND_THREAD_NUM =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
   public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 684a5ec..740de51 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -91,11 +91,13 @@ public class RssMRUtils {
         RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     int dataTransferPoolSize = 
jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
         RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    int dataCommitPoolSize = 
jobConf.getInt(RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
     ShuffleWriteClient client = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
             heartBeatThreadNum, replica, replicaWrite, replicaRead, 
replicaSkipEnabled,
-            dataTransferPoolSize);
+            dataTransferPoolSize, dataCommitPoolSize);
     return client;
   }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 041e21f..a9e845a 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -109,6 +109,11 @@ public class RssSparkConfig {
       SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
   public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  public static final String RSS_DATA_COMMIT_POOL_SIZE =
+      SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+  public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
   public static final String RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 705d66b..4198460 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
   private final int dataTransferPoolSize;
+  private final int dataCommitPoolSize;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
   private RemoteStorageInfo remoteStorage;
@@ -168,10 +169,13 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = 
sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
-    shuffleWriteClient = ShuffleClientFactory
+    this.dataCommitPoolSize = 
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
+    this.shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
+            dataCommitPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b71f1b3..124ae61 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -81,6 +81,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
   private final int dataTransferPoolSize;
+  private final int dataCommitPoolSize;
   private ShuffleWriteClient shuffleWriteClient;
   private final Map<String, Set<Long>> taskToSuccessBlockIds;
   private final Map<String, Set<Long>> taskToFailedBlockIds;
@@ -169,11 +170,14 @@ public class RssShuffleManager implements ShuffleManager {
 
     this.dataTransferPoolSize = 
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
         RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    this.dataCommitPoolSize = 
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
 
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
+            dataCommitPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -238,11 +242,14 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
     this.dataTransferPoolSize = 
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
         RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    this.dataCommitPoolSize = 
sparkConf.getInt(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
 
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, 
dataReplicaSkipEnabled, dataTransferPoolSize,
+            dataCommitPoolSize);
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 5afa43d..d11a07f 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -36,9 +36,10 @@ public class ShuffleClientFactory {
 
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
-      int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize) {
+      int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
+      int dataCommitPoolSize) {
     return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, 
heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled, 
dataTransferPoolSize);
+      replica, replicaWrite, replicaRead, replicaSkipEnabled, 
dataTransferPoolSize, dataCommitPoolSize);
   }
 
   public ShuffleReadClient 
createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index ce4c247..b078d42 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -88,11 +88,20 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
   private int replicaRead;
   private boolean replicaSkipEnabled;
   private int dataTranferPoolSize;
+  private int dataCommitPoolSize = -1;
   private final ForkJoinPool dataTransferPool;
 
-  public ShuffleWriteClientImpl(String clientType, int retryMax, long 
retryIntervalMax, int heartBeatThreadNum,
-                                int replica, int replicaWrite, int 
replicaRead, boolean replicaSkipEnabled,
-                                int dataTranferPoolSize) {
+  public ShuffleWriteClientImpl(
+      String clientType,
+      int retryMax,
+      long retryIntervalMax,
+      int heartBeatThreadNum,
+      int replica,
+      int replicaWrite,
+      int replicaRead,
+      boolean replicaSkipEnabled,
+      int dataTranferPoolSize,
+      int dataCommitPoolSize) {
     this.clientType = clientType;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
@@ -105,6 +114,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     this.replicaSkipEnabled = replicaSkipEnabled;
     this.dataTranferPoolSize = dataTranferPoolSize;
     this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
+    this.dataCommitPoolSize = dataCommitPoolSize;
   }
 
   private boolean sendShuffleDataAsync(
@@ -247,43 +257,62 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
+  /**
+   * This method will wait until all shuffle data have been flushed
+   * to durable storage in assigned shuffle servers.
+   * @param shuffleServerInfoSet
+   * @param appId
+   * @param shuffleId
+   * @param numMaps
+   * @return
+   */
   @Override
   public boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, 
String appId, int shuffleId, int numMaps) {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(
+        dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : 
dataCommitPoolSize
+    );
     AtomicInteger successfulCommit = new AtomicInteger(0);
-    shuffleServerInfoSet.stream().forEach(ssi -> {
-      RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
-      String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
-      long startTime = System.currentTimeMillis();
-      try {
-        RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          int commitCount = response.getCommitCount();
-          LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
-              + "] to ShuffleServer[" + ssi.getId() + "], cost "
-              + (System.currentTimeMillis() - startTime) + " ms, got committed 
maps["
-              + commitCount + "], map number of stage is " + numMaps);
-          if (commitCount >= numMaps) {
-            RssFinishShuffleResponse rfsResponse =
-                getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
-            if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) {
-              String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
-                  + "] with statusCode " + rfsResponse.getStatusCode();
+    try {
+      forkJoinPool.submit(() -> {
+        shuffleServerInfoSet.parallelStream().forEach(ssi -> {
+          RssSendCommitRequest request = new RssSendCommitRequest(appId, 
shuffleId);
+          String errorMsg = "Failed to commit shuffle data to " + ssi + " for 
shuffleId[" + shuffleId + "]";
+          long startTime = System.currentTimeMillis();
+          try {
+            RssSendCommitResponse response = 
getShuffleServerClient(ssi).sendCommit(request);
+            if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+              int commitCount = response.getCommitCount();
+              LOG.info("Successfully sendCommit for appId[" + appId + "], 
shuffleId[" + shuffleId
+                  + "] to ShuffleServer[" + ssi.getId() + "], cost "
+                  + (System.currentTimeMillis() - startTime) + " ms, got 
committed maps["
+                  + commitCount + "], map number of stage is " + numMaps);
+              if (commitCount >= numMaps) {
+                RssFinishShuffleResponse rfsResponse =
+                    getShuffleServerClient(ssi).finishShuffle(new 
RssFinishShuffleRequest(appId, shuffleId));
+                if (rfsResponse.getStatusCode() != ResponseStatusCode.SUCCESS) 
{
+                  String msg = "Failed to finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId
+                      + "] with statusCode " + rfsResponse.getStatusCode();
+                  LOG.error(msg);
+                  throw new Exception(msg);
+                } else {
+                  LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
+                }
+              }
+            } else {
+              String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
               LOG.error(msg);
               throw new Exception(msg);
-            } else {
-              LOG.info("Successfully finish shuffle to " + ssi + " for 
shuffleId[" + shuffleId + "]");
             }
+            successfulCommit.incrementAndGet();
+          } catch (Exception e) {
+            LOG.error(errorMsg, e);
           }
-        } else {
-          String msg = errorMsg + " with statusCode " + 
response.getStatusCode();
-          LOG.error(msg);
-          throw new Exception(msg);
-        }
-        successfulCommit.incrementAndGet();
-      } catch (Exception e) {
-        LOG.error(errorMsg, e);
-      }
-    });
+        });
+      }).join();
+    } finally {
+      forkJoinPool.shutdownNow();
+    }
+
     // check if every commit/finish call is successful
     return successfulCommit.get() == shuffleServerInfoSet.size();
   }
@@ -508,6 +537,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
   public void close() {
     heartBeatExecutorService.shutdownNow();
     coordinatorClients.forEach(CoordinatorClient::close);
+    dataTransferPool.shutdownNow();
   }
 
   private void throwExceptionIfNecessary(ClientResponse response, String 
errorMsg) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java 
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 0c519a8..22f662b 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -36,6 +36,8 @@ public class RssClientConfig {
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = 
true;
   public static final String RSS_DATA_TRANSFER_POOL_SIZE = 
"rss.client.data.transfer.pool.size";
   public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = 
Runtime.getRuntime().availableProcessors();
+  public static final String RSS_DATA_COMMIT_POOL_SIZE = 
"rss.client.data.commit.pool.size";
+  public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE = -1;
   public static final String RSS_HEARTBEAT_INTERVAL = "rss.heartbeat.interval";
   public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = 10 * 1000L;
   public static final String RSS_HEARTBEAT_TIMEOUT = "rss.heartbeat.timeout";
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 67efea7..d7f539e 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -42,7 +42,7 @@ public class ShuffleWriteClientImplTest {
   @Test
   public void testSendData() {
     ShuffleWriteClientImpl shuffleWriteClient =
-        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1);
+        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1);
     ShuffleServerClient mockShuffleServerClient = 
mock(ShuffleServerClient.class);
     ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
     
doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 14b914d..216b76a 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -87,6 +87,7 @@ These configurations are shared by all types of clients.
 |<client_type>.rss.client.read.buffer.size|14m|The max data size read from 
storage|
 |<client_type>.rss.client.send.threadPool.size|5|The thread size for send 
shuffle data to shuffle server|
 |<client_type>.rss.client.assignment.tags|-|The comma-separated list of tags 
for deciding assignment shuffle servers. Notice that the SHUFFLE_SERVER_VERSION 
will always as the assignment tag whether this conf is set or not|
+|<client_type>.rss.client.data.commit.pool.size|The number of assigned shuffle 
servers|The thread size for sending commit to shuffle servers|
 Notice:
 
 1. `<client_type>` should be `spark` or `mapreduce`
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index 416af72..9ab84d4 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -147,7 +147,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
     @Test
     public void testTags() throws Exception {
         ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-                1, 1, 1, true, 1);
+                1, 1, 1, true, 1, 1);
         shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
 
         // Case1 : only set the single default shuffle version tag
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 0148f69..3dc71f4 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -258,7 +258,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
       int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
 
     shuffleWriteClientImpl = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      replica, replicaWrite, replicaRead, replicaSkip, 1);
+      replica, replicaWrite, replicaRead, replicaSkip, 1, 1);
 
     List<ShuffleServerInfo> allServers = 
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
         shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 4339239..a1cc6a1 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -101,7 +101,7 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
   public void clearResourceTest() throws Exception {
     final ShuffleWriteClient shuffleWriteClient =
         ShuffleClientFactory.getInstance().createShuffleWriteClient(
-            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1);
+            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
     shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
     shuffleWriteClient.registerShuffle(
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 53f2869..f2e35c1 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -88,7 +88,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
   @BeforeEach
   public void createClient() {
     shuffleWriteClientImpl = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      1, 1, 1, true, 1);
+      1, 1, 1, true, 1, 1);
   }
 
   @AfterEach

Reply via email to