This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3407a0a35 [#2460] fix(spark3): Record require buffer failure number every time (#2505)
3407a0a35 is described below
commit 3407a0a35b6150078632a8dcc32849c193aeee55
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jun 18 17:18:32 2025 +0800
[#2460] fix(spark3): Record require buffer failure number every time (#2505)
---
.../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 13 ++++++++++---
.../client/impl/grpc/ShuffleServerGrpcNettyClient.java | 9 ++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 80600d36c..c7f8563de 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -306,7 +306,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
requireSize,
retryMax,
retryIntervalMax,
- new AtomicReference<>(StatusCode.INTERNAL_ERROR))
+ new AtomicReference<>(StatusCode.INTERNAL_ERROR),
+ null)
.getLeft();
}
@@ -318,7 +319,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
int requireSize,
int retryMax,
long retryIntervalMax,
- AtomicReference<StatusCode> failedStatusCodeRef) {
+ AtomicReference<StatusCode> failedStatusCodeRef,
+ ShuffleServerPushCostTracker costTracker) {
RequireBufferRequest rpcRequest =
RequireBufferRequest.newBuilder()
.setShuffleId(shuffleId)
@@ -372,6 +374,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
return Pair.of(result, needSplitPartitionIds);
}
try {
+ ClientInfo clientInfo = getClientInfo();
+ if (clientInfo != null && costTracker != null) {
+
costTracker.recordRequireBufferFailure(clientInfo.getShuffleServerInfo().getId());
+ }
LOG.info(
"Can't require buffer for appId: {}, shuffleId: {}, partitionIds:
{} with {} bytes from {}:{} due to {}, sleep and try[{}] again",
appId,
@@ -596,7 +602,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
allocateSize,
request.getRetryMax() / maxRetryAttempts,
request.getRetryIntervalMax(),
- failedStatusCode);
+ failedStatusCode,
+ costTracker);
long requireId = allocationResult.getLeft();
needSplitPartitionIds.addAll(allocationResult.getRight());
if (requireId == FAILED_REQUIRE_ID) {
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 07e91ea9e..1f014c8b2 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ClientInfo;
+import org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -158,6 +159,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
partitionRequireSizes.add(partitionRequireSize);
}
+ ShuffleServerPushCostTracker costTracker = request.getCostTracker();
SendShuffleDataRequest sendShuffleDataRequest =
new SendShuffleDataRequest(
requestId(),
@@ -182,10 +184,15 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
allocateSize,
request.getRetryMax(),
request.getRetryIntervalMax(),
- failedStatusCode);
+ failedStatusCode,
+ costTracker);
long requireId = result.getLeft();
needSplitPartitionIds.addAll(result.getRight());
if (requireId == FAILED_REQUIRE_ID) {
+ ClientInfo clientInfo = getClientInfo();
+ if (clientInfo != null && costTracker != null) {
+
costTracker.recordRequireBufferFailure(clientInfo.getShuffleServerInfo().getId());
+ }
throw new RssException(
String.format(
"requirePreAllocation failed! size[%s], host[%s],
port[%s]",