This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4def3cc7e [#2093][followup] feat: Add support of partition split for
grpc (#2396)
4def3cc7e is described below
commit 4def3cc7e7f45ac8ee2bb37cb2fb29b7894f2c5e
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Mar 17 17:29:57 2025 +0800
[#2093][followup] feat: Add support of partition split for grpc (#2396)
### What changes were proposed in this pull request?
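Add support for partition split to the gRPC client (`ShuffleServerGrpcClient`): the partition IDs returned by `requirePreAllocation` are collected and set on the `RssSendShuffleDataResponse` via `setNeedSplitPartitionIds`.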
### Why are the changes needed?
Follow-up to #2093.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../shuffle/manager/RssShuffleManagerBase.java | 1 +
.../client/impl/grpc/ShuffleServerGrpcClient.java | 26 +++++++++++++---------
2 files changed, 16 insertions(+), 11 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index d869c64fe..43f9c1f00 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -271,6 +271,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
throw new RssException(
"The feature of task partition reassign is incompatible with
multiple replicas mechanism.");
}
+ LOG.info("Partition reassign is enabled.");
}
this.blockIdSelfManagedEnabled =
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index cb477df34..82993890e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -23,11 +23,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.StatusRuntimeException;
@@ -540,7 +542,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
boolean isSuccessful = true;
AtomicReference<StatusCode> failedStatusCode = new
AtomicReference<>(StatusCode.INTERNAL_ERROR);
-
+ Set<Integer> needSplitPartitionIds = Sets.newHashSet();
// prepare rpc request based on shuffleId -> partitionId -> blocks
for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
shuffleIdToBlocks.entrySet()) {
@@ -583,17 +585,18 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
RetryUtils.retryWithCondition(
() -> {
// TODO(baoloongmao): support partition split follow netty client
- long requireId =
+ Pair<Long, List<Integer>> allocationResult =
requirePreAllocation(
- appId,
- shuffleId,
- partitionIds,
- partitionRequireSizes,
- allocateSize,
- request.getRetryMax() / maxRetryAttempts,
- request.getRetryIntervalMax(),
- failedStatusCode)
- .getLeft();
+ appId,
+ shuffleId,
+ partitionIds,
+ partitionRequireSizes,
+ allocateSize,
+ request.getRetryMax() / maxRetryAttempts,
+ request.getRetryIntervalMax(),
+ failedStatusCode);
+ long requireId = allocationResult.getLeft();
+ needSplitPartitionIds.addAll(allocationResult.getRight());
if (requireId == FAILED_REQUIRE_ID) {
throw new RssException(
String.format(
@@ -661,6 +664,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
} else {
response = new RssSendShuffleDataResponse(failedStatusCode.get());
}
+ response.setNeedSplitPartitionIds(needSplitPartitionIds);
return response;
}