This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3cf82d70f [#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227)
3cf82d70f is described below
commit 3cf82d70f2465f64d01a6af2bdf2cb39f7b9ce5d
Author: maobaolong <[email protected]>
AuthorDate: Mon Nov 18 14:16:21 2024 +0800
[#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227)
### What changes were proposed in this pull request?
- Introduce a ShuffleBlockIdManagerFactory to create the configured implementation.
- Introduce PartitionedShuffleBlockIdManager, which keeps an individual bitmap for each partition.
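A minimal sketch of how a factory can turn a configured class name into an instance by reflection. It is not the ShuffleBlockIdManagerFactory from this commit (its body is not part of this excerpt); the `BlockIdManager` interface and both implementation classes are hypothetical stand-ins.

```java
public class BlockIdManagerFactorySketch {
  /** Hypothetical stand-in for the ShuffleBlockIdManager interface. */
  public interface BlockIdManager {
    String name();
  }

  /** Hypothetical: all partitions of a shuffle share a fixed number of bitmaps. */
  public static class SharedBitmapManager implements BlockIdManager {
    @Override
    public String name() {
      return "shared";
    }
  }

  /** Hypothetical: one bitmap per partition. */
  public static class PerPartitionManager implements BlockIdManager {
    @Override
    public String name() {
      return "per-partition";
    }
  }

  /**
   * Loads className and calls its public no-arg constructor. configKey only appears in
   * error messages, so a bad value points back to the option that supplied it.
   */
  public static BlockIdManager create(String className, String configKey) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!BlockIdManager.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(
            className + " (from " + configKey + ") is not a BlockIdManager");
      }
      return (BlockIdManager) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException(
          "Cannot instantiate " + className + " (from " + configKey + ")", e);
    }
  }

  public static void main(String[] args) {
    BlockIdManager manager =
        create(PerPartitionManager.class.getName(), "rss.server.blockIdManagerClass");
    System.out.println(manager.name());
  }
}
```

Running `main` prints `per-partition`. Passing the config key alongside the class name lets one factory serve both the server-level option and the per-app option while still producing a precise error.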
### Why are the changes needed?
Fix: #2219
<img width="1596" alt="image" src="https://github.com/user-attachments/assets/c7752f94-941b-45d2-9a45-1da197e38984">
The screenshot above shows the maximum number of partitions on a single node in our cluster, taken from the node metrics: about 410K. If each partition's bitmap holds 10,000 blocks on average (about 50 KiB of heap), these bitmaps cost roughly 410K × 50 KiB ≈ 20 GiB of heap.
I think this new policy is worth adopting in our production cluster.
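A back-of-the-envelope check of the heap estimate, using only the figures quoted above (about 410K partitions on one node, about 50 KiB for a bitmap of 10,000 block ids). The class and method names are hypothetical.

```java
import java.util.Locale;

public class HeapEstimate {
  // Figures quoted in the description; the per-bitmap size is an average, not a measurement.
  static final long PARTITIONS = 410_000L;
  static final long BYTES_PER_BITMAP = 50L * 1024; // ~50 KiB for ~10,000 block ids

  /** Heap in GiB if every partition owns one bitmap of the given size. */
  static double estimateGiB(long partitions, long bytesPerBitmap) {
    return (double) (partitions * bytesPerBitmap) / (1024.0 * 1024 * 1024);
  }

  public static void main(String[] args) {
    // Locale.ROOT keeps the decimal separator a '.' regardless of the JVM locale.
    System.out.println(
        String.format(Locale.ROOT, "%.2f GiB", estimateGiB(PARTITIONS, BYTES_PER_BITMAP)));
  }
}
```

It prints `19.55 GiB`, i.e. roughly 20 GiB when every partition owns its own bitmap.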
### Does this PR introduce _any_ user-facing change?
- New config options: `rss.server.blockIdManagerClass` (server level) and `rss.client.blockIdManagerClass` (per application)
### How was this patch tested?
Tested locally and checked the results with `arthas`.
- Common case, tested with the default config
```
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
@DefaultShuffleBlockIdManager[
LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@3f211bae],
partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1],
]
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_431031_1730297736664")'
@ConcurrentHashMapForJDK8[
@Integer[0]:@Roaring64NavigableMap[][isEmpty=false;size=10],
@Integer[1]:@Roaring64NavigableMap[][isEmpty=false;size=10],
@Integer[2]:@Roaring64NavigableMap[][isEmpty=false;size=10],
]
```
- Configured to use PartitionedShuffleBlockIdManager
```
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
@PartitionedShuffleBlockIdManager[
LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@6aa6fea0],
partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1],
]
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds'
@ConcurrentHashMap[
@String[application_1729845342052_432324_1730299397916]:@ConcurrentHashMapForJDK8[isEmpty=false;size=2],
]
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_432324_1730299397916")'
@ConcurrentHashMapForJDK8[
@Integer[0]:@ConcurrentHashMap[isEmpty=false;size=2000],
@Integer[1]:@ConcurrentHashMap[isEmpty=false;size=2000],
]
```
- Use the client app-level config
Submit two apps, one with
`spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.DefaultShuffleBlockIdManager`
and the other with
`spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.PartitionedShuffleBlockIdManager`
```
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
@DefaultShuffleBlockIdManager[
LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@ed3b52a],
partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1],
]
[arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[1].shuffleBlockIdManager'
@PartitionedShuffleBlockIdManager[
LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@5fc14628],
partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1],
]
```
Log example
```
[2024-10-30 23:01:37.585] [Grpc-709] [INFO] ShuffleTaskInfo - application_1729845342052_432387_1730300440568 use app configured ShuffleBlockIdManager to org.apache.uniffle.server.block.DefaultShuffleBlockIdManager@3c6b75a8
```
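The arthas output for PartitionedShuffleBlockIdManager appears to show a three-level map: appId, then shuffleId (keys 0 and 1), then about 2000 per-partition entries. A minimal JDK-only sketch of that layout, assuming a concurrent `Set<Long>` in place of each per-partition `Roaring64NavigableMap`; the class and method names are hypothetical and do not reproduce the commit's code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PerPartitionBlockIdsSketch {
  // appId -> shuffleId -> partitionId -> block ids.
  // A concurrent Set<Long> stands in for one Roaring64NavigableMap per partition.
  private final Map<String, Map<Integer, Map<Integer, Set<Long>>>> blockIds =
      new ConcurrentHashMap<>();

  public void registerAppId(String appId) {
    blockIds.computeIfAbsent(appId, k -> new ConcurrentHashMap<>());
  }

  /** Records block ids per partition and returns how many were new. */
  public int addFinishedBlockIds(
      String appId, int shuffleId, Map<Integer, long[]> partitionToBlockIds) {
    Map<Integer, Map<Integer, Set<Long>>> shuffles = blockIds.get(appId);
    if (shuffles == null) {
      throw new IllegalStateException("appId[" + appId + "] is expired!");
    }
    Map<Integer, Set<Long>> partitions =
        shuffles.computeIfAbsent(shuffleId, k -> new ConcurrentHashMap<>());
    int added = 0;
    for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
      Set<Long> ids =
          partitions.computeIfAbsent(entry.getKey(), k -> ConcurrentHashMap.newKeySet());
      for (long id : entry.getValue()) {
        if (ids.add(id)) {
          added++;
        }
      }
    }
    return added;
  }

  /** Reads only the requested partitions; other partitions' ids are never scanned. */
  public Set<Long> getFinishedBlockIds(String appId, int shuffleId, Set<Integer> requested) {
    Map<Integer, Map<Integer, Set<Long>>> shuffles = blockIds.get(appId);
    Map<Integer, Set<Long>> partitions = shuffles == null ? null : shuffles.get(shuffleId);
    Set<Long> result = new HashSet<>();
    if (partitions == null) {
      return result;
    }
    for (int partitionId : requested) {
      result.addAll(partitions.getOrDefault(partitionId, Collections.emptySet()));
    }
    return result;
  }
}
```

The design trade-off is the one discussed above: lookups are direct per partition, at the cost of one container per partition.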
---
.../uniffle/common/config/RssClientConf.java | 8 +
docs/client_guide/client_guide.md | 1 +
docs/server_guide.md | 1 +
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 9 +-
.../uniffle/test/ShuffleServerWithLocalTest.java | 12 +-
.../apache/uniffle/server/ShuffleServerConf.java | 8 +
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 34 +++-
.../apache/uniffle/server/ShuffleTaskManager.java | 167 ++++------------
.../server/block/DefaultShuffleBlockIdManager.java | 220 +++++++++++++++++++++
.../block/PartitionedShuffleBlockIdManager.java | 159 +++++++++++++++
.../server/block/ShuffleBlockIdManager.java | 56 ++++++
.../server/block/ShuffleBlockIdManagerFactory.java | 46 +++++
.../uniffle/server/ShuffleTaskManagerTest.java | 11 +-
13 files changed, 590 insertions(+), 142 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 58b6d6a91..6d311a549 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -295,4 +295,12 @@ public class RssClientConf {
.asList()
.noDefaultValue()
.withDescription("the extra java properties could be configured by this option");
+
+ public static final ConfigOption<String> RSS_CLIENT_BLOCK_ID_MANAGER_CLASS =
+ ConfigOptions.key("rss.client.blockIdManagerClass")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The block id manager class of server for this application, "
+ + "the implementation of this interface to manage the shuffle block ids");
}
diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md
index bf7bb5d16..371528931 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -59,6 +59,7 @@ The important configuration of client is listed as following. These configuratio
| <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
+| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids
[...]
Notice:
diff --git a/docs/server_guide.md b/docs/server_guide.md
index d975ce324..3c6b7750a 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -121,6 +121,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.storage.localFileWriterClass | org.apache.uniffle.storage.handler.impl.LocalFileWriter | The writer class to write shuffle data for LOCALFILE.
[...]
| rss.storage.hdfs.write.dataBufferSize | 8K | The size of the buffer used to cache data written for HDFS.
[...]
| rss.storage.hdfs.write.indexBufferSize | 8K | The size of the buffer used to cache index written for HDFS.
[...]
+| rss.server.blockIdManagerClass | org.apache.uniffle.server.block.DefaultShuffleBlockIdManager | The block id manager class. It is used to manage block id.
[...]
### Advanced Configurations
| Property Name | Default | Description |
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 1ee10a8aa..e188e04b6 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -352,14 +352,13 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3);
grpcShuffleServerClient.reportShuffleResult(request);
// validate bitmap in shuffleTaskManager
- Roaring64NavigableMap[] bitmaps =
+ long bitmapNum =
grpcShuffleServers
.get(0)
.getShuffleTaskManager()
- .getPartitionsToBlockIds()
- .get("shuffleResultTest")
- .get(2);
- assertEquals(3, bitmaps.length);
+ .getShuffleBlockIdManager()
+ .getBitmapNum("shuffleResultTest", 2);
+ assertEquals(3, bitmapNum);
req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index a77472e6a..56c3a6a09 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -179,10 +179,18 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers;
assertNotNull(
- shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
+ shuffleServers
+ .get(0)
+ .getShuffleTaskManager()
+ .getShuffleBlockIdManager()
+ .contains(testAppId));
Thread.sleep(8000);
assertNull(
- shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
+ shuffleServers
+ .get(0)
+ .getShuffleTaskManager()
+ .getShuffleBlockIdManager()
+ .contains(testAppId));
}
protected void validateResult(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index fd2f5802c..8888e60b7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -25,6 +25,7 @@ import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
public class ShuffleServerConf extends RssBaseConf {
@@ -743,6 +744,13 @@ public class ShuffleServerConf extends RssBaseConf {
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable app detail log");
+ public static final ConfigOption<String> SERVER_BLOCK_ID_MANAGER_CLASS =
+ ConfigOptions.key("rss.server.blockIdManagerClass")
+ .stringType()
+ .defaultValue(DefaultShuffleBlockIdManager.class.getName())
+ .withDescription(
+ "The block id manager class, the implementation of this interface "
+ + "to manage the shuffle block ids");
public ShuffleServerConf() {}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 11ac2dff3..6b2a89554 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -26,14 +26,19 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
+import org.apache.uniffle.server.block.ShuffleBlockIdManager;
+import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;
/**
* ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache
@@ -78,6 +83,7 @@ public class ShuffleTaskInfo {
private final Map<Integer, Integer> latestStageAttemptNumbers;
private Map<String, String> properties;
+ private ShuffleBlockIdManager shuffleBlockIdManager;
public ShuffleTaskInfo(String appId) {
this.appId = appId;
@@ -324,6 +330,32 @@ public class ShuffleTaskInfo {
.filter(entry -> entry.getKey().contains(".rss."))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
this.properties = filteredProperties;
- LOGGER.info("{} set properties to {}", appId, properties);
+ LOGGER.info("{} set properties to {}", appId, filteredProperties);
+ String keyName = RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
+ String className = properties.get(keyName);
+ if (StringUtils.isEmpty(className)) {
+ keyName =
+ Constants.SPARK_RSS_CONFIG_PREFIX + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
+ className =
+ properties.get(
+ Constants.SPARK_RSS_CONFIG_PREFIX
+ + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key());
+ }
+ if (StringUtils.isNotEmpty(className)) {
+ shuffleBlockIdManager =
+ ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(className, keyName);
+ LOGGER.info(
+ "{} use app configured ShuffleBlockIdManager to {}", appId, shuffleBlockIdManager);
+ }
+ }
+
+ public ShuffleBlockIdManager getShuffleBlockIdManager() {
+ return shuffleBlockIdManager;
+ }
+
+ public void setShuffleBlockIdManagerIfNeeded(ShuffleBlockIdManager shuffleBlockIdManager) {
+ if (this.shuffleBlockIdManager == null) {
+ this.shuffleBlockIdManager = shuffleBlockIdManager;
+ }
}
}
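When an app's properties are set, ShuffleTaskInfo looks up the app-level class name first under `rss.client.blockIdManagerClass`, then under the same key with `Constants.SPARK_RSS_CONFIG_PREFIX` prepended. When a shuffle is later registered, `setShuffleBlockIdManagerIfNeeded` installs the server-level manager only if the app did not choose one. A minimal JDK-only sketch of that precedence, assuming the prefix value is `"spark."` (as the `spark.rss.client...` keys in the test description suggest) and holding class names as strings instead of manager instances; all names are hypothetical.

```java
import java.util.Map;

public class AppLevelManagerSketch {
  // Assumption: Constants.SPARK_RSS_CONFIG_PREFIX is "spark.".
  static final String SPARK_PREFIX = "spark.";
  static final String CLIENT_KEY = "rss.client.blockIdManagerClass";

  // Stand-in for the ShuffleBlockIdManager field; holds a class name here.
  private String managerClass;

  /** Lookup order: plain key first, then the spark-prefixed key. */
  void applyAppProperties(Map<String, String> appProperties) {
    String className = appProperties.get(CLIENT_KEY);
    if (className == null || className.isEmpty()) {
      className = appProperties.get(SPARK_PREFIX + CLIENT_KEY);
    }
    if (className != null && !className.isEmpty()) {
      managerClass = className;
    }
  }

  /** Falls back to the server-level default only if the app chose nothing. */
  void setManagerIfNeeded(String serverDefault) {
    if (managerClass == null) {
      managerClass = serverDefault;
    }
  }

  String managerClass() {
    return managerClass;
  }
}
```

Because the fallback only fills an empty slot, the per-app choice survives regardless of how many shuffles the app registers afterwards.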
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 6211e6673..d6a9a3fe9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -43,7 +41,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
@@ -76,6 +73,8 @@ import org.apache.uniffle.common.util.OutputUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.util.UnitConverter;
+import org.apache.uniffle.server.block.ShuffleBlockIdManager;
+import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -115,13 +114,6 @@ public class ShuffleTaskManager {
private long commitCheckIntervalMax;
private long leakShuffleDataCheckInterval;
private long triggerFlushInterval;
- // appId -> shuffleId -> blockIds to avoid too many appId
- // store taskAttemptId info to filter speculation task
- // Roaring64NavigableMap instance will cost much memory,
- // merge different blockId of partition to one bitmap can reduce memory cost,
- // but when get blockId, performance will degrade a little which can be optimized by client
- // configuration
- private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
@@ -130,6 +122,7 @@ public class ShuffleTaskManager {
private final Cache<String, ReentrantReadWriteLock> appLocks;
private final long storageRemoveOperationTimeoutSec;
private ShuffleMergeManager shuffleMergeManager;
+ private ShuffleBlockIdManager shuffleBlockIdManager;
public ShuffleTaskManager(
ShuffleServerConf conf,
@@ -147,7 +140,6 @@ public class ShuffleTaskManager {
ShuffleMergeManager shuffleMergeManager) {
this.conf = conf;
this.shuffleFlushManager = shuffleFlushManager;
- this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
this.shuffleBufferManager = shuffleBufferManager;
this.storageManager = storageManager;
this.shuffleMergeManager = shuffleMergeManager;
@@ -200,6 +192,8 @@ public class ShuffleTaskManager {
shuffleBufferManager.setShuffleTaskManager(this);
}
+ shuffleBlockIdManager = ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(conf);
+
appLocks =
CacheBuilder.newBuilder()
.expireAfterAccess(3600, TimeUnit.SECONDS)
@@ -260,13 +254,12 @@ public class ShuffleTaskManager {
ShuffleServerMetrics.addLabeledCacheGauge(
REPORTED_BLOCK_COUNT,
() ->
- partitionsToBlockIds.values().stream()
- .flatMap(innerMap -> innerMap.values().stream())
- .flatMapToLong(
- arr ->
- java.util.Arrays.stream(arr)
- .mapToLong(Roaring64NavigableMap::getLongCardinality))
- .sum(),
+ shuffleBlockIdManager.getTotalBlockCount()
+ + shuffleTaskInfos.values().stream()
+ .map(ShuffleTaskInfo::getShuffleBlockIdManager)
+ .filter(manager -> manager != null && manager != shuffleBlockIdManager)
+ .mapToLong(ShuffleBlockIdManager::getTotalBlockCount)
+ .sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
CACHED_BLOCK_COUNT,
@@ -339,8 +332,9 @@ public class ShuffleTaskManager {
getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf))
.dataDistributionType(dataDistType)
.build());
+ taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
- partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+ taskInfo.getShuffleBlockIdManager().registerAppId(appId);
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(
appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
@@ -467,51 +461,16 @@ public class ShuffleTaskManager {
public int addFinishedBlockIds(
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
- Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
- if (shuffleIdToPartitions == null) {
- throw new RssException("appId[" + appId + "] is expired!");
- }
- shuffleIdToPartitions.computeIfAbsent(
- shuffleId,
- key -> {
- Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
- for (int i = 0; i < bitmapNum; i++) {
- blockIds[i] = Roaring64NavigableMap.bitmapOf();
- }
- return blockIds;
- });
- Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
- if (blockIds.length != bitmapNum) {
- throw new InvalidRequestException(
- "Request expects "
- + bitmapNum
- + " bitmaps, but there are "
- + blockIds.length
- + " bitmaps!");
- }
-
ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
if (taskInfo == null) {
throw new InvalidRequestException(
"ShuffleTaskInfo is not found that should not happen for appId: " + appId);
}
- int totalUpdatedBlockCount = 0;
- for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
- Integer partitionId = entry.getKey();
- Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
- int updatedBlockCount = 0;
- synchronized (bitmap) {
- for (long blockId : entry.getValue()) {
- if (!bitmap.contains(blockId)) {
- bitmap.addLong(blockId);
- updatedBlockCount++;
- totalUpdatedBlockCount++;
- }
- }
- }
- taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+ ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+ if (manager == null) {
+ throw new RssException("appId[" + appId + "] is expired!");
}
- return totalUpdatedBlockCount;
+ return manager.addFinishedBlockIds(taskInfo, appId, shuffleId, partitionToBlockIds, bitmapNum);
}
public int updateAndGetCommitCount(String appId, int shuffleId) {
@@ -707,66 +666,12 @@ public class ShuffleTaskManager {
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
}
- Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
- if (shuffleIdToPartitions == null) {
- LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
- return null;
- }
-
- Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
- if (blockIds == null) {
- LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
- return new byte[] {};
- }
-
ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
- long expectedBlockNumber = 0;
- Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
- for (int partitionId : partitions) {
- int bitmapIndex = partitionId % blockIds.length;
- if (bitmapIndexToPartitions.containsKey(bitmapIndex)) {
- bitmapIndexToPartitions.get(bitmapIndex).add(partitionId);
- } else {
- HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
- bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
- }
- expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
- }
-
- Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
- for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
- Set<Integer> requestPartitions = entry.getValue();
- Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
- getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
- }
-
- if (res.getLongCardinality() != expectedBlockNumber) {
- throw new RssException(
- "Inconsistent block number for partitions: "
- + partitions
- + ". Excepted: "
- + expectedBlockNumber
- + ", actual: "
- + res.getLongCardinality());
+ ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+ if (manager == null) {
+ throw new RssException("appId[" + appId + "] is expired!");
}
-
- return RssUtils.serializeBitMap(res);
- }
-
- // filter the specific partition blockId in the bitmap to the resultBitmap
- protected Roaring64NavigableMap getBlockIdsByPartitionId(
- Set<Integer> requestPartitions,
- Roaring64NavigableMap bitmap,
- Roaring64NavigableMap resultBitmap,
- BlockIdLayout blockIdLayout) {
- bitmap.forEach(
- blockId -> {
- int partitionId = blockIdLayout.getPartitionId(blockId);
- if (requestPartitions.contains(partitionId)) {
- resultBitmap.addLong(blockId);
- }
- });
- return resultBitmap;
+ return manager.getFinishedBlockIds(taskInfo, appId, shuffleId, partitions, blockIdLayout);
}
public ShuffleDataResult getInMemoryShuffleData(
@@ -890,14 +795,14 @@ public class ShuffleTaskManager {
taskInfo.getCommitCounts().remove(shuffleId);
taskInfo.getCommitLocks().remove(shuffleId);
}
+ ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+ if (manager == null) {
+ throw new RssException("appId[" + appId + "] is expired!");
+ }
+ manager.removeBlockIdByShuffleId(appId, shuffleIds);
+ } else {
+ shuffleBlockIdManager.removeBlockIdByShuffleId(appId, shuffleIds);
}
- Optional.ofNullable(partitionsToBlockIds.get(appId))
- .ifPresent(
- x -> {
- for (Integer shuffleId : shuffleIds) {
- x.remove(shuffleId);
- }
- });
shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
@@ -975,7 +880,11 @@ public class ShuffleTaskManager {
partitionInfoSummary.append("The app task info: ").append(shuffleTaskInfo);
LOG.info("Removing app summary info: {}", partitionInfoSummary);
- partitionsToBlockIds.remove(appId);
+ ShuffleBlockIdManager manager = shuffleTaskInfo.getShuffleBlockIdManager();
+ if (manager != null) {
+ manager.removeBlockIdByAppId(appId);
+ }
+ shuffleBlockIdManager.removeBlockIdByAppId(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
@@ -1095,11 +1004,6 @@ public class ShuffleTaskManager {
return requireBufferIds;
}
- @VisibleForTesting
- public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() {
- return partitionsToBlockIds;
- }
-
public void removeShuffleDataAsync(String appId, int shuffleId) {
expiredAppIdQueue.add(
new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId)));
@@ -1152,4 +1056,9 @@ public class ShuffleTaskManager {
protected void setShuffleFlushManager(ShuffleFlushManager flushManager) {
this.shuffleFlushManager = flushManager;
}
+
+ @VisibleForTesting
+ public ShuffleBlockIdManager getShuffleBlockIdManager() {
+ return shuffleBlockIdManager;
+ }
}
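The `REPORTED_BLOCK_COUNT` gauge now adds the server-level manager's count to the counts of app-level managers, skipping apps whose manager is null or is the server-level instance itself, whose count is already included. A minimal sketch of that aggregation, with a hypothetical `BlockCounter` interface standing in for `ShuffleBlockIdManager`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ReportedBlockCountSketch {
  /** Hypothetical stand-in for ShuffleBlockIdManager#getTotalBlockCount. */
  public interface BlockCounter {
    long getTotalBlockCount();
  }

  /**
   * serverDefault is counted once; perAppManagers holds each app's manager, which is the
   * shared server-level instance, an app-specific instance, or null.
   */
  public static long totalReported(BlockCounter serverDefault, List<BlockCounter> perAppManagers) {
    return serverDefault.getTotalBlockCount()
        + perAppManagers.stream()
            .filter(Objects::nonNull)
            .filter(m -> m != serverDefault) // identity check avoids double counting
            .mapToLong(BlockCounter::getTotalBlockCount)
            .sum();
  }

  public static void main(String[] args) {
    BlockCounter serverLevel = () -> 100L;
    BlockCounter appLevel = () -> 5L;
    System.out.println(totalReported(serverLevel, Arrays.asList(serverLevel, null, appLevel)));
  }
}
```

Running `main` prints `105`: the server-level count once, plus the single app-specific manager.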
diff --git a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
new file mode 100644
index 000000000..6487fd582
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/**
+ * The default implementation of ShuffleBlockIdManager, manage block id of all partitions together.
+ */
+public class DefaultShuffleBlockIdManager implements ShuffleBlockIdManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultShuffleBlockIdManager.class);
+
+ // appId -> shuffleId -> blockIds to avoid too many appId
+ // store taskAttemptId info to filter speculation task
+ // Roaring64NavigableMap instance will cost much memory,
+ // merge different blockId of partition to one bitmap can reduce memory cost,
+ // but when get blockId, performance will degrade a little which can be optimized by client
+ // configuration
+ // appId -> shuffleId -> hashId -> blockIds
+ private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
+
+ public DefaultShuffleBlockIdManager() {
+ this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
+ }
+
+ @VisibleForTesting
+ /** Filter the specific partition blockId in the bitmap to the resultBitmap. */
+ public static Roaring64NavigableMap getBlockIdsByPartitionId(
+ Set<Integer> requestPartitions,
+ Roaring64NavigableMap bitmap,
+ Roaring64NavigableMap resultBitmap,
+ BlockIdLayout blockIdLayout) {
+ bitmap.forEach(
+ blockId -> {
+ int partitionId = blockIdLayout.getPartitionId(blockId);
+ if (requestPartitions.contains(partitionId)) {
+ resultBitmap.addLong(blockId);
+ }
+ });
+ return resultBitmap;
+ }
+
+ public void registerAppId(String appId) {
+ partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+ }
+
+ /**
+ * Add finished blockIds from client
+ *
+ * @param appId
+ * @param shuffleId
+ * @param partitionToBlockIds
+ * @param bitmapNum
+ * @return the number of added blockIds
+ */
+ public int addFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Map<Integer, long[]> partitionToBlockIds,
+ int bitmapNum) {
+ Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+ if (shuffleIdToPartitions == null) {
+ throw new RssException("appId[" + appId + "] is expired!");
+ }
+ shuffleIdToPartitions.computeIfAbsent(
+ shuffleId,
+ key -> {
+ Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
+ for (int i = 0; i < bitmapNum; i++) {
+ blockIds[i] = Roaring64NavigableMap.bitmapOf();
+ }
+ return blockIds;
+ });
+ Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+ if (blockIds.length != bitmapNum) {
+ throw new InvalidRequestException(
+ "Request expects "
+ + bitmapNum
+ + " bitmaps, but there are "
+ + blockIds.length
+ + " bitmaps!");
+ }
+
+ int totalUpdatedBlockCount = 0;
+ for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
+ Integer partitionId = entry.getKey();
+ Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
+ int updatedBlockCount = 0;
+ synchronized (bitmap) {
+ for (long blockId : entry.getValue()) {
+ if (!bitmap.contains(blockId)) {
+ bitmap.addLong(blockId);
+ updatedBlockCount++;
+ totalUpdatedBlockCount++;
+ }
+ }
+ }
+ taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+ }
+ return totalUpdatedBlockCount;
+ }
+
+ public byte[] getFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Set<Integer> partitions,
+ BlockIdLayout blockIdLayout)
+ throws IOException {
+ Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+ if (shuffleIdToPartitions == null) {
+ LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
+ return null;
+ }
+ Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+ if (blockIds == null) {
+ LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
+ return new byte[] {};
+ }
+ long expectedBlockNumber = 0;
+ Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
+ for (int partitionId : partitions) {
+ int bitmapIndex = partitionId % blockIds.length;
+ if (bitmapIndexToPartitions.containsKey(bitmapIndex)) {
+ bitmapIndexToPartitions.get(bitmapIndex).add(partitionId);
+ } else {
+ HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
+ bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
+ }
+ expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
+ }
+ Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+ for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
+ Set<Integer> requestPartitions = entry.getValue();
+ Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
+ getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+ }
+ if (res.getLongCardinality() != expectedBlockNumber) {
+ throw new RssException(
+ "Inconsistent block number for partitions: "
+ + partitions
+ + ". Expected: "
+ + expectedBlockNumber
+ + ", actual: "
+ + res.getLongCardinality());
+ }
+ return RssUtils.serializeBitMap(res);
+ }
+
+ @Override
+ public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) {
+ Optional.ofNullable(partitionsToBlockIds.get(appId))
+ .ifPresent(
+ x -> {
+ for (Integer shuffleId : shuffleIds) {
+ x.remove(shuffleId);
+ }
+ });
+ }
+
+ @Override
+ public void removeBlockIdByAppId(String appId) {
+ partitionsToBlockIds.remove(appId);
+ }
+
+ @Override
+ public long getTotalBlockCount() {
+ return partitionsToBlockIds.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .flatMapToLong(
+ arr ->
+ java.util.Arrays.stream(arr).mapToLong(Roaring64NavigableMap::getLongCardinality))
+ .sum();
+ }
+
+ @Override
+ public boolean contains(String appId) {
+ return partitionsToBlockIds.containsKey(appId);
+ }
+
+ @Override
+ public long getBitmapNum(String appId, int shuffleId) {
+ return partitionsToBlockIds.get(appId).get(shuffleId).length;
+ }
+}
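The `bitmapNum` bucketing in `DefaultShuffleBlockIdManager` is easy to miss in the diff: several partitions share one bitmap (chosen by `partitionId % bitmapNum`), so a read has to filter each shared bitmap back down to the requested partitions, which `getBlockIdsByPartitionId` appears to do using the `BlockIdLayout`. The following is a minimal, self-contained sketch of that idea under stated assumptions, not the Uniffle implementation: `TreeSet<Long>` stands in for `Roaring64NavigableMap`, and the 20/20-bit block-id layout and every class and method name are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of bucketed block-id storage: partitions share bitmapNum buckets. */
public class BucketedBlockIdSketch {
  // Hypothetical layout: [sequenceNo | 20-bit partitionId | 20-bit taskAttemptId].
  static final int TASK_BITS = 20;
  static final int PARTITION_BITS = 20;
  static final long PARTITION_MASK = (1L << PARTITION_BITS) - 1;

  static long blockId(long sequenceNo, int partitionId, long taskAttemptId) {
    return (sequenceNo << (TASK_BITS + PARTITION_BITS))
        | ((long) partitionId << TASK_BITS)
        | taskAttemptId;
  }

  static int partitionOf(long blockId) {
    return (int) ((blockId >>> TASK_BITS) & PARTITION_MASK);
  }

  // TreeSet<Long> stands in for Roaring64NavigableMap.
  private final List<TreeSet<Long>> buckets = new ArrayList<>();

  BucketedBlockIdSketch(int bitmapNum) {
    for (int i = 0; i < bitmapNum; i++) {
      buckets.add(new TreeSet<>());
    }
  }

  /** Adds block ids and returns how many were not already present. */
  int add(int partitionId, long... blockIds) {
    TreeSet<Long> bucket = buckets.get(partitionId % buckets.size());
    int added = 0;
    synchronized (bucket) {
      for (long id : blockIds) {
        if (bucket.add(id)) {
          added++;
        }
      }
    }
    return added;
  }

  /** Returns the block ids of the requested partitions only. */
  Set<Long> get(Set<Integer> partitions) {
    // Group partitions by bucket so each shared bucket is scanned once.
    Map<Integer, Set<Integer>> bucketToPartitions = new HashMap<>();
    for (int partitionId : partitions) {
      bucketToPartitions
          .computeIfAbsent(partitionId % buckets.size(), k -> new HashSet<>())
          .add(partitionId);
    }
    Set<Long> result = new TreeSet<>();
    for (Map.Entry<Integer, Set<Integer>> entry : bucketToPartitions.entrySet()) {
      TreeSet<Long> bucket = buckets.get(entry.getKey());
      synchronized (bucket) {
        for (long id : bucket) {
          // A bucket mixes partitions, so decode the partition from each id.
          if (entry.getValue().contains(partitionOf(id))) {
            result.add(id);
          }
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    BucketedBlockIdSketch store = new BucketedBlockIdSketch(2);
    // Partitions 1 and 3 share bucket 1 because 1 % 2 == 3 % 2.
    store.add(1, blockId(0, 1, 7), blockId(1, 1, 7));
    store.add(3, blockId(0, 3, 8));
    System.out.println(store.add(1, blockId(0, 1, 7))); // 0: already present
    System.out.println(store.get(Set.of(1)).size()); // 2
    System.out.println(store.get(Set.of(1, 3)).size()); // 3
  }
}
```

The trade-off this illustrates: a small `bitmapNum` keeps the number of bitmaps low, but every read pays for scanning ids that belong to other partitions in the same bucket.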
diff --git a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
new file mode 100644
index 000000000..5e56fee83
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/** Manages block ids individually for each partition. */
+public class PartitionedShuffleBlockIdManager implements ShuffleBlockIdManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PartitionedShuffleBlockIdManager.class);
+
+ // appId -> shuffleId -> partitionId -> blockIds
+ private Map<String, Map<Integer, Map<Integer, Roaring64NavigableMap>>>
+ partitionsToBlockIds;
+
+ public PartitionedShuffleBlockIdManager() {
+ this.partitionsToBlockIds = new ConcurrentHashMap<>();
+ }
+
+ public void registerAppId(String appId) {
+ partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+ }
+
+ @Override
+ public int addFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Map<Integer, long[]> partitionToBlockIds,
+ int bitmapNum) {
+ Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions =
+ partitionsToBlockIds.get(appId);
+ if (shuffleIdToPartitions == null) {
+ throw new RssException("appId[" + appId + "] is expired!");
+ }
+ Map<Integer, Roaring64NavigableMap> partitions =
+ shuffleIdToPartitions.computeIfAbsent(shuffleId, key -> new ConcurrentHashMap<>());
+ int totalUpdatedBlockCount = 0;
+ for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
+ Integer partitionId = entry.getKey();
+ Roaring64NavigableMap bitmap =
+ partitions.computeIfAbsent(partitionId, k -> Roaring64NavigableMap.bitmapOf());
+ int updatedBlockCount = 0;
+ synchronized (bitmap) {
+ for (long blockId : entry.getValue()) {
+ if (!bitmap.contains(blockId)) {
+ bitmap.addLong(blockId);
+ updatedBlockCount++;
+ totalUpdatedBlockCount++;
+ }
+ }
+ }
+ taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+ }
+ return totalUpdatedBlockCount;
+ }
+
+ @Override
+ public byte[] getFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Set<Integer> partitions,
+ BlockIdLayout blockIdLayout)
+ throws IOException {
+ Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions =
+ partitionsToBlockIds.get(appId);
+ if (shuffleIdToPartitions == null) {
+ LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
+ return null;
+ }
+
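+ Map<Integer, Roaring64NavigableMap> partitionToBlockId =
+ shuffleIdToPartitions.get(shuffleId);
+ if (partitionToBlockId == null) {
+ LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
+ return new byte[] {};
+ }
+
+ long expectedBlockNumber = 0;
+ Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+ for (int partitionId : partitions) {
+ expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
+ Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
+ if (bitmap == null) {
+ // No block has been reported for this partition yet.
+ continue;
+ }
+ // Writers mutate the bitmap under its monitor, so read it under the same lock.
+ synchronized (bitmap) {
+ res.or(bitmap);
+ }
+ }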
+
+ if (res.getLongCardinality() != expectedBlockNumber) {
+ throw new RssException(
+ "Inconsistent block number for partitions: "
+ + partitions
+ + ". Expected: "
+ + expectedBlockNumber
+ + ", actual: "
+ + res.getLongCardinality());
+ }
+
+ return RssUtils.serializeBitMap(res);
+ }
+
+ @Override
+ public void removeBlockIdByAppId(String appId) {
+ partitionsToBlockIds.remove(appId);
+ }
+
+ @Override
+ public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) {
+ Optional.ofNullable(partitionsToBlockIds.get(appId))
+ .ifPresent(
+ x -> {
+ for (Integer shuffleId : shuffleIds) {
+ x.remove(shuffleId);
+ }
+ });
+ }
+
+ @Override
+ public long getTotalBlockCount() {
+ return partitionsToBlockIds.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .flatMap(innerMap -> innerMap.values().stream())
+ .mapToLong(Roaring64NavigableMap::getLongCardinality)
+ .sum();
+ }
+
+ @Override
+ public boolean contains(String appId) {
+ return partitionsToBlockIds.containsKey(appId);
+ }
+
+ @Override
+ public long getBitmapNum(String appId, int shuffleId) {
+ return partitionsToBlockIds.get(appId).get(shuffleId).size();
+ }
+}
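`PartitionedShuffleBlockIdManager` keeps one bitmap per partition, so a read is a union of the requested partitions' bitmaps followed by the same count check against `ShuffleTaskInfo`. A minimal sketch of that shape, assuming `TreeSet<Long>` in place of `Roaring64NavigableMap` and a per-partition counter standing in for `taskInfo.incBlockNumber`/`getBlockNumber`; all names are hypothetical, and a partition that never reported blocks is treated as empty.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of per-partition block-id storage with a count cross-check. */
public class PartitionedBlockIdSketch {
  private final Map<Integer, TreeSet<Long>> partitionToBlockIds = new ConcurrentHashMap<>();
  // Stand-in for the per-partition block counter kept by ShuffleTaskInfo.
  private final Map<Integer, Long> partitionToCount = new ConcurrentHashMap<>();

  /** Adds block ids and returns how many were not already present. */
  int add(int partitionId, long... blockIds) {
    // computeIfAbsent returns the existing or newly created set atomically.
    TreeSet<Long> set = partitionToBlockIds.computeIfAbsent(partitionId, k -> new TreeSet<>());
    int added = 0;
    synchronized (set) {
      for (long id : blockIds) {
        if (set.add(id)) {
          added++;
        }
      }
    }
    partitionToCount.merge(partitionId, (long) added, Long::sum);
    return added;
  }

  /** Unions the requested partitions and verifies the result against the counters. */
  Set<Long> get(Set<Integer> partitions) {
    Set<Long> result = new TreeSet<>();
    long expected = 0;
    for (int partitionId : partitions) {
      expected += partitionToCount.getOrDefault(partitionId, 0L);
      TreeSet<Long> set = partitionToBlockIds.get(partitionId);
      if (set == null) {
        continue; // no blocks reported for this partition yet
      }
      synchronized (set) {
        result.addAll(set);
      }
    }
    if (result.size() != expected) {
      throw new IllegalStateException(
          "Inconsistent block number for partitions: " + partitions
              + ". Expected: " + expected + ", actual: " + result.size());
    }
    return result;
  }

  public static void main(String[] args) {
    PartitionedBlockIdSketch store = new PartitionedBlockIdSketch();
    System.out.println(store.add(1, 10L, 11L)); // 2
    System.out.println(store.add(1, 11L, 12L)); // 1, since 11 is a duplicate
    store.add(2, 20L);
    System.out.println(store.get(Set.of(1, 2))); // [10, 11, 12, 20]
    System.out.println(store.get(Set.of(3))); // []
  }
}
```

Compared with bucketing, no filtering by block-id layout is needed at read time; the cost is one bitmap object per partition.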
diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
new file mode 100644
index 000000000..635e0c394
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/** Manages the block ids of finished shuffle blocks; implementations choose the storage layout. */
+public interface ShuffleBlockIdManager {
+ void registerAppId(String appId);
+
+ int addFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Map<Integer, long[]> partitionToBlockIds,
+ int bitmapNum);
+
+ byte[] getFinishedBlockIds(
+ ShuffleTaskInfo taskInfo,
+ String appId,
+ Integer shuffleId,
+ Set<Integer> partitions,
+ BlockIdLayout blockIdLayout)
+ throws IOException;
+
+ void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds);
+
+ void removeBlockIdByAppId(String appId);
+
+ long getTotalBlockCount();
+
+ boolean contains(String appId);
+
+ long getBitmapNum(String appId, int shuffleId);
+}
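The interface implies a lifecycle: an app is registered first (writes for an unregistered app fail with the "expired" error), block ids are added per shuffle and partition, and cleanup happens per shuffle or per app. A minimal in-memory sketch of that lifecycle, assuming plain `Set<Long>` values instead of bitmaps; the class and its method names only loosely mirror `ShuffleBlockIdManager` and are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory sketch of the ShuffleBlockIdManager lifecycle. */
public class BlockIdLifecycleSketch {
  // appId -> shuffleId -> partitionId -> blockIds
  private final Map<String, Map<Integer, Map<Integer, Set<Long>>>> apps =
      new ConcurrentHashMap<>();

  void registerAppId(String appId) {
    apps.computeIfAbsent(appId, k -> new ConcurrentHashMap<>());
  }

  void add(String appId, int shuffleId, int partitionId, long blockId) {
    Map<Integer, Map<Integer, Set<Long>>> shuffles = apps.get(appId);
    if (shuffles == null) {
      // Mirrors the "appId[...] is expired!" check: writes require prior registration.
      throw new IllegalStateException("appId[" + appId + "] is expired!");
    }
    shuffles
        .computeIfAbsent(shuffleId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(partitionId, k -> ConcurrentHashMap.newKeySet())
        .add(blockId);
  }

  void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) {
    Map<Integer, Map<Integer, Set<Long>>> shuffles = apps.get(appId);
    if (shuffles != null) {
      shuffleIds.forEach(shuffles::remove);
    }
  }

  void removeBlockIdByAppId(String appId) {
    apps.remove(appId);
  }

  long getTotalBlockCount() {
    return apps.values().stream()
        .flatMap(shuffles -> shuffles.values().stream())
        .flatMap(partitions -> partitions.values().stream())
        .mapToLong(Set::size)
        .sum();
  }

  public static void main(String[] args) {
    BlockIdLifecycleSketch m = new BlockIdLifecycleSketch();
    m.registerAppId("app-1");
    m.add("app-1", 0, 1, 100L);
    m.add("app-1", 1, 1, 200L);
    System.out.println(m.getTotalBlockCount()); // 2
    m.removeBlockIdByShuffleId("app-1", List.of(0));
    System.out.println(m.getTotalBlockCount()); // 1
    m.removeBlockIdByAppId("app-1");
    System.out.println(m.getTotalBlockCount()); // 0
  }
}
```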
diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java
new file mode 100644
index 000000000..e76b8d72d
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public class ShuffleBlockIdManagerFactory {
+ public static ShuffleBlockIdManager createShuffleBlockIdManager(ShuffleServerConf conf) {
+ String className = conf.get(ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS);
+ return createShuffleBlockIdManager(
+ className, ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS.key());
+ }
+
+ public static ShuffleBlockIdManager createShuffleBlockIdManager(
+ String className, String configKey) {
+ if (StringUtils.isEmpty(className)) {
+ throw new IllegalStateException(
+ "Configuration error: " + configKey + " should not be set to empty");
+ }
+
+ try {
+ return (ShuffleBlockIdManager) RssUtils.getConstructor(className).newInstance();
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Configuration error: " + configKey + " failed to create an instance of " + className,
+ e);
+ }
+ }
+}
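`ShuffleBlockIdManagerFactory` instantiates the class named by the `SERVER_BLOCK_ID_MANAGER_CLASS` option through `RssUtils.getConstructor`. The JDK-only sketch below shows the same reflective pattern with `Class.forName` and `getDeclaredConstructor()`; it is an illustration, not a drop-in replacement, and the `BlockIdManager` interface, `DefaultManager` class, and config key string are hypothetical.

```java
import java.lang.reflect.Constructor;

/** Hypothetical JDK-only sketch of a reflective, config-driven factory. */
public class ReflectiveFactorySketch {
  /** Stand-in for ShuffleBlockIdManager. */
  public interface BlockIdManager {
    String name();
  }

  public static class DefaultManager implements BlockIdManager {
    public DefaultManager() {}

    @Override
    public String name() {
      return "default";
    }
  }

  static BlockIdManager create(String className, String configKey) {
    if (className == null || className.isEmpty()) {
      throw new IllegalStateException(
          "Configuration error: " + configKey + " should not be set to empty");
    }
    try {
      Class<?> clazz = Class.forName(className);
      // asSubclass rejects a class that does not implement the interface
      // before any instance is constructed.
      Class<? extends BlockIdManager> managerClass = clazz.asSubclass(BlockIdManager.class);
      Constructor<? extends BlockIdManager> ctor = managerClass.getDeclaredConstructor();
      return ctor.newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException(
          "Configuration error: " + configKey + " failed to create an instance of " + className,
          e);
    }
  }

  public static void main(String[] args) {
    BlockIdManager m = create(DefaultManager.class.getName(), "example.blockIdManagerClass");
    System.out.println(m.name()); // default
  }
}
```

Catching `ReflectiveOperationException` rather than `Exception` keeps unrelated runtime failures visible while still covering a missing class, a missing no-arg constructor, and a throwing constructor.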
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 2a67c548e..c6484b175 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -816,7 +817,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
}
}
Roaring64NavigableMap resultBlockIds =
- shuffleTaskManager.getBlockIdsByPartitionId(
+ DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
Sets.newHashSet(expectedPartitionId),
bitmapBlockIds,
Roaring64NavigableMap.bitmapOf(),
@@ -825,7 +826,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0));
resultBlockIds =
- shuffleTaskManager.getBlockIdsByPartitionId(
+ DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
@@ -833,7 +834,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
bitmapBlockIds.addLong(expectedBlockId);
resultBlockIds =
- shuffleTaskManager.getBlockIdsByPartitionId(
+ DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
Sets.newHashSet(layout.maxPartitionId),
bitmapBlockIds,
Roaring64NavigableMap.bitmapOf(),
@@ -872,12 +873,12 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
}
Roaring64NavigableMap resultBlockIds =
- shuffleTaskManager.getBlockIdsByPartitionId(
+ DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
assertEquals(expectedBlockIds, resultBlockIds);
assertEquals(
bitmapBlockIds,
- shuffleTaskManager.getBlockIdsByPartitionId(
+ DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout));
}