This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b0521abe [#2269] refactor: Fix duplicated blockIds issue caused by 
duplicated reportShuffleResult (#2270)
3b0521abe is described below

commit 3b0521abe0b36c6e42d20382c17638cf61a54221
Author: maobaolong <[email protected]>
AuthorDate: Sat Jan 4 14:23:51 2025 +0800

    [#2269] refactor: Fix duplicated blockIds issue caused by duplicated 
reportShuffleResult (#2270)
    
    ### What changes were proposed in this pull request?
    
    The client now stops retrying and throws an RssException from the
    reportShuffleResult operation when the call is interrupted by the caller.
    
    ### Why are the changes needed?
    
    Fix: #2269
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No testing needed.
---
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 36 ++++++++++------------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 56f02721d..399c21547 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
+import io.grpc.StatusRuntimeException;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -792,26 +793,23 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
   }
 
   private ReportShuffleResultResponse 
doReportShuffleResult(ReportShuffleResultRequest rpcRequest) {
-    int retryNum = 0;
-    while (retryNum < maxRetryAttempts) {
-      try {
-        ReportShuffleResultResponse response = 
getBlockingStub().reportShuffleResult(rpcRequest);
-        return response;
-      } catch (Exception e) {
-        retryNum++;
-        LOG.warn(
-            "Report shuffle result to host["
-                + host
-                + "], port["
-                + port
-                + "] failed, try again, retryNum["
-                + retryNum
-                + "]",
-            e);
-      }
+    try {
+      return RetryUtils.retryWithCondition(
+          () -> getBlockingStub().reportShuffleResult(rpcRequest),
+          null, // No specific callback to execute
+          0, // No delay between retries, retry immediately
+          maxRetryAttempts, // Maximum number of retry attempts
+          t -> { // Define retry condition directly in the method call
+            if (t instanceof StatusRuntimeException) {
+              return !(t.getCause() instanceof InterruptedException);
+            }
+            return t instanceof Exception; // Retry for all other Exceptions
+          });
+    } catch (Throwable t) {
+      // Handle or rethrow the exception as appropriate
+      throw new RssException(
+          "Failed to report shuffle result to host[" + host + "], port[" + 
port + "]", t);
     }
-    throw new RssException(
-        "Report shuffle result to host[" + host + "], port[" + port + "] 
failed");
   }
 
   @Override

Reply via email to