This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 6951e367ed79bb1d75f4cc965a2dacc7b8f70077 Author: maobaolong <[email protected]> AuthorDate: Mon Nov 18 14:16:21 2024 +0800 [#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227) ### What changes were proposed in this pull request? - Introduce a ShuffleBlockIdManagerFactory to create configured implementation. - Introduce PartitionedShuffleBlockIdManager to individual bitmap for each partition. ### Why are the changes needed? Fix: #2219 <img width="1596" alt="image" src="https://github.com/user-attachments/assets/c7752f94-941b-45d2-9a45-1da197e38984"> This is the maximum partition num with node metrics in our cluster, it is ~410K, it cost heap size 410K * 50KiB = 2GiB if we say a bitmap related a partition avg 10000 blocks(cost 50KiB heap). I think this is worth to choose this new policy in our production cluster. ### Does this PR introduce _any_ user-facing change? - Config option: rss.server.blockIdStrategyClass ### How was this patch tested? Test Locally, check result by `arthas` - Common case, test with default config ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @DefaultShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@3f211bae], partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_431031_1730297736664") ' @ConcurrentHashMapForJDK8[ @Integer[0]:@Roaring64NavigableMap[][isEmpty=false;size=10], @Integer[1]:@Roaring64NavigableMap[][isEmpty=false;size=10], @Integer[2]:@Roaring64NavigableMap[][isEmpty=false;size=10], ] ``` - Config to PartitionedShuffleBlockIdManager ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @PartitionedShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@6aa6fea0], partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds' @ConcurrentHashMap[ @String[application_1729845342052_432324_1730299397916]:@ConcurrentHashMapForJDK8[isEmpty=false;size=2], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_432324_1730299397916")' @ConcurrentHashMapForJDK8[ @Integer[0]:@ConcurrentHashMap[isEmpty=false;size=2000], @Integer[1]:@ConcurrentHashMap[isEmpty=false;size=2000], ] ``` - Use client app level config submit two app with `spark.rss.client.blockIdStrategyClass=org.apache.uniffle.server.block.DefaultShuffleBlockIdManager` and `spark.rss.client.blockIdStrategyClass=org.apache.uniffle.server.block.PartitionedShuffleBlockIdManager` ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @DefaultShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@ed3b52a], partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[1].shuffleBlockIdManager' @PartitionedShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@5fc14628], partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1], ] ``` Log example ``` [2024-10-30 23:01:37.585] [Grpc-709] [INFO] ShuffleTaskInfo - application_1729845342052_432387_1730300440568 use app configured ShuffleBlockIdManager to org.apache.uniffle.server.block.DefaultShuffleBlockIdManager@3c6b75a8 ``` --- .../uniffle/common/config/RssClientConf.java | 8 + docs/client_guide/client_guide.md | 1 + docs/server_guide.md | 1 + .../apache/uniffle/test/ShuffleServerGrpcTest.java | 9 +- .../uniffle/test/ShuffleServerWithLocalTest.java | 12 +- .../apache/uniffle/server/ShuffleServerConf.java | 8 + .../org/apache/uniffle/server/ShuffleTaskInfo.java | 34 +++- .../apache/uniffle/server/ShuffleTaskManager.java | 167 ++++------------ .../server/block/DefaultShuffleBlockIdManager.java | 220 +++++++++++++++++++++ .../block/PartitionedShuffleBlockIdManager.java | 159 +++++++++++++++ .../server/block/ShuffleBlockIdManager.java | 56 ++++++ .../server/block/ShuffleBlockIdManagerFactory.java | 46 +++++ .../uniffle/server/ShuffleTaskManagerTest.java | 11 +- 13 files changed, 590 insertions(+), 142 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 58b6d6a91..6d311a549 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -295,4 +295,12 @@ public class RssClientConf { .asList() .noDefaultValue() .withDescription("the extra java properties could be configured by this option"); + + public static final ConfigOption<String> RSS_CLIENT_BLOCK_ID_MANAGER_CLASS = + ConfigOptions.key("rss.client.blockIdManagerClass") + .stringType() + .noDefaultValue() + .withDescription( + "The block id manager class of server for this application, " + + "the implementation of this interface to manage the shuffle block ids"); } diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md index bf7bb5d16..371528931 100644 --- a/docs/client_guide/client_guide.md +++ b/docs/client_guide/client_guide.md @@ -59,6 +59,7 @@ The important configuration of client is listed as following. These configuratio | <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. [...] | <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. [...] | <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. [...] +| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids [...] Notice: diff --git a/docs/server_guide.md b/docs/server_guide.md index d975ce324..3c6b7750a 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -121,6 +121,7 @@ This document will introduce how to deploy Uniffle shuffle servers. | rss.storage.localFileWriterClass | org.apache.uniffle.storage.handler.impl.LocalFileWriter | The writer class to write shuffle data for LOCALFILE. [...] | rss.storage.hdfs.write.dataBufferSize | 8K | The size of the buffer used to cache data written for HDFS. [...] | rss.storage.hdfs.write.indexBufferSize | 8K | The size of the buffer used to cache index written for HDFS. [...] +| rss.server.blockIdManagerClass | org.apache.uniffle.server.block.DefaultShuffleBlockIdManager | The block id manager class. It is used to manage block id. [...] ### Advanced Configurations | Property Name | Default | Description | diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index 1ee10a8aa..e188e04b6 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -352,14 +352,13 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase { request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3); grpcShuffleServerClient.reportShuffleResult(request); // validate bitmap in shuffleTaskManager - Roaring64NavigableMap[] bitmaps = + long bitmapNum = grpcShuffleServers .get(0) .getShuffleTaskManager() - .getPartitionsToBlockIds() - .get("shuffleResultTest") - .get(2); - assertEquals(3, bitmaps.length); + .getShuffleBlockIdManager() + .getBitmapNum("shuffleResultTest", 2); + assertEquals(3, bitmapNum); req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout); result = grpcShuffleServerClient.getShuffleResult(req); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java index a77472e6a..56c3a6a09 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java @@ -179,10 +179,18 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase { List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers; assertNotNull( - shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId)); + shuffleServers + .get(0) + .getShuffleTaskManager() + .getShuffleBlockIdManager() + .contains(testAppId)); Thread.sleep(8000); assertNull( - shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId)); + shuffleServers + .get(0) + .getShuffleTaskManager() + .getShuffleBlockIdManager() + .contains(testAppId)); } protected void validateResult( diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index fd2f5802c..8888e60b7 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -25,6 +25,7 @@ import org.apache.uniffle.common.config.ConfigOption; import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager; import org.apache.uniffle.server.buffer.ShuffleBufferType; public class ShuffleServerConf extends RssBaseConf { @@ -743,6 +744,13 @@ public class ShuffleServerConf extends RssBaseConf { .booleanType() .defaultValue(false) .withDescription("Whether to enable app detail log"); + public static final ConfigOption<String> SERVER_BLOCK_ID_MANAGER_CLASS = + ConfigOptions.key("rss.server.blockIdManagerClass") + .stringType() + .defaultValue(DefaultShuffleBlockIdManager.class.getName()) + .withDescription( + "The block id manager class, the implementation of this interface " + + "to manage the shuffle block ids"); public ShuffleServerConf() {} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index 11ac2dff3..6b2a89554 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -26,14 +26,19 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.PartitionInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.JavaUtils; import org.apache.uniffle.common.util.UnitConverter; +import org.apache.uniffle.server.block.ShuffleBlockIdManager; +import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory; /** * ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache @@ -78,6 +83,7 @@ public class ShuffleTaskInfo { private final Map<Integer, Integer> latestStageAttemptNumbers; private Map<String, String> properties; + private ShuffleBlockIdManager shuffleBlockIdManager; public ShuffleTaskInfo(String appId) { this.appId = appId; @@ -324,6 +330,32 @@ public class ShuffleTaskInfo { .filter(entry -> entry.getKey().contains(".rss.")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); this.properties = filteredProperties; - LOGGER.info("{} set properties to {}", appId, properties); + LOGGER.info("{} set properties to {}", appId, filteredProperties); + String keyName = RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key(); + String className = properties.get(keyName); + if (StringUtils.isEmpty(className)) { + keyName = + Constants.SPARK_RSS_CONFIG_PREFIX + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key(); + className = + properties.get( + Constants.SPARK_RSS_CONFIG_PREFIX + + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key()); + } + if (StringUtils.isNotEmpty(className)) { + shuffleBlockIdManager = + ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(className, keyName); + LOGGER.info( + "{} use app configured ShuffleBlockIdManager to {}", appId, shuffleBlockIdManager); + } + } + + public ShuffleBlockIdManager getShuffleBlockIdManager() { + return shuffleBlockIdManager; + } + + public void setShuffleBlockIdManagerIfNeeded(ShuffleBlockIdManager shuffleBlockIdManager) { + if (this.shuffleBlockIdManager == null) { + this.shuffleBlockIdManager = shuffleBlockIdManager; + } } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 6211e6673..d6a9a3fe9 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -43,7 +41,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -76,6 +73,8 @@ import org.apache.uniffle.common.util.OutputUtils; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.util.UnitConverter; +import org.apache.uniffle.server.block.ShuffleBlockIdManager; +import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; @@ -115,13 +114,6 @@ public class ShuffleTaskManager { private long commitCheckIntervalMax; private long leakShuffleDataCheckInterval; private long triggerFlushInterval; - // appId -> shuffleId -> blockIds to avoid too many appId - // store taskAttemptId info to filter speculation task - // Roaring64NavigableMap instance will cost much memory, - // merge different blockId of partition to one bitmap can reduce memory cost, - // but when get blockId, performance will degrade a little which can be optimized by client - // configuration - private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds; private final ShuffleBufferManager shuffleBufferManager; private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap(); private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap(); @@ -130,6 +122,7 @@ public class ShuffleTaskManager { private final Cache<String, ReentrantReadWriteLock> appLocks; private final long storageRemoveOperationTimeoutSec; private ShuffleMergeManager shuffleMergeManager; + private ShuffleBlockIdManager shuffleBlockIdManager; public ShuffleTaskManager( ShuffleServerConf conf, @@ -147,7 +140,6 @@ public class ShuffleTaskManager { ShuffleMergeManager shuffleMergeManager) { this.conf = conf; this.shuffleFlushManager = shuffleFlushManager; - this.partitionsToBlockIds = JavaUtils.newConcurrentMap(); this.shuffleBufferManager = shuffleBufferManager; this.storageManager = storageManager; this.shuffleMergeManager = shuffleMergeManager; @@ -200,6 +192,8 @@ public class ShuffleTaskManager { shuffleBufferManager.setShuffleTaskManager(this); } + shuffleBlockIdManager = ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(conf); + appLocks = CacheBuilder.newBuilder() .expireAfterAccess(3600, TimeUnit.SECONDS) @@ -260,13 +254,12 @@ public class ShuffleTaskManager { ShuffleServerMetrics.addLabeledCacheGauge( REPORTED_BLOCK_COUNT, () -> - partitionsToBlockIds.values().stream() - .flatMap(innerMap -> innerMap.values().stream()) - .flatMapToLong( - arr -> - java.util.Arrays.stream(arr) - .mapToLong(Roaring64NavigableMap::getLongCardinality)) - .sum(), + shuffleBlockIdManager.getTotalBlockCount() + + shuffleTaskInfos.values().stream() + .map(ShuffleTaskInfo::getShuffleBlockIdManager) + .filter(manager -> manager != null && manager != shuffleBlockIdManager) + .mapToLong(ShuffleBlockIdManager::getTotalBlockCount) + .sum(), 2 * 60 * 1000L /* 2 minutes */); ShuffleServerMetrics.addLabeledCacheGauge( CACHED_BLOCK_COUNT, @@ -339,8 +332,9 @@ public class ShuffleTaskManager { getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf)) .dataDistributionType(dataDistType) .build()); + taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager); - partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + taskInfo.getShuffleBlockIdManager().registerAppId(appId); for (PartitionRange partitionRange : partitionRanges) { shuffleBufferManager.registerBuffer( appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd()); @@ -467,51 +461,16 @@ public class ShuffleTaskManager { public int addFinishedBlockIds( String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) { refreshAppId(appId); - Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); - if (shuffleIdToPartitions == null) { - throw new RssException("appId[" + appId + "] is expired!"); - } - shuffleIdToPartitions.computeIfAbsent( - shuffleId, - key -> { - Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum]; - for (int i = 0; i < bitmapNum; i++) { - blockIds[i] = Roaring64NavigableMap.bitmapOf(); - } - return blockIds; - }); - Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); - if (blockIds.length != bitmapNum) { - throw new InvalidRequestException( - "Request expects " - + bitmapNum - + " bitmaps, but there are " - + blockIds.length - + " bitmaps!"); - } - ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId); if (taskInfo == null) { throw new InvalidRequestException( "ShuffleTaskInfo is not found that should not happen for appId: " + appId); } - int totalUpdatedBlockCount = 0; - for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) { - Integer partitionId = entry.getKey(); - Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum]; - int updatedBlockCount = 0; - synchronized (bitmap) { - for (long blockId : entry.getValue()) { - if (!bitmap.contains(blockId)) { - bitmap.addLong(blockId); - updatedBlockCount++; - totalUpdatedBlockCount++; - } - } - } - taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount); + ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager(); + if (manager == null) { + throw new RssException("appId[" + appId + "] is expired!"); } - return totalUpdatedBlockCount; + return manager.addFinishedBlockIds(taskInfo, appId, shuffleId, partitionToBlockIds, bitmapNum); } public int updateAndGetCommitCount(String appId, int shuffleId) { @@ -707,66 +666,12 @@ public class ShuffleTaskManager { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } } - Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); - if (shuffleIdToPartitions == null) { - LOG.warn("Empty blockIds for app: {}. This should not happen", appId); - return null; - } - - Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); - if (blockIds == null) { - LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId); - return new byte[] {}; - } - ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId); - long expectedBlockNumber = 0; - Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap(); - for (int partitionId : partitions) { - int bitmapIndex = partitionId % blockIds.length; - if (bitmapIndexToPartitions.containsKey(bitmapIndex)) { - bitmapIndexToPartitions.get(bitmapIndex).add(partitionId); - } else { - HashSet<Integer> newHashSet = Sets.newHashSet(partitionId); - bitmapIndexToPartitions.put(bitmapIndex, newHashSet); - } - expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId); - } - - Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); - for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) { - Set<Integer> requestPartitions = entry.getValue(); - Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; - getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); - } - - if (res.getLongCardinality() != expectedBlockNumber) { - throw new RssException( - "Inconsistent block number for partitions: " - + partitions - + ". Excepted: " - + expectedBlockNumber - + ", actual: " - + res.getLongCardinality()); + ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager(); + if (manager == null) { + throw new RssException("appId[" + appId + "] is expired!"); } - - return RssUtils.serializeBitMap(res); - } - - // filter the specific partition blockId in the bitmap to the resultBitmap - protected Roaring64NavigableMap getBlockIdsByPartitionId( - Set<Integer> requestPartitions, - Roaring64NavigableMap bitmap, - Roaring64NavigableMap resultBitmap, - BlockIdLayout blockIdLayout) { - bitmap.forEach( - blockId -> { - int partitionId = blockIdLayout.getPartitionId(blockId); - if (requestPartitions.contains(partitionId)) { - resultBitmap.addLong(blockId); - } - }); - return resultBitmap; + return manager.getFinishedBlockIds(taskInfo, appId, shuffleId, partitions, blockIdLayout); } public ShuffleDataResult getInMemoryShuffleData( @@ -890,14 +795,14 @@ public class ShuffleTaskManager { taskInfo.getCommitCounts().remove(shuffleId); taskInfo.getCommitLocks().remove(shuffleId); } + ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager(); + if (manager == null) { + throw new RssException("appId[" + appId + "] is expired!"); + } + manager.removeBlockIdByShuffleId(appId, shuffleIds); + } else { + shuffleBlockIdManager.removeBlockIdByShuffleId(appId, shuffleIds); } - Optional.ofNullable(partitionsToBlockIds.get(appId)) - .ifPresent( - x -> { - for (Integer shuffleId : shuffleIds) { - x.remove(shuffleId); - } - }); shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds); shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds); @@ -975,7 +880,11 @@ public class ShuffleTaskManager { partitionInfoSummary.append("The app task info: ").append(shuffleTaskInfo); LOG.info("Removing app summary info: {}", partitionInfoSummary); - partitionsToBlockIds.remove(appId); + ShuffleBlockIdManager manager = shuffleTaskInfo.getShuffleBlockIdManager(); + if (manager != null) { + manager.removeBlockIdByAppId(appId); + } + shuffleBlockIdManager.removeBlockIdByAppId(appId); shuffleBufferManager.removeBuffer(appId); shuffleFlushManager.removeResources(appId); @@ -1095,11 +1004,6 @@ public class ShuffleTaskManager { return requireBufferIds; } - @VisibleForTesting - public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() { - return partitionsToBlockIds; - } - public void removeShuffleDataAsync(String appId, int shuffleId) { expiredAppIdQueue.add( new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId))); @@ -1152,4 +1056,9 @@ public class ShuffleTaskManager { protected void setShuffleFlushManager(ShuffleFlushManager flushManager) { this.shuffleFlushManager = flushManager; } + + @VisibleForTesting + public ShuffleBlockIdManager getShuffleBlockIdManager() { + return shuffleBlockIdManager; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java new file mode 100644 index 000000000..6487fd582 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.block; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.exception.InvalidRequestException; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.server.ShuffleTaskInfo; + +/** + * The default implementation of ShuffleBlockIdManager, manage block id of all partitions together. + */ +public class DefaultShuffleBlockIdManager implements ShuffleBlockIdManager { + private static final Logger LOG = LoggerFactory.getLogger(DefaultShuffleBlockIdManager.class); + + // appId -> shuffleId -> blockIds to avoid too many appId + // store taskAttemptId info to filter speculation task + // Roaring64NavigableMap instance will cost much memory, + // merge different blockId of partition to one bitmap can reduce memory cost, + // but when get blockId, performance will degrade a little which can be optimized by client + // configuration + // appId -> shuffleId -> hashId -> blockIds + private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds; + + public DefaultShuffleBlockIdManager() { + this.partitionsToBlockIds = JavaUtils.newConcurrentMap(); + } + + @VisibleForTesting + /** Filter the specific partition blockId in the bitmap to the resultBitmap. */ + public static Roaring64NavigableMap getBlockIdsByPartitionId( + Set<Integer> requestPartitions, + Roaring64NavigableMap bitmap, + Roaring64NavigableMap resultBitmap, + BlockIdLayout blockIdLayout) { + bitmap.forEach( + blockId -> { + int partitionId = blockIdLayout.getPartitionId(blockId); + if (requestPartitions.contains(partitionId)) { + resultBitmap.addLong(blockId); + } + }); + return resultBitmap; + } + + public void registerAppId(String appId) { + partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + } + + /** + * Add finished blockIds from client + * + * @param appId + * @param shuffleId + * @param partitionToBlockIds + * @param bitmapNum + * @return the number of added blockIds + */ + public int addFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Map<Integer, long[]> partitionToBlockIds, + int bitmapNum) { + Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); + if (shuffleIdToPartitions == null) { + throw new RssException("appId[" + appId + "] is expired!"); + } + shuffleIdToPartitions.computeIfAbsent( + shuffleId, + key -> { + Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum]; + for (int i = 0; i < bitmapNum; i++) { + blockIds[i] = Roaring64NavigableMap.bitmapOf(); + } + return blockIds; + }); + Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); + if (blockIds.length != bitmapNum) { + throw new InvalidRequestException( + "Request expects " + + bitmapNum + + " bitmaps, but there are " + + blockIds.length + + " bitmaps!"); + } + + int totalUpdatedBlockCount = 0; + for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) { + Integer partitionId = entry.getKey(); + Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum]; + int updatedBlockCount = 0; + synchronized (bitmap) { + for (long blockId : entry.getValue()) { + if (!bitmap.contains(blockId)) { + bitmap.addLong(blockId); + updatedBlockCount++; + totalUpdatedBlockCount++; + } + } + } + taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount); + } + return totalUpdatedBlockCount; + } + + public byte[] getFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Set<Integer> partitions, + BlockIdLayout blockIdLayout) + throws IOException { + Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId); + if (shuffleIdToPartitions == null) { + LOG.warn("Empty blockIds for app: {}. This should not happen", appId); + return null; + } + Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId); + if (blockIds == null) { + LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId); + return new byte[] {}; + } + long expectedBlockNumber = 0; + Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap(); + for (int partitionId : partitions) { + int bitmapIndex = partitionId % blockIds.length; + if (bitmapIndexToPartitions.containsKey(bitmapIndex)) { + bitmapIndexToPartitions.get(bitmapIndex).add(partitionId); + } else { + HashSet<Integer> newHashSet = Sets.newHashSet(partitionId); + bitmapIndexToPartitions.put(bitmapIndex, newHashSet); + } + expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId); + } + Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); + for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) { + Set<Integer> requestPartitions = entry.getValue(); + Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; + getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); + } + if (res.getLongCardinality() != expectedBlockNumber) { + throw new RssException( + "Inconsistent block number for partitions: " + + partitions + + ". Excepted: " + + expectedBlockNumber + + ", actual: " + + res.getLongCardinality()); + } + return RssUtils.serializeBitMap(res); + } + + @Override + public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) { + Optional.ofNullable(partitionsToBlockIds.get(appId)) + .ifPresent( + x -> { + for (Integer shuffleId : shuffleIds) { + x.remove(shuffleId); + } + }); + } + + @Override + public void removeBlockIdByAppId(String appId) { + partitionsToBlockIds.remove(appId); + } + + @Override + public long getTotalBlockCount() { + return partitionsToBlockIds.values().stream() + .flatMap(innerMap -> innerMap.values().stream()) + .flatMapToLong( + arr -> + java.util.Arrays.stream(arr).mapToLong(Roaring64NavigableMap::getLongCardinality)) + .sum(); + } + + @Override + public boolean contains(String appId) { + return partitionsToBlockIds.containsKey(appId); + } + + @Override + public long getBitmapNum(String appId, int shuffleId) { + return partitionsToBlockIds.get(appId).get(shuffleId).length; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java new file mode 100644 index 000000000..5e56fee83 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.block; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.server.ShuffleTaskInfo; + +/** Manage the block ids individual for each partition. */ +public class PartitionedShuffleBlockIdManager implements ShuffleBlockIdManager { + private static final Logger LOG = LoggerFactory.getLogger(PartitionedShuffleBlockIdManager.class); + + // appId -> shuffleId -> partitionId -> blockIds + private Map<String, Map<Integer, Map<Integer, Roaring64NavigableMap>>> partitionsToBlockIds; + + public PartitionedShuffleBlockIdManager() { + this.partitionsToBlockIds = new ConcurrentHashMap<>(); + } + + public void registerAppId(String appId) { + partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + } + + @Override + public int addFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Map<Integer, long[]> partitionToBlockIds, + int bitmapNum) { + Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions = + partitionsToBlockIds.get(appId); + if (shuffleIdToPartitions == null) { + throw new RssException("appId[" + appId + "] is expired!"); + } + shuffleIdToPartitions.computeIfAbsent(shuffleId, key -> new ConcurrentHashMap<>()); + + Map<Integer, Roaring64NavigableMap> partitions = shuffleIdToPartitions.get(shuffleId); + int totalUpdatedBlockCount = 0; + for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) { + Integer partitionId = entry.getKey(); + partitions.computeIfAbsent(partitionId, k -> Roaring64NavigableMap.bitmapOf()); + Roaring64NavigableMap bitmap = partitions.get(partitionId); + int updatedBlockCount = 0; + synchronized (bitmap) { + for (long blockId : entry.getValue()) { + if (!bitmap.contains(blockId)) { + bitmap.addLong(blockId); + updatedBlockCount++; + totalUpdatedBlockCount++; + } + } + } + taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount); + } + return totalUpdatedBlockCount; + } + + @Override + public byte[] getFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Set<Integer> partitions, + BlockIdLayout blockIdLayout) + throws IOException { + Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions = + partitionsToBlockIds.get(appId); + if (shuffleIdToPartitions == null) { + LOG.warn("Empty blockIds for app: {}. This should not happen", appId); + return null; + } + + Map<Integer, Roaring64NavigableMap> partitionToBlockId = shuffleIdToPartitions.get(shuffleId); + + long expectedBlockNumber = 0; + Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); + for (int partitionId : partitions) { + expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId); + Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId); + res.or(bitmap); + } + + if (res.getLongCardinality() != expectedBlockNumber) { + throw new RssException( + "Inconsistent block number for partitions: " + + partitions + + ". Excepted: " + + expectedBlockNumber + + ", actual: " + + res.getLongCardinality()); + } + + return RssUtils.serializeBitMap(res); + } + + @Override + public void removeBlockIdByAppId(String appId) { + partitionsToBlockIds.remove(appId); + } + + @Override + public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) { + Optional.ofNullable(partitionsToBlockIds.get(appId)) + .ifPresent( + x -> { + for (Integer shuffleId : shuffleIds) { + x.remove(shuffleId); + } + }); + } + + @Override + public long getTotalBlockCount() { + return partitionsToBlockIds.values().stream() + .flatMap(innerMap -> innerMap.values().stream()) + .flatMap(innerMap -> innerMap.values().stream()) + .mapToLong(roaringMap -> roaringMap.getLongCardinality()) + .sum(); + } + + @Override + public boolean contains(String appId) { + return partitionsToBlockIds.containsKey(appId); + } + + @Override + public long getBitmapNum(String appId, int shuffleId) { + return partitionsToBlockIds.get(appId).get(shuffleId).size(); + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java new file mode 100644 index 000000000..635e0c394 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.block; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.server.ShuffleTaskInfo; + +/** The implementation of this interface to manage the shuffle block ids. */ +public interface ShuffleBlockIdManager { + void registerAppId(String appId); + + int addFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Map<Integer, long[]> partitionToBlockIds, + int bitmapNum); + + byte[] getFinishedBlockIds( + ShuffleTaskInfo taskInfo, + String appId, + Integer shuffleId, + Set<Integer> partitions, + BlockIdLayout blockIdLayout) + throws IOException; + + void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds); + + void removeBlockIdByAppId(String appId); + + long getTotalBlockCount(); + + boolean contains(String testAppId); + + long getBitmapNum(String appId, int shuffleId); +} diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java new file mode 100644 index 000000000..e76b8d72d --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.block; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.server.ShuffleServerConf; + +public class ShuffleBlockIdManagerFactory { + public static ShuffleBlockIdManager createShuffleBlockIdManager(ShuffleServerConf conf) { + String className = conf.get(ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS); + return createShuffleBlockIdManager( + className, ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS.key()); + } + + public static ShuffleBlockIdManager createShuffleBlockIdManager( + String className, String configKey) { + if (StringUtils.isEmpty(className)) { + throw new IllegalStateException( + "Configuration error: " + configKey + " should not set to empty"); + } + + try { + return (ShuffleBlockIdManager) RssUtils.getConstructor(className).newInstance(); + } catch (Exception e) { + throw new IllegalStateException( + "Configuration error: " + configKey + " is failed to create instance of " + className, e); + } + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 2a67c548e..c6484b175 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -59,6 +59,7 @@ import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ChecksumUtils; import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; @@ -816,7 +817,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { } } Roaring64NavigableMap resultBlockIds = - shuffleTaskManager.getBlockIdsByPartitionId( + DefaultShuffleBlockIdManager.getBlockIdsByPartitionId( Sets.newHashSet(expectedPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), @@ -825,7 +826,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0)); resultBlockIds = - shuffleTaskManager.getBlockIdsByPartitionId( + DefaultShuffleBlockIdManager.getBlockIdsByPartitionId( Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout); assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds); @@ -833,7 +834,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId); bitmapBlockIds.addLong(expectedBlockId); resultBlockIds = - shuffleTaskManager.getBlockIdsByPartitionId( + DefaultShuffleBlockIdManager.getBlockIdsByPartitionId( Sets.newHashSet(layout.maxPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), @@ -872,12 +873,12 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { } Roaring64NavigableMap resultBlockIds = - shuffleTaskManager.getBlockIdsByPartitionId( + DefaultShuffleBlockIdManager.getBlockIdsByPartitionId( requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout); assertEquals(expectedBlockIds, resultBlockIds); assertEquals( bitmapBlockIds, - shuffleTaskManager.getBlockIdsByPartitionId( + DefaultShuffleBlockIdManager.getBlockIdsByPartitionId( allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout)); }
