This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3407a0a35 [#2460] fix(spark3): Record require buffer failure number every time (#2505)
3407a0a35 is described below

commit 3407a0a35b6150078632a8dcc32849c193aeee55
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jun 18 17:18:32 2025 +0800

    [#2460] fix(spark3): Record require buffer failure number every time (#2505)
---
 .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java   | 13 ++++++++++---
 .../client/impl/grpc/ShuffleServerGrpcNettyClient.java      |  9 ++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 80600d36c..c7f8563de 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -306,7 +306,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
             requireSize,
             retryMax,
             retryIntervalMax,
-            new AtomicReference<>(StatusCode.INTERNAL_ERROR))
+            new AtomicReference<>(StatusCode.INTERNAL_ERROR),
+            null)
         .getLeft();
   }
 
@@ -318,7 +319,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       int requireSize,
       int retryMax,
       long retryIntervalMax,
-      AtomicReference<StatusCode> failedStatusCodeRef) {
+      AtomicReference<StatusCode> failedStatusCodeRef,
+      ShuffleServerPushCostTracker costTracker) {
     RequireBufferRequest rpcRequest =
         RequireBufferRequest.newBuilder()
             .setShuffleId(shuffleId)
@@ -372,6 +374,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         return Pair.of(result, needSplitPartitionIds);
       }
       try {
+        ClientInfo clientInfo = getClientInfo();
+        if (clientInfo != null && costTracker != null) {
+          costTracker.recordRequireBufferFailure(clientInfo.getShuffleServerInfo().getId());
+        }
         LOG.info(
             "Can't require buffer for appId: {}, shuffleId: {}, partitionIds: {} with {} bytes from {}:{} due to {}, sleep and try[{}] again",
             appId,
@@ -596,7 +602,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
                       allocateSize,
                       request.getRetryMax() / maxRetryAttempts,
                       request.getRetryIntervalMax(),
-                      failedStatusCode);
+                      failedStatusCode,
+                      costTracker);
               long requireId = allocationResult.getLeft();
               needSplitPartitionIds.addAll(allocationResult.getRight());
               if (requireId == FAILED_REQUIRE_ID) {
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 07e91ea9e..1f014c8b2 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ClientInfo;
+import org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
 import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -158,6 +159,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
         partitionRequireSizes.add(partitionRequireSize);
       }
 
+      ShuffleServerPushCostTracker costTracker = request.getCostTracker();
       SendShuffleDataRequest sendShuffleDataRequest =
           new SendShuffleDataRequest(
               requestId(),
@@ -182,10 +184,15 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
                       allocateSize,
                       request.getRetryMax(),
                       request.getRetryIntervalMax(),
-                      failedStatusCode);
+                      failedStatusCode,
+                      costTracker);
               long requireId = result.getLeft();
               needSplitPartitionIds.addAll(result.getRight());
               if (requireId == FAILED_REQUIRE_ID) {
+                ClientInfo clientInfo = getClientInfo();
+                if (clientInfo != null && costTracker != null) {
+                  costTracker.recordRequireBufferFailure(clientInfo.getShuffleServerInfo().getId());
+                }
                 throw new RssException(
                     String.format(
                         "requirePreAllocation failed! size[%s], host[%s], port[%s]",

Reply via email to