This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2953af [ISSUE-339] Optimize retry logic in send shuffle data (#361)
1f2953af is described below

commit 1f2953af7020e68be7c5f329daaf4d13bf00fbe0
Author: xianjingfeng <[email protected]>
AuthorDate: Sun Nov 27 17:04:50 2022 +0800

    [ISSUE-339] Optimize retry logic in send shuffle data (#361)
    
    ### What changes were proposed in this pull request?
    1. Set the default value of `rss.client.retry.max` to 50
    2. Set `rss.client.retry.max` to `rss.client.retry.max/2` if replica > 
replicaWrite
    3. Throw an exception if `rss.client.retry.max * 
rss.client.retry.interval.max > rss.client.send.check.timeout.ms`
    ### Why are the changes needed?
    These changes make the retry logic more reasonable. See #339.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests.
---
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    | 18 +++++++++++++
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  2 +-
 .../apache/hadoop/mapreduce/RssMRUtilsTest.java    | 21 +++++++++++++++
 .../apache/spark/shuffle/RssSparkShuffleUtils.java | 13 ++++++++++
 .../spark/shuffle/RssSparkShuffleUtilsTest.java    | 30 +++++++++++++++++++++-
 .../spark/shuffle/writer/RssShuffleWriterTest.java |  3 ++-
 .../spark/shuffle/writer/RssShuffleWriterTest.java |  2 +-
 .../client/factory/ShuffleClientFactory.java       |  5 ++++
 .../uniffle/client/util/RssClientConfig.java       |  2 +-
 .../uniffle/test/SparkIntegrationTestBase.java     |  1 +
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  3 ++-
 11 files changed, 94 insertions(+), 6 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 347c20b4..83153eb6 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -241,4 +241,22 @@ public class RssMRUtils {
         RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
     return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
+
+  public static void validateRssClientConf(JobConf rssJobConf, JobConf 
mrJobConf) {
+    int retryMax = getInt(rssJobConf, mrJobConf, 
RssMRConfig.RSS_CLIENT_RETRY_MAX,
+        RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+    long retryIntervalMax = getLong(rssJobConf, mrJobConf, 
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+        RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+    long sendCheckTimeout = getLong(rssJobConf, mrJobConf, 
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+        RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
+    if (retryIntervalMax * retryMax > sendCheckTimeout) {
+      throw new IllegalArgumentException(String.format("%s(%s) * %s(%s) should 
not bigger than %s(%s)",
+          RssMRConfig.RSS_CLIENT_RETRY_MAX,
+          retryMax,
+          RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+          retryIntervalMax,
+          RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+          sendCheckTimeout));
+    }
+  }
 }
diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 02c63527..1fb6a96a 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -168,7 +168,7 @@ public class RssMRAppMaster extends MRAppMaster {
       // set the remote storage with actual value
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, 
remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, 
remoteStorage.getConfString());
-
+      RssMRUtils.validateRssClientConf(extraConf, conf);
       // When containers have disk with very limited space, reduce is allowed 
to spill data to hdfs
       if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
           RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
diff --git 
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java 
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index 385693f3..d55c8edd 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -30,8 +30,11 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class RssMRUtilsTest {
+  
+  private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should 
be thrown";
 
   @Test
   public void baskAttemptIdTest() {
@@ -212,4 +215,22 @@ public class RssMRUtilsTest {
     
jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 
0.5);
     assertEquals(2, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
   }
+
+  @Test
+  public void testValidateRssClientConf() {
+    JobConf jobConf = new JobConf();
+    JobConf rssJobConf = new JobConf();
+    rssJobConf.setInt("mapreduce.job.maps", 500);
+    rssJobConf.setInt("mapreduce.job.reduces", 20);
+    RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+    rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX, 5);
+    rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
+    rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
+    try {
+      RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("should not bigger than"));
+    }
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 2320f55b..b7409687 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -112,6 +112,19 @@ public class RssSparkShuffleUtils {
       LOG.error(msg);
       throw new IllegalArgumentException(msg);
     }
+
+    int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
+    long retryIntervalMax = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
+    long sendCheckTimeout = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS);
+    if (retryIntervalMax * retryMax > sendCheckTimeout) {
+      throw new IllegalArgumentException(String.format("%s(%s) * %s(%s) should 
not bigger than %s(%s)",
+          RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(),
+          retryMax,
+          RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(),
+          retryIntervalMax,
+          RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
+          sendCheckTimeout));
+    }
   }
 
   public static Configuration getRemoteStorageHadoopConf(
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
index 856fff8c..625db083 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
@@ -33,9 +33,12 @@ import org.apache.uniffle.storage.util.StorageType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class RssSparkShuffleUtilsTest {
-
+  
+  private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should 
be thrown";
+  
   @Test
   public void testAssignmentTags() {
     SparkConf conf = new SparkConf();
@@ -210,4 +213,29 @@ public class RssSparkShuffleUtilsTest {
     sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER, 
100);
     assertEquals(3, 
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
   }
+
+  @Test
+  public void testValidateRssClientConf() {
+    SparkConf sparkConf = new SparkConf();
+    try {
+      RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("must be set by the client or fetched 
from coordinators"));
+    }
+    sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE, "MEMORY_LOCALFILE_HDFS");
+    RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX, 5);
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
+    try {
+      RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+      fail(EXPECTED_EXCEPTION_MESSAGE);
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("should not bigger than"));
+    }
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 5000L);
+    RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+  }
+
 }
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index f71900ce..0189b79f 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -71,6 +71,8 @@ public class RssShuffleWriterTest {
         .setMaster("local[2]")
         .set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
+        .set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
+        .set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name())
         .set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), 
"127.0.0.1:12345,127.0.0.1:12346");
@@ -137,7 +139,6 @@ public class RssShuffleWriterTest {
         .set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "32")
         .set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
         .set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "128")
