This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3b0521abe [#2269] refactor: Fix duplicated blockIds issue caused by
duplicated reportShuffleResult (#2270)
3b0521abe is described below
commit 3b0521abe0b36c6e42d20382c17638cf61a54221
Author: maobaolong <[email protected]>
AuthorDate: Sat Jan 4 14:23:51 2025 +0800
[#2269] refactor: Fix duplicated blockIds issue caused by duplicated
reportShuffleResult (#2270)
### What changes were proposed in this pull request?
The client stops retrying and throws an RssException for the reportShuffleResult
operation when it is interrupted by the caller.
### Why are the changes needed?
Fix: #2269
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests needed.
---
.../client/impl/grpc/ShuffleServerGrpcClient.java | 36 ++++++++++------------
1 file changed, 17 insertions(+), 19 deletions(-)
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 56f02721d..399c21547 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
+import io.grpc.StatusRuntimeException;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@@ -792,26 +793,23 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
}
private ReportShuffleResultResponse
doReportShuffleResult(ReportShuffleResultRequest rpcRequest) {
- int retryNum = 0;
- while (retryNum < maxRetryAttempts) {
- try {
- ReportShuffleResultResponse response =
getBlockingStub().reportShuffleResult(rpcRequest);
- return response;
- } catch (Exception e) {
- retryNum++;
- LOG.warn(
- "Report shuffle result to host["
- + host
- + "], port["
- + port
- + "] failed, try again, retryNum["
- + retryNum
- + "]",
- e);
- }
+ try {
+ return RetryUtils.retryWithCondition(
+ () -> getBlockingStub().reportShuffleResult(rpcRequest),
+ null, // No specific callback to execute
+ 0, // No delay between retries, retry immediately
+ maxRetryAttempts, // Maximum number of retry attempts
+ t -> { // Define retry condition directly in the method call
+ if (t instanceof StatusRuntimeException) {
+ return !(t.getCause() instanceof InterruptedException);
+ }
+ return t instanceof Exception; // Retry for all other Exceptions
+ });
+ } catch (Throwable t) {
+ // Handle or rethrow the exception as appropriate
+ throw new RssException(
+ "Failed to report shuffle result to host[" + host + "], port[" +
port + "]", t);
}
- throw new RssException(
- "Report shuffle result to host[" + host + "], port[" + port + "]
failed");
}
@Override