This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cf82d70f [#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227)
3cf82d70f is described below

commit 3cf82d70f2465f64d01a6af2bdf2cb39f7b9ce5d
Author: maobaolong <[email protected]>
AuthorDate: Mon Nov 18 14:16:21 2024 +0800

    [#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227)
    
    ### What changes were proposed in this pull request?
    
    - Introduce a ShuffleBlockIdManagerFactory to create the configured ShuffleBlockIdManager implementation.
    - Introduce PartitionedShuffleBlockIdManager, which keeps an individual bitmap for each partition.
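
The difference between the two layouts can be sketched roughly as follows. This is an illustration only, not the code added by this commit: `TreeSet<Long>` stands in for `Roaring64NavigableMap`, the block id encoding is hypothetical (it is not Uniffle's `BlockIdLayout`), and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: TreeSet<Long> stands in for Roaring64NavigableMap, and the
// block id encoding below is hypothetical, not Uniffle's BlockIdLayout.
class BlockIdStoreSketch {

  // Hypothetical encoding: high 32 bits = partition id, low 32 bits = sequence number.
  static long blockId(int partitionId, int sequence) {
    return ((long) partitionId << 32) | (sequence & 0xFFFFFFFFL);
  }

  static int partitionOf(long blockId) {
    return (int) (blockId >>> 32);
  }

  // Shared layout: a fixed number of slots; partition p lands in slot p % slotCount.
  static final class SharedSlots {
    private final List<Set<Long>> slots = new ArrayList<>();

    SharedSlots(int slotCount) {
      for (int i = 0; i < slotCount; i++) {
        slots.add(new TreeSet<>());
      }
    }

    synchronized void add(int partitionId, long id) {
      slots.get(partitionId % slots.size()).add(id);
    }

    // A slot may hold several partitions, so a read has to scan and filter.
    synchronized Set<Long> get(int partitionId) {
      Set<Long> result = new TreeSet<>();
      for (long id : slots.get(partitionId % slots.size())) {
        if (partitionOf(id) == partitionId) {
          result.add(id);
        }
      }
      return result;
    }
  }

  // Partitioned layout: one set per partition, so a read is a direct lookup.
  static final class PerPartition {
    private final Map<Integer, Set<Long>> byPartition = new ConcurrentHashMap<>();

    void add(int partitionId, long id) {
      byPartition.computeIfAbsent(partitionId, p -> ConcurrentHashMap.newKeySet()).add(id);
    }

    Set<Long> get(int partitionId) {
      return new TreeSet<>(byPartition.getOrDefault(partitionId, Collections.emptySet()));
    }
  }
}
```

Both layouts return the same block ids for a partition. The shared layout holds fewer containers but filters on every read, while the per-partition layout pays for one container per partition and reads it directly.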
    
    ### Why are the changes needed?
    
    Fix: #2219
    
    <img width="1596" alt="image" src="https://github.com/user-attachments/assets/c7752f94-941b-45d2-9a45-1da197e38984">
    
    The screenshot above shows the maximum partition count per node in our cluster, taken from node metrics: about 410K. If the bitmap for each partition holds 10000 blocks on average (about 50KiB of heap), the bitmaps cost 410K * 50KiB, which is roughly 20GiB of heap.
    
    I think the new policy is worth adopting in our production cluster.
    
    ### Does this PR introduce _any_ user-facing change?
    
    - Config options: rss.server.blockIdManagerClass (server side) and rss.client.blockIdManagerClass (client side, per application)
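
How a configured class name might turn into a manager instance can be sketched as below. This is a simplified illustration based on the lookup order visible in the `ShuffleTaskInfo` diff of this commit (plain client key first, then the Spark-prefixed key, otherwise the server-wide default). The interface, both implementations, the helper names, and the assumption that the Spark prefix is `spark.` are all hypothetical, and the real server shares one default instance across applications rather than creating one per lookup.

```java
import java.util.Map;

// Sketch only: the interface, both implementations, and the helpers are
// hypothetical stand-ins, not the classes added by this commit.
class BlockIdManagerConfigSketch {
  interface BlockIdManager {}

  public static final class DefaultManager implements BlockIdManager {
    public DefaultManager() {}
  }

  public static final class PartitionedManager implements BlockIdManager {
    public PartitionedManager() {}
  }

  // Key name taken from the diff; the "spark." prefix is an assumption.
  static final String CLIENT_KEY = "rss.client.blockIdManagerClass";
  static final String SPARK_PREFIX = "spark.";

  // Check the plain client key, then the Spark-prefixed key, then fall back
  // to the server-wide default class name.
  static String resolveClassName(Map<String, String> appProperties, String serverDefault) {
    String name = appProperties.get(CLIENT_KEY);
    if (name == null || name.isEmpty()) {
      name = appProperties.get(SPARK_PREFIX + CLIENT_KEY);
    }
    return (name == null || name.isEmpty()) ? serverDefault : name;
  }

  // Instantiate the named class through its public no-argument constructor.
  static BlockIdManager create(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      return (BlockIdManager) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot create block id manager: " + className, e);
    }
  }
}
```

With empty application properties the resolver returns the server default; an application that sets the client key gets its own implementation instead.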
    
    ### How was this patch tested?
    
    Tested locally; checked the results with `arthas`.
    
    - Common case, test with default config
    ```
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
    @DefaultShuffleBlockIdManager[
        LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@3f211bae],
        partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1],
    ]
    
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_431031_1730297736664")'
    @ConcurrentHashMapForJDK8[
        @Integer[0]:@Roaring64NavigableMap[][isEmpty=false;size=10],
        @Integer[1]:@Roaring64NavigableMap[][isEmpty=false;size=10],
        @Integer[2]:@Roaring64NavigableMap[][isEmpty=false;size=10],
    ]
    
    ```
    - Configured to use PartitionedShuffleBlockIdManager
    ```
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
    @PartitionedShuffleBlockIdManager[
        LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@6aa6fea0],
        partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1],
    ]
    
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds'
    @ConcurrentHashMap[
        @String[application_1729845342052_432324_1730299397916]:@ConcurrentHashMapForJDK8[isEmpty=false;size=2],
    ]
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_432324_1730299397916")'
    @ConcurrentHashMapForJDK8[
        @Integer[0]:@ConcurrentHashMap[isEmpty=false;size=2000],
        @Integer[1]:@ConcurrentHashMap[isEmpty=false;size=2000],
    ]
    ```
    - Use client app level config
    Submit two apps, one with `spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.DefaultShuffleBlockIdManager` and one with `spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.PartitionedShuffleBlockIdManager`
    
    ```
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager'
    @DefaultShuffleBlockIdManager[
        LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@ed3b52a],
        partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1],
    ]
    [arthas@264]$ vmtool  --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[1].shuffleBlockIdManager'
    @PartitionedShuffleBlockIdManager[
        LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@5fc14628],
        partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1],
    ]
    ```
    
    Log example
    ```
    [2024-10-30 23:01:37.585] [Grpc-709] [INFO] ShuffleTaskInfo - application_1729845342052_432387_1730300440568 use app configured ShuffleBlockIdManager to org.apache.uniffle.server.block.DefaultShuffleBlockIdManager@3c6b75a8
    ```
---
 .../uniffle/common/config/RssClientConf.java       |   8 +
 docs/client_guide/client_guide.md                  |   1 +
 docs/server_guide.md                               |   1 +
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   9 +-
 .../uniffle/test/ShuffleServerWithLocalTest.java   |  12 +-
 .../apache/uniffle/server/ShuffleServerConf.java   |   8 +
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  34 +++-
 .../apache/uniffle/server/ShuffleTaskManager.java  | 167 ++++------------
 .../server/block/DefaultShuffleBlockIdManager.java | 220 +++++++++++++++++++++
 .../block/PartitionedShuffleBlockIdManager.java    | 159 +++++++++++++++
 .../server/block/ShuffleBlockIdManager.java        |  56 ++++++
 .../server/block/ShuffleBlockIdManagerFactory.java |  46 +++++
 .../uniffle/server/ShuffleTaskManagerTest.java     |  11 +-
 13 files changed, 590 insertions(+), 142 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 58b6d6a91..6d311a549 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -295,4 +295,12 @@ public class RssClientConf {
           .asList()
           .noDefaultValue()
           .withDescription("the extra java properties could be configured by this option");
+
+  public static final ConfigOption<String> RSS_CLIENT_BLOCK_ID_MANAGER_CLASS =
+      ConfigOptions.key("rss.client.blockIdManagerClass")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "The block id manager class of server for this application, "
+                  + "the implementation of this interface to manage the shuffle block ids");
 }
diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md
index bf7bb5d16..371528931 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -59,6 +59,7 @@ The important configuration of client is listed as following. These configuratio
 | <client_type>.rss.client.rpc.netty.pageSize                     | 4096       
                            | The value of pageSize for PooledByteBufAllocator 
when using gRPC internal Netty on the client-side. This configuration will only 
take effect when rss.rpc.server.type is set to GRPC_NETTY.                      
                                                                                
                                                                                
               [...]
 | <client_type>.rss.client.rpc.netty.maxOrder                     | 3          
                            | The value of maxOrder for PooledByteBufAllocator 
when using gRPC internal Netty on the client-side. This configuration will only 
take effect when rss.rpc.server.type is set to GRPC_NETTY.                      
                                                                                
                                                                                
               [...]
 | <client_type>.rss.client.rpc.netty.smallCacheSize               | 1024       
                            | The value of smallCacheSize for 
PooledByteBufAllocator when using gRPC internal Netty on the client-side. This 
configuration will only take effect when rss.rpc.server.type is set to 
GRPC_NETTY.                                                                     
                                                                                
                                          [...]
+| <client_type>.rss.client.blockIdManagerClass                    | -          
                            | The block id manager class of server for this 
application, the implementation of this interface to manage the shuffle block 
ids                                                                             
                                                                                
                                                                                
                    [...]
 
 Notice:
 
diff --git a/docs/server_guide.md b/docs/server_guide.md
index d975ce324..3c6b7750a 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -121,6 +121,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
 | rss.storage.localFileWriterClass                         | 
org.apache.uniffle.storage.handler.impl.LocalFileWriter                | The 
writer class to write shuffle data for LOCALFILE.                               
                                                                                
                                                                                
                                                                                
                                   [...]
 | rss.storage.hdfs.write.dataBufferSize                    | 8K                
                                                     | The size of the buffer 
used to cache data written for HDFS.                                            
                                                                                
                                                                                
                                                                                
                [...]
 | rss.storage.hdfs.write.indexBufferSize                   | 8K                
                                                     | The size of the buffer 
used to cache index written for HDFS.                                           
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.blockIdManagerClass                           | 
org.apache.uniffle.server.block.DefaultShuffleBlockIdManager           | The 
block id manager class. It is used to manage block id.                          
                                                                                
                                                                                
                                                                                
                                   [...]
 
 ### Advanced Configurations
 | Property Name                                    | Default | Description     
                                                                                
                                                                                
            |
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 1ee10a8aa..e188e04b6 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -352,14 +352,13 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
     request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3);
     grpcShuffleServerClient.reportShuffleResult(request);
     // validate bitmap in shuffleTaskManager
-    Roaring64NavigableMap[] bitmaps =
+    long bitmapNum =
         grpcShuffleServers
             .get(0)
             .getShuffleTaskManager()
-            .getPartitionsToBlockIds()
-            .get("shuffleResultTest")
-            .get(2);
-    assertEquals(3, bitmaps.length);
+            .getShuffleBlockIdManager()
+            .getBitmapNum("shuffleResultTest", 2);
+    assertEquals(3, bitmapNum);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index a77472e6a..56c3a6a09 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -179,10 +179,18 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
 
     List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers;
     assertNotNull(
-        shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
+        shuffleServers
+            .get(0)
+            .getShuffleTaskManager()
+            .getShuffleBlockIdManager()
+            .contains(testAppId));
     Thread.sleep(8000);
     assertNull(
-        shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
+        shuffleServers
+            .get(0)
+            .getShuffleTaskManager()
+            .getShuffleBlockIdManager()
+            .contains(testAppId));
   }
 
   protected void validateResult(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index fd2f5802c..8888e60b7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -25,6 +25,7 @@ import org.apache.uniffle.common.config.ConfigOption;
 import org.apache.uniffle.common.config.ConfigOptions;
 import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
 import org.apache.uniffle.server.buffer.ShuffleBufferType;
 
 public class ShuffleServerConf extends RssBaseConf {
@@ -743,6 +744,13 @@ public class ShuffleServerConf extends RssBaseConf {
           .booleanType()
           .defaultValue(false)
           .withDescription("Whether to enable app detail log");
+  public static final ConfigOption<String> SERVER_BLOCK_ID_MANAGER_CLASS =
+      ConfigOptions.key("rss.server.blockIdManagerClass")
+          .stringType()
+          .defaultValue(DefaultShuffleBlockIdManager.class.getName())
+          .withDescription(
+              "The block id manager class, the implementation of this interface "
+                  + "to manage the shuffle block ids");
 
   public ShuffleServerConf() {}
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 11ac2dff3..6b2a89554 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -26,14 +26,19 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
+import org.apache.uniffle.server.block.ShuffleBlockIdManager;
+import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;
 
 /**
  * ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache
@@ -78,6 +83,7 @@ public class ShuffleTaskInfo {
 
   private final Map<Integer, Integer> latestStageAttemptNumbers;
   private Map<String, String> properties;
+  private ShuffleBlockIdManager shuffleBlockIdManager;
 
   public ShuffleTaskInfo(String appId) {
     this.appId = appId;
@@ -324,6 +330,32 @@ public class ShuffleTaskInfo {
             .filter(entry -> entry.getKey().contains(".rss."))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     this.properties = filteredProperties;
-    LOGGER.info("{} set properties to {}", appId, properties);
+    LOGGER.info("{} set properties to {}", appId, filteredProperties);
+    String keyName = RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
+    String className = properties.get(keyName);
+    if (StringUtils.isEmpty(className)) {
+      keyName =
+          Constants.SPARK_RSS_CONFIG_PREFIX + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
+      className =
+          properties.get(
+              Constants.SPARK_RSS_CONFIG_PREFIX
+                  + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key());
+    }
+    if (StringUtils.isNotEmpty(className)) {
+      shuffleBlockIdManager =
+          ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(className, keyName);
+      LOGGER.info(
+          "{} use app configured ShuffleBlockIdManager to {}", appId, shuffleBlockIdManager);
+    }
+  }
+
+  public ShuffleBlockIdManager getShuffleBlockIdManager() {
+    return shuffleBlockIdManager;
+  }
+
+  public void setShuffleBlockIdManagerIfNeeded(ShuffleBlockIdManager shuffleBlockIdManager) {
+    if (this.shuffleBlockIdManager == null) {
+      this.shuffleBlockIdManager = shuffleBlockIdManager;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 6211e6673..d6a9a3fe9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -43,7 +41,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
@@ -76,6 +73,8 @@ import org.apache.uniffle.common.util.OutputUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.util.UnitConverter;
+import org.apache.uniffle.server.block.ShuffleBlockIdManager;
+import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -115,13 +114,6 @@ public class ShuffleTaskManager {
   private long commitCheckIntervalMax;
   private long leakShuffleDataCheckInterval;
   private long triggerFlushInterval;
-  // appId -> shuffleId -> blockIds to avoid too many appId
-  // store taskAttemptId info to filter speculation task
-  // Roaring64NavigableMap instance will cost much memory,
-  // merge different blockId of partition to one bitmap can reduce memory cost,
-  // but when get blockId, performance will degrade a little which can be optimized by client
-  // configuration
-  private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
   private final ShuffleBufferManager shuffleBufferManager;
   private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
@@ -130,6 +122,7 @@ public class ShuffleTaskManager {
   private final Cache<String, ReentrantReadWriteLock> appLocks;
   private final long storageRemoveOperationTimeoutSec;
   private ShuffleMergeManager shuffleMergeManager;
+  private ShuffleBlockIdManager shuffleBlockIdManager;
 
   public ShuffleTaskManager(
       ShuffleServerConf conf,
@@ -147,7 +140,6 @@ public class ShuffleTaskManager {
       ShuffleMergeManager shuffleMergeManager) {
     this.conf = conf;
     this.shuffleFlushManager = shuffleFlushManager;
-    this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
     this.shuffleBufferManager = shuffleBufferManager;
     this.storageManager = storageManager;
     this.shuffleMergeManager = shuffleMergeManager;
@@ -200,6 +192,8 @@ public class ShuffleTaskManager {
       shuffleBufferManager.setShuffleTaskManager(this);
     }
 
+    shuffleBlockIdManager = ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(conf);
+
     appLocks =
         CacheBuilder.newBuilder()
             .expireAfterAccess(3600, TimeUnit.SECONDS)
@@ -260,13 +254,12 @@ public class ShuffleTaskManager {
     ShuffleServerMetrics.addLabeledCacheGauge(
         REPORTED_BLOCK_COUNT,
         () ->
-            partitionsToBlockIds.values().stream()
-                .flatMap(innerMap -> innerMap.values().stream())
-                .flatMapToLong(
-                    arr ->
-                        java.util.Arrays.stream(arr)
-                            
.mapToLong(Roaring64NavigableMap::getLongCardinality))
-                .sum(),
+            shuffleBlockIdManager.getTotalBlockCount()
+                + shuffleTaskInfos.values().stream()
+                    .map(ShuffleTaskInfo::getShuffleBlockIdManager)
+                    .filter(manager -> manager != null && manager != shuffleBlockIdManager)
+                    .mapToLong(ShuffleBlockIdManager::getTotalBlockCount)
+                    .sum(),
         2 * 60 * 1000L /* 2 minutes */);
     ShuffleServerMetrics.addLabeledCacheGauge(
         CACHED_BLOCK_COUNT,
@@ -339,8 +332,9 @@ public class ShuffleTaskManager {
                   getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, 
conf))
               .dataDistributionType(dataDistType)
               .build());
+      taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
 
-      partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+      taskInfo.getShuffleBlockIdManager().registerAppId(appId);
       for (PartitionRange partitionRange : partitionRanges) {
         shuffleBufferManager.registerBuffer(
             appId, shuffleId, partitionRange.getStart(), 
partitionRange.getEnd());
@@ -467,51 +461,16 @@ public class ShuffleTaskManager {
   public int addFinishedBlockIds(
       String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
     refreshAppId(appId);
-    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
-    if (shuffleIdToPartitions == null) {
-      throw new RssException("appId[" + appId + "] is expired!");
-    }
-    shuffleIdToPartitions.computeIfAbsent(
-        shuffleId,
-        key -> {
-          Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
-          for (int i = 0; i < bitmapNum; i++) {
-            blockIds[i] = Roaring64NavigableMap.bitmapOf();
-          }
-          return blockIds;
-        });
-    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
-    if (blockIds.length != bitmapNum) {
-      throw new InvalidRequestException(
-          "Request expects "
-              + bitmapNum
-              + " bitmaps, but there are "
-              + blockIds.length
-              + " bitmaps!");
-    }
-
     ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
     if (taskInfo == null) {
       throw new InvalidRequestException(
           "ShuffleTaskInfo is not found that should not happen for appId: " + 
appId);
     }
-    int totalUpdatedBlockCount = 0;
-    for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
-      Integer partitionId = entry.getKey();
-      Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
-      int updatedBlockCount = 0;
-      synchronized (bitmap) {
-        for (long blockId : entry.getValue()) {
-          if (!bitmap.contains(blockId)) {
-            bitmap.addLong(blockId);
-            updatedBlockCount++;
-            totalUpdatedBlockCount++;
-          }
-        }
-      }
-      taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+    ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+    if (manager == null) {
+      throw new RssException("appId[" + appId + "] is expired!");
     }
-    return totalUpdatedBlockCount;
+    return manager.addFinishedBlockIds(taskInfo, appId, shuffleId, partitionToBlockIds, bitmapNum);
   }
 
   public int updateAndGetCommitCount(String appId, int shuffleId) {
@@ -707,66 +666,12 @@ public class ShuffleTaskManager {
         storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
       }
     }
-    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
-    if (shuffleIdToPartitions == null) {
-      LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
-      return null;
-    }
-
-    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
-    if (blockIds == null) {
-      LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
-      return new byte[] {};
-    }
-
     ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
-    long expectedBlockNumber = 0;
-    Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
-    for (int partitionId : partitions) {
-      int bitmapIndex = partitionId % blockIds.length;
-      if (bitmapIndexToPartitions.containsKey(bitmapIndex)) {
-        bitmapIndexToPartitions.get(bitmapIndex).add(partitionId);
-      } else {
-        HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
-        bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
-      }
-      expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
-    }
-
-    Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
-    for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
-      Set<Integer> requestPartitions = entry.getValue();
-      Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
-      getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
-    }
-
-    if (res.getLongCardinality() != expectedBlockNumber) {
-      throw new RssException(
-          "Inconsistent block number for partitions: "
-              + partitions
-              + ". Excepted: "
-              + expectedBlockNumber
-              + ", actual: "
-              + res.getLongCardinality());
+    ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+    if (manager == null) {
+      throw new RssException("appId[" + appId + "] is expired!");
     }
-
-    return RssUtils.serializeBitMap(res);
-  }
-
-  // filter the specific partition blockId in the bitmap to the resultBitmap
-  protected Roaring64NavigableMap getBlockIdsByPartitionId(
-      Set<Integer> requestPartitions,
-      Roaring64NavigableMap bitmap,
-      Roaring64NavigableMap resultBitmap,
-      BlockIdLayout blockIdLayout) {
-    bitmap.forEach(
-        blockId -> {
-          int partitionId = blockIdLayout.getPartitionId(blockId);
-          if (requestPartitions.contains(partitionId)) {
-            resultBitmap.addLong(blockId);
-          }
-        });
-    return resultBitmap;
+    return manager.getFinishedBlockIds(taskInfo, appId, shuffleId, partitions, blockIdLayout);
   }
 
   public ShuffleDataResult getInMemoryShuffleData(
@@ -890,14 +795,14 @@ public class ShuffleTaskManager {
           taskInfo.getCommitCounts().remove(shuffleId);
           taskInfo.getCommitLocks().remove(shuffleId);
         }
+        ShuffleBlockIdManager manager = taskInfo.getShuffleBlockIdManager();
+        if (manager == null) {
+          throw new RssException("appId[" + appId + "] is expired!");
+        }
+        manager.removeBlockIdByShuffleId(appId, shuffleIds);
+      } else {
+        shuffleBlockIdManager.removeBlockIdByShuffleId(appId, shuffleIds);
       }
-      Optional.ofNullable(partitionsToBlockIds.get(appId))
-          .ifPresent(
-              x -> {
-                for (Integer shuffleId : shuffleIds) {
-                  x.remove(shuffleId);
-                }
-              });
       shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
       shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
 
@@ -975,7 +880,11 @@ public class ShuffleTaskManager {
       partitionInfoSummary.append("The app task info: 
").append(shuffleTaskInfo);
       LOG.info("Removing app summary info: {}", partitionInfoSummary);
 
-      partitionsToBlockIds.remove(appId);
+      ShuffleBlockIdManager manager = shuffleTaskInfo.getShuffleBlockIdManager();
+      if (manager != null) {
+        manager.removeBlockIdByAppId(appId);
+      }
+      shuffleBlockIdManager.removeBlockIdByAppId(appId);
       shuffleBufferManager.removeBuffer(appId);
       shuffleFlushManager.removeResources(appId);
 
@@ -1095,11 +1004,6 @@ public class ShuffleTaskManager {
     return requireBufferIds;
   }
 
-  @VisibleForTesting
-  public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() {
-    return partitionsToBlockIds;
-  }
-
   public void removeShuffleDataAsync(String appId, int shuffleId) {
     expiredAppIdQueue.add(
         new ShufflePurgeEvent(appId, getUserByAppId(appId), 
Arrays.asList(shuffleId)));
@@ -1152,4 +1056,9 @@ public class ShuffleTaskManager {
   protected void setShuffleFlushManager(ShuffleFlushManager flushManager) {
     this.shuffleFlushManager = flushManager;
   }
+
+  @VisibleForTesting
+  public ShuffleBlockIdManager getShuffleBlockIdManager() {
+    return shuffleBlockIdManager;
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
new file mode 100644
index 000000000..6487fd582
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/**
+ * The default implementation of ShuffleBlockIdManager, manage block id of all partitions together.
+ */
+public class DefaultShuffleBlockIdManager implements ShuffleBlockIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultShuffleBlockIdManager.class);
+
+  // appId -> shuffleId -> blockIds to avoid too many appId
+  // store taskAttemptId info to filter speculation task
+  // Roaring64NavigableMap instance will cost much memory,
+  // merge different blockId of partition to one bitmap can reduce memory cost,
+  // but when get blockId, performance will degrade a little which can be optimized by client
+  // configuration
+  // appId -> shuffleId -> hashId -> blockIds
+  private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
+
+  public DefaultShuffleBlockIdManager() {
+    this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
+  }
+
+  @VisibleForTesting
+  /** Filter the specific partition blockId in the bitmap to the resultBitmap. */
+  public static Roaring64NavigableMap getBlockIdsByPartitionId(
+      Set<Integer> requestPartitions,
+      Roaring64NavigableMap bitmap,
+      Roaring64NavigableMap resultBitmap,
+      BlockIdLayout blockIdLayout) {
+    bitmap.forEach(
+        blockId -> {
+          int partitionId = blockIdLayout.getPartitionId(blockId);
+          if (requestPartitions.contains(partitionId)) {
+            resultBitmap.addLong(blockId);
+          }
+        });
+    return resultBitmap;
+  }
+
+  public void registerAppId(String appId) {
+    partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+  }
+
+  /**
+   * Add finished blockIds from client
+   *
+   * @param appId
+   * @param shuffleId
+   * @param partitionToBlockIds
+   * @param bitmapNum
+   * @return the number of added blockIds
+   */
+  public int addFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Map<Integer, long[]> partitionToBlockIds,
+      int bitmapNum) {
+    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+    if (shuffleIdToPartitions == null) {
+      throw new RssException("appId[" + appId + "] is expired!");
+    }
+    shuffleIdToPartitions.computeIfAbsent(
+        shuffleId,
+        key -> {
+          Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
+          for (int i = 0; i < bitmapNum; i++) {
+            blockIds[i] = Roaring64NavigableMap.bitmapOf();
+          }
+          return blockIds;
+        });
+    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+    if (blockIds.length != bitmapNum) {
+      throw new InvalidRequestException(
+          "Request expects "
+              + bitmapNum
+              + " bitmaps, but there are "
+              + blockIds.length
+              + " bitmaps!");
+    }
+
+    int totalUpdatedBlockCount = 0;
+    for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
+      Integer partitionId = entry.getKey();
+      Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
+      int updatedBlockCount = 0;
+      synchronized (bitmap) {
+        for (long blockId : entry.getValue()) {
+          if (!bitmap.contains(blockId)) {
+            bitmap.addLong(blockId);
+            updatedBlockCount++;
+            totalUpdatedBlockCount++;
+          }
+        }
+      }
+      taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+    }
+    return totalUpdatedBlockCount;
+  }
+
+  public byte[] getFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Set<Integer> partitions,
+      BlockIdLayout blockIdLayout)
+      throws IOException {
+    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+    if (shuffleIdToPartitions == null) {
+      LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
+      return null;
+    }
+    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+    if (blockIds == null) {
+      LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
+      return new byte[] {};
+    }
+    long expectedBlockNumber = 0;
+    Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
+    for (int partitionId : partitions) {
+      int bitmapIndex = partitionId % blockIds.length;
+      if (bitmapIndexToPartitions.containsKey(bitmapIndex)) {
+        bitmapIndexToPartitions.get(bitmapIndex).add(partitionId);
+      } else {
+        HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
+        bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
+      }
+      expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
+    }
+    Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+    for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
+      Set<Integer> requestPartitions = entry.getValue();
+      Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
+      getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+    }
+    if (res.getLongCardinality() != expectedBlockNumber) {
+      throw new RssException(
+          "Inconsistent block number for partitions: "
+              + partitions
+              + ". Excepted: "
+              + expectedBlockNumber
+              + ", actual: "
+              + res.getLongCardinality());
+    }
+    return RssUtils.serializeBitMap(res);
+  }
+
+  @Override
+  public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) {
+    Optional.ofNullable(partitionsToBlockIds.get(appId))
+        .ifPresent(
+            x -> {
+              for (Integer shuffleId : shuffleIds) {
+                x.remove(shuffleId);
+              }
+            });
+  }
+
+  @Override
+  public void removeBlockIdByAppId(String appId) {
+    partitionsToBlockIds.remove(appId);
+  }
+
+  @Override
+  public long getTotalBlockCount() {
+    return partitionsToBlockIds.values().stream()
+        .flatMap(innerMap -> innerMap.values().stream())
+        .flatMapToLong(
+            arr ->
+                java.util.Arrays.stream(arr).mapToLong(Roaring64NavigableMap::getLongCardinality))
+        .sum();
+  }
+
+  @Override
+  public boolean contains(String appId) {
+    return partitionsToBlockIds.containsKey(appId);
+  }
+
+  @Override
+  public long getBitmapNum(String appId, int shuffleId) {
+    return partitionsToBlockIds.get(appId).get(shuffleId).length;
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
new file mode 100644
index 000000000..5e56fee83
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/** Manage the block ids individual for each partition. */
+public class PartitionedShuffleBlockIdManager implements ShuffleBlockIdManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionedShuffleBlockIdManager.class);
+
+  // appId -> shuffleId -> partitionId -> blockIds
+  private Map<String, Map<Integer, Map<Integer, Roaring64NavigableMap>>> partitionsToBlockIds;
+
+  public PartitionedShuffleBlockIdManager() {
+    this.partitionsToBlockIds = new ConcurrentHashMap<>();
+  }
+
+  public void registerAppId(String appId) {
+    partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
+  }
+
+  @Override
+  public int addFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Map<Integer, long[]> partitionToBlockIds,
+      int bitmapNum) {
+    Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions =
+        partitionsToBlockIds.get(appId);
+    if (shuffleIdToPartitions == null) {
+      throw new RssException("appId[" + appId + "] is expired!");
+    }
+    shuffleIdToPartitions.computeIfAbsent(shuffleId, key -> new ConcurrentHashMap<>());
+
+    Map<Integer, Roaring64NavigableMap> partitions = shuffleIdToPartitions.get(shuffleId);
+    int totalUpdatedBlockCount = 0;
+    for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
+      Integer partitionId = entry.getKey();
+      partitions.computeIfAbsent(partitionId, k -> Roaring64NavigableMap.bitmapOf());
+      Roaring64NavigableMap bitmap = partitions.get(partitionId);
+      int updatedBlockCount = 0;
+      synchronized (bitmap) {
+        for (long blockId : entry.getValue()) {
+          if (!bitmap.contains(blockId)) {
+            bitmap.addLong(blockId);
+            updatedBlockCount++;
+            totalUpdatedBlockCount++;
+          }
+        }
+      }
+      taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
+    }
+    return totalUpdatedBlockCount;
+  }
+
+  @Override
+  public byte[] getFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Set<Integer> partitions,
+      BlockIdLayout blockIdLayout)
+      throws IOException {
+    Map<Integer, Map<Integer, Roaring64NavigableMap>> shuffleIdToPartitions =
+        partitionsToBlockIds.get(appId);
+    if (shuffleIdToPartitions == null) {
+      LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
+      return null;
+    }
+
+    Map<Integer, Roaring64NavigableMap> partitionToBlockId = shuffleIdToPartitions.get(shuffleId);
+
+    long expectedBlockNumber = 0;
+    Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+    for (int partitionId : partitions) {
+      expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
+      Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
+      res.or(bitmap);
+    }
+
+    if (res.getLongCardinality() != expectedBlockNumber) {
+      throw new RssException(
+          "Inconsistent block number for partitions: "
+              + partitions
+              + ". Excepted: "
+              + expectedBlockNumber
+              + ", actual: "
+              + res.getLongCardinality());
+    }
+
+    return RssUtils.serializeBitMap(res);
+  }
+
+  @Override
+  public void removeBlockIdByAppId(String appId) {
+    partitionsToBlockIds.remove(appId);
+  }
+
+  @Override
+  public void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds) {
+    Optional.ofNullable(partitionsToBlockIds.get(appId))
+        .ifPresent(
+            x -> {
+              for (Integer shuffleId : shuffleIds) {
+                x.remove(shuffleId);
+              }
+            });
+  }
+
+  @Override
+  public long getTotalBlockCount() {
+    return partitionsToBlockIds.values().stream()
+        .flatMap(innerMap -> innerMap.values().stream())
+        .flatMap(innerMap -> innerMap.values().stream())
+        .mapToLong(roaringMap -> roaringMap.getLongCardinality())
+        .sum();
+  }
+
+  @Override
+  public boolean contains(String appId) {
+    return partitionsToBlockIds.containsKey(appId);
+  }
+
+  @Override
+  public long getBitmapNum(String appId, int shuffleId) {
+    return partitionsToBlockIds.get(appId).get(shuffleId).size();
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
new file mode 100644
index 000000000..635e0c394
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.server.ShuffleTaskInfo;
+
+/** The implementation of this interface to manage the shuffle block ids. */
+public interface ShuffleBlockIdManager {
+  void registerAppId(String appId);
+
+  int addFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Map<Integer, long[]> partitionToBlockIds,
+      int bitmapNum);
+
+  byte[] getFinishedBlockIds(
+      ShuffleTaskInfo taskInfo,
+      String appId,
+      Integer shuffleId,
+      Set<Integer> partitions,
+      BlockIdLayout blockIdLayout)
+      throws IOException;
+
+  void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds);
+
+  void removeBlockIdByAppId(String appId);
+
+  long getTotalBlockCount();
+
+  boolean contains(String testAppId);
+
+  long getBitmapNum(String appId, int shuffleId);
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java
new file mode 100644
index 000000000..e76b8d72d
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManagerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.block;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public class ShuffleBlockIdManagerFactory {
+  public static ShuffleBlockIdManager createShuffleBlockIdManager(ShuffleServerConf conf) {
+    String className = conf.get(ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS);
+    return createShuffleBlockIdManager(
+        className, ShuffleServerConf.SERVER_BLOCK_ID_MANAGER_CLASS.key());
+  }
+
+  public static ShuffleBlockIdManager createShuffleBlockIdManager(
+      String className, String configKey) {
+    if (StringUtils.isEmpty(className)) {
+      throw new IllegalStateException(
+          "Configuration error: " + configKey + " should not set to empty");
+    }
+
+    try {
+      return (ShuffleBlockIdManager) RssUtils.getConstructor(className).newInstance();
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Configuration error: " + configKey + " is failed to create instance of " + className, e);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 2a67c548e..c6484b175 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -816,7 +817,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
       }
     }
     Roaring64NavigableMap resultBlockIds =
-        shuffleTaskManager.getBlockIdsByPartitionId(
+        DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
             Sets.newHashSet(expectedPartitionId),
             bitmapBlockIds,
             Roaring64NavigableMap.bitmapOf(),
@@ -825,7 +826,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
 
     bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0));
     resultBlockIds =
-        shuffleTaskManager.getBlockIdsByPartitionId(
+        DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
             Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
     assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
 
@@ -833,7 +834,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
         layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
     bitmapBlockIds.addLong(expectedBlockId);
     resultBlockIds =
-        shuffleTaskManager.getBlockIdsByPartitionId(
+        DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
             Sets.newHashSet(layout.maxPartitionId),
             bitmapBlockIds,
             Roaring64NavigableMap.bitmapOf(),
@@ -872,12 +873,12 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     }
 
     Roaring64NavigableMap resultBlockIds =
-        shuffleTaskManager.getBlockIdsByPartitionId(
+        DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
             requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
     assertEquals(expectedBlockIds, resultBlockIds);
     assertEquals(
         bitmapBlockIds,
-        shuffleTaskManager.getBlockIdsByPartitionId(
+        DefaultShuffleBlockIdManager.getBlockIdsByPartitionId(
             allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout));
   }
 

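For readers comparing the two strategies in the diff above, the following is a minimal, self-contained sketch of how they differ on the read path. It is not the committed code: `TreeSet<Long>` stands in for `Roaring64NavigableMap`, the class `BlockIdStrategySketch` and all of its methods are hypothetical, and the block id bit layout is invented for illustration rather than taken from Uniffle's `BlockIdLayout`. The null check in `readPartitioned` is also an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: TreeSet<Long> replaces Roaring64NavigableMap, and the
// bit layout below is hypothetical (it is NOT Uniffle's BlockIdLayout).
class BlockIdStrategySketch {
  static final int PARTITION_BITS = 20; // assumed width, for the sketch only
  static final int TASK_BITS = 20; // assumed width, for the sketch only

  // Pack sequenceNo | partitionId | taskAttemptId into one long.
  static long blockId(int sequenceNo, int partitionId, int taskAttemptId) {
    return ((long) sequenceNo << (PARTITION_BITS + TASK_BITS))
        | ((long) partitionId << TASK_BITS)
        | (long) taskAttemptId;
  }

  // Recover the partition id from a packed block id.
  static int partitionOf(long blockId) {
    return (int) ((blockId >>> TASK_BITS) & ((1L << PARTITION_BITS) - 1));
  }

  // "Default" style: a fixed number of shared slots per shuffle.
  static List<Set<Long>> newSlots(int bitmapNum) {
    List<Set<Long>> slots = new ArrayList<>();
    for (int i = 0; i < bitmapNum; i++) {
      slots.add(new TreeSet<>());
    }
    return slots;
  }

  // A partition's block ids go to slot (partitionId % bitmapNum).
  static void addShared(List<Set<Long>> slots, int partitionId, long id) {
    slots.get(partitionId % slots.size()).add(id);
  }

  // Reading must scan each touched slot and filter by decoded partition id.
  static Set<Long> readShared(List<Set<Long>> slots, Set<Integer> partitions) {
    Set<Integer> slotIndexes = new TreeSet<>();
    for (int p : partitions) {
      slotIndexes.add(p % slots.size());
    }
    Set<Long> result = new TreeSet<>();
    for (int index : slotIndexes) {
      for (long id : slots.get(index)) {
        if (partitions.contains(partitionOf(id))) {
          result.add(id);
        }
      }
    }
    return result;
  }

  // "Partitioned" style: one set per partition, created on first write.
  static void addPartitioned(Map<Integer, Set<Long>> perPartition, int partitionId, long id) {
    perPartition.computeIfAbsent(partitionId, k -> new TreeSet<>()).add(id);
  }

  // Reading is a plain union of the requested partitions' sets.
  static Set<Long> readPartitioned(Map<Integer, Set<Long>> perPartition, Set<Integer> partitions) {
    Set<Long> result = new TreeSet<>();
    for (int p : partitions) {
      Set<Long> ids = perPartition.get(p);
      if (ids != null) { // a partition with no reported blocks contributes nothing
        result.addAll(ids);
      }
    }
    return result;
  }
}
```

With `bitmapNum = 2` in this sketch, partitions 1 and 3 land in the same shared slot, so a read for partition 1 alone still iterates over partition 3's ids before filtering them out. The per-partition variant avoids that scan at the cost of holding one container per partition, which is the memory trade-off the commit message describes.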