-        .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name())
         .set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), 
"127.0.0.1:12345,127.0.0.1:12346");
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 98ffc8a6..1e821997 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -71,6 +71,7 @@ public class RssShuffleWriterTest {
         .setMaster("local[2]")
         .set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
+        .set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name())
         .set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), 
"127.0.0.1:12345,127.0.0.1:12346");
@@ -140,7 +141,6 @@ public class RssShuffleWriterTest {
         .set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "32")
         .set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
         .set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
-        .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
         .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
         .set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "128")
         .set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name())
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 2b8b6264..7cbbb37f 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -49,6 +49,11 @@ public class ShuffleClientFactory {
       String clientType, int retryMax, long retryIntervalMax, int 
heartBeatThreadNum,
       int replica, int replicaWrite, int replicaRead, boolean 
replicaSkipEnabled, int dataTransferPoolSize,
       int dataCommitPoolSize, int unregisterThreadPoolSize, int 
unregisterRequestTimeoutSec) {
+    // If replica > replicaWrite, blocks maybe will be sended for 2 rounds.
+    // We need retry less times in this case for let the first round fail fast.
+    if (replicaSkipEnabled && replica > replicaWrite) {
+      retryMax = retryMax / 2;
+    }
     return new ShuffleWriteClientImpl(
         clientType,
         retryMax,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java 
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index caf025ce..fd69221b 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -22,7 +22,7 @@ public class RssClientConfig {
   public static final String RSS_CLIENT_TYPE = "rss.client.type";
   public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC";
   public static final String RSS_CLIENT_RETRY_MAX = "rss.client.retry.max";
-  public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 100;
+  public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 50;
   public static final String RSS_CLIENT_RETRY_INTERVAL_MAX = 
"rss.client.retry.interval.max";
   public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE = 
10000L;
   public static final String RSS_COORDINATOR_QUORUM = "rss.coordinator.quorum";
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 5632ab5e..3ef2dd6e 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -101,6 +101,7 @@ public abstract class SparkIntegrationTestBase extends 
IntegrationTestBase {
     sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
     sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), 
COORDINATOR_QUORUM);
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), 
"30000");
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), 
"1000");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000");
     sparkConf.set(RssSparkConfig.RSS_INDEX_READ_LIMIT.key(), "100");
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 82a590db..543ce1f6 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -302,7 +302,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       final int finalBlockNum = blockNum;
       try {
         RetryUtils.retry(() -> {
-          long requireId = requirePreAllocation(allocateSize, 
request.getRetryMax(), request.getRetryIntervalMax());
+          long requireId = requirePreAllocation(allocateSize, 
request.getRetryMax() / maxRetryAttempts,
+              request.getRetryIntervalMax());
           if (requireId == FAILED_REQUIRE_ID) {
             throw new RssException(String.format(
                 "requirePreAllocation failed! size[%s], host[%s], port[%s]", 
allocateSize, host, port));

Reply via email to