This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ad513416 [ISSUE-124] Add fallback mechanism for blocks read 
inconsistent (#276)
ad513416 is described below

commit ad513416c11d81e086436985b5b17f9448b5fa3c
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Nov 29 00:03:46 2022 +0800

    [ISSUE-124] Add fallback mechanism for blocks read inconsistent (#276)
    
    ### What changes were proposed in this pull request?
    Add fallback mechanism for blocks read inconsistent
    
    ### Why are the changes needed?
    When the data in this first server is damaged, application will fail. #124 
#129
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Already added
---
 .../shuffle/reader/RssShuffleDataIteratorTest.java |  73 +++---
 .../spark/shuffle/reader/RssShuffleReaderTest.java |  14 +-
 .../spark/shuffle/reader/RssShuffleReaderTest.java |  17 +-
 .../uniffle/client/impl/ShuffleReadClientImpl.java |   9 +-
 .../java/org/apache/uniffle/client/TestUtils.java  |  27 +++
 .../client/impl/ShuffleReadClientImplTest.java     | 101 ++++----
 .../apache/uniffle/common/ShuffleServerInfo.java   |   7 +
 .../org/apache/uniffle/common/util/RssUtils.java   |  12 +
 .../apache/uniffle/test/IntegrationTestBase.java   |  12 +-
 .../test/MultiStorageFaultToleranceTest.java       |   2 +-
 .../test/ShuffleServerFaultToleranceTest.java      | 254 +++++++++++++++++++++
 .../uniffle/test/ShuffleServerWithHdfsTest.java    |  12 +-
 .../test/ShuffleServerWithKerberizedHdfsTest.java  |  27 ++-
 .../ShuffleServerWithLocalOfExceptionTest.java     |  17 +-
 .../test/ShuffleServerWithMemLocalHdfsTest.java    |  16 +-
 .../uniffle/test/ShuffleServerWithMemoryTest.java  |  38 +--
 .../uniffle/test/SparkClientWithLocalTest.java     |   2 +-
 .../storage/factory/ShuffleHandlerFactory.java     |  80 ++++---
 .../storage/handler/api/ClientReadHandler.java     |   1 -
 .../handler/impl/ComposedClientReadHandler.java    |  57 ++---
 .../handler/impl/HdfsClientReadHandler.java        |  10 +-
 .../handler/impl/LocalFileClientReadHandler.java   |  23 +-
 .../impl/LocalFileQuorumClientReadHandler.java     | 134 -----------
 .../impl/MemoryQuorumClientReadHandler.java        |  75 ------
 .../impl/MultiReplicaClientReadHandler.java        | 104 +++++++++
 25 files changed, 683 insertions(+), 441 deletions(-)

diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 78c3375b..77d6b31c 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.reader;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
@@ -58,11 +60,14 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   private static final Serializer KRYO_SERIALIZER = new KryoSerializer(new 
SparkConf(false));
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should 
be thrown";
 
+  private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 
0);
+  private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 
0);
+
   @Test
   public void readTest1() throws Exception {
     String basePath = HDFS_URI + "readTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -70,12 +75,13 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
 
     validateResult(rssShuffleDataIterator, expectedData, 10);
 
     blockIdBitmap.add(ClientUtils.getBlockId(0, 0, Constants.MAX_SEQUENCE_NO));
-    rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, 
taskIdBitmap);
+    rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, 
taskIdBitmap, Lists.newArrayList(ssi1));
     int recNum = 0;
     try {
       // can't find all expected block id, data loss
@@ -91,10 +97,10 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   }
 
   private RssShuffleDataIterator getDataIterator(String basePath, 
Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap) {
+      Roaring64NavigableMap taskIdBitmap, List<ShuffleServerInfo> serverInfos) 
{
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
         StorageType.HDFS.name(), "appId", 0, 1, 100, 2,
-        10, 10000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(),
+        10, 10000, basePath, blockIdBitmap, taskIdBitmap, 
Lists.newArrayList(serverInfos),
         new Configuration(), new DefaultIdHelper());
     return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient,
         new ShuffleReadMetrics(), new RssConf());
@@ -104,9 +110,9 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   public void readTest2() throws Exception {
     String basePath = HDFS_URI + "readTest2";
     HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_1", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
     HdfsShuffleWriteHandler writeHandler2 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_2", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -116,7 +122,8 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler2, 2, 5, expectedData,
         blockIdBitmap, "key2", KRYO_SERIALIZER, 0);
 
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1, ssi2));
 
     validateResult(rssShuffleDataIterator, expectedData, 20);
     assertEquals(20, 
rssShuffleDataIterator.getShuffleReadMetrics().recordsRead());
@@ -127,9 +134,9 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   public void readTest3() throws Exception {
     String basePath = HDFS_URI + "readTest3";
     HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_1", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
     HdfsShuffleWriteHandler writeHandler2 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_2", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -141,16 +148,19 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
 
     // duplicate file created, it should be used in product environment
     String shuffleFolder = basePath + "/appId/0/0-1";
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.data"), fs,
-        new Path(shuffleFolder + "/test3_1_0.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.index"), fs,
-        new Path(shuffleFolder + "/test3_1_0.cp.index"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.data"), fs,
-        new Path(shuffleFolder + "/test3_2_0.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.index"), fs,
-        new Path(shuffleFolder + "/test3_2_0.cp.index"), false, conf);
-
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap, taskIdBitmap);
+    String ssi1Prefix = shuffleFolder + "/" + ssi1.getId();
+    String ssi2Prefix = shuffleFolder + "/" + ssi2.getId();
+    FileUtil.copy(fs, new Path(ssi1Prefix + "_0.data"), fs,
+        new Path(ssi1Prefix + "_0.cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(ssi1Prefix + "_0.index"), fs,
+        new Path(ssi1Prefix + "_0.cp.index"), false, conf);
+    FileUtil.copy(fs, new Path(ssi2Prefix + "_0.data"), fs,
+        new Path(ssi2Prefix + "_0.cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(ssi2Prefix + "_0.index"), fs,
+        new Path(ssi2Prefix + "_0.cp.index"), false, conf);
+
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1, ssi2));
 
     validateResult(rssShuffleDataIterator, expectedData, 20);
   }
@@ -159,7 +169,7 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   public void readTest4() throws Exception {
     String basePath = HDFS_URI + "readTest4";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -167,12 +177,11 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
     // data file is deleted after iterator initialization
-    Path dataFile = new Path(basePath + "/appId/0/0-1/test1_0.data");
+    Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + 
"_0.data");
     fs.delete(dataFile, true);
-    // sleep to wait delete operation
-    Thread.sleep(10000);
     try {
       fs.listStatus(dataFile);
       fail("Index file should be deleted");
@@ -194,7 +203,7 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   public void readTest5() throws Exception {
     String basePath = HDFS_URI + "readTest5";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -202,12 +211,11 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    final RssShuffleDataIterator rssShuffleDataIterator = 
getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+    final RssShuffleDataIterator rssShuffleDataIterator = 
getDataIterator(basePath, blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
     // index file is deleted after iterator initialization, it should be ok, 
all index infos are read already
-    Path indexFile = new Path(basePath + "/appId/0/0-1/test.index");
+    Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + 
".index");
     fs.delete(indexFile, true);
-    // sleep to wait delete operation
-    Thread.sleep(10000);
     try {
       fs.listStatus(indexFile);
       fail("Index file should be deleted");
@@ -221,7 +229,7 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
   public void readTest7() throws Exception {
     String basePath = HDFS_URI + "readTest7";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -229,7 +237,8 @@ public class RssShuffleDataIteratorTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, 
blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
 
     // crc32 is incorrect
     try (MockedStatic<ChecksumUtils> checksumUtilsMock = 
Mockito.mockStatic(ChecksumUtils.class)) {
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index ce33f47c..fa94faf8 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.spark.shuffle.reader;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -31,6 +34,7 @@ import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import scala.Option;
 
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
@@ -46,10 +50,10 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
 
   @Test
   public void readTest() throws Exception {
-
+    ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
     String basePath = HDFS_URI + "readTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), 
conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -57,13 +61,17 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    TaskContext contextMock = mock(TaskContext.class);
     RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
     ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
     when(handleMock.getAppId()).thenReturn("appId");
     when(handleMock.getShuffleId()).thenReturn(1);
     when(handleMock.getDependency()).thenReturn(dependencyMock);
+    Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+    partitionToServers.put(0, Lists.newArrayList(ssi));
+    partitionToServers.put(1, Lists.newArrayList(ssi));
+    when(handleMock.getPartitionToServers()).thenReturn(partitionToServers);
     when(dependencyMock.serializer()).thenReturn(KRYO_SERIALIZER);
+    TaskContext contextMock = mock(TaskContext.class);
     when(contextMock.taskAttemptId()).thenReturn(1L);
     when(contextMock.attemptNumber()).thenReturn(1);
     when(contextMock.taskMetrics()).thenReturn(new TaskMetrics());
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index 213061a4..c0988e2e 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.spark.shuffle.reader;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -33,6 +36,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import scala.Option;
 
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
@@ -42,19 +46,18 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-
 public class RssShuffleReaderTest extends AbstractRssReaderTest {
 
   private static final Serializer KRYO_SERIALIZER = new KryoSerializer(new 
SparkConf(false));
 
   @Test
   public void readTest() throws Exception {
-
+    ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
     String basePath = HDFS_URI + "readTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, ssi.getId(), 
conf);
     final HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), 
conf);
 
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     final Roaring64NavigableMap taskIdBitmap = 
Roaring64NavigableMap.bitmapOf(0);
@@ -63,13 +66,17 @@ public class RssShuffleReaderTest extends 
AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
-    TaskContext contextMock = mock(TaskContext.class);
     RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
     ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
     when(handleMock.getAppId()).thenReturn("appId");
     when(handleMock.getDependency()).thenReturn(dependencyMock);
     when(handleMock.getShuffleId()).thenReturn(1);
+    Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+    partitionToServers.put(0, Lists.newArrayList(ssi));
+    partitionToServers.put(1, Lists.newArrayList(ssi));
+    when(handleMock.getPartitionToServers()).thenReturn(partitionToServers);
     when(dependencyMock.serializer()).thenReturn(KRYO_SERIALIZER);
+    TaskContext contextMock = mock(TaskContext.class);
     when(contextMock.attemptNumber()).thenReturn(1);
     when(contextMock.taskAttemptId()).thenReturn(1L);
     when(contextMock.taskMetrics()).thenReturn(new TaskMetrics());
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 99387f59..8a57cbb0 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -47,7 +47,6 @@ import 
org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 public class ShuffleReadClientImpl implements ShuffleReadClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleReadClientImpl.class);
-
   private int shuffleId;
   private int partitionId;
   private byte[] readBuffer;
@@ -232,13 +231,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
 
   @Override
   public void checkProcessedBlockIds() {
-    Roaring64NavigableMap cloneBitmap;
-    cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
-    cloneBitmap.and(processedBlockIds);
-    if (!blockIdBitmap.equals(cloneBitmap)) {
-      throw new RssException("Blocks read inconsistent: expected " + 
blockIdBitmap.getLongCardinality()
-          + " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
-    }
+    RssUtils.checkProcessedBlockIds(blockIdBitmap, processedBlockIds);
   }
 
   @Override
diff --git a/client/src/test/java/org/apache/uniffle/client/TestUtils.java 
b/client/src/test/java/org/apache/uniffle/client/TestUtils.java
index cbb441df..284d22e8 100644
--- a/client/src/test/java/org/apache/uniffle/client/TestUtils.java
+++ b/client/src/test/java/org/apache/uniffle/client/TestUtils.java
@@ -18,12 +18,16 @@
 package org.apache.uniffle.client;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestUtils {
@@ -55,6 +59,29 @@ public class TestUtils {
     assertEquals(expectedData.size(), blockNum);
   }
 
+  public static void validateResult(
+      Map<Long, byte[]> expectedData,
+      ShuffleDataResult sdr) {
+    byte[] buffer = sdr.getData();
+    List<BufferSegment> bufferSegments = sdr.getBufferSegments();
+    assertEquals(expectedData.size(), bufferSegments.size());
+    for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
+      BufferSegment bs = findBufferSegment(entry.getKey(), bufferSegments);
+      assertNotNull(bs);
+      byte[] data = new byte[bs.getLength()];
+      System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+    }
+  }
+
+  private static BufferSegment findBufferSegment(long blockId, 
List<BufferSegment> bufferSegments) {
+    for (BufferSegment bs : bufferSegments) {
+      if (bs.getBlockId() == blockId) {
+        return bs;
+      }
+    }
+    return null;
+  }
+
   public static boolean compareByte(byte[] expected, ByteBuffer buffer) {
     int start = buffer.position();
     for (int i = 0; i < expected.length; i++) {
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 2589c195..e2f41536 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -37,6 +37,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.apache.uniffle.client.TestUtils;
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.HdfsTestBase;
@@ -54,11 +55,14 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should 
be thrown";
   private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
 
+  private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 
0);
+  private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 
0);
+
   @Test
   public void readTest1() throws Exception {
     String basePath = HDFS_URI + "clientReadTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -68,7 +72,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -78,7 +82,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
     taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 
0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
     TestUtils.validateResult(readClient, expectedData);
     try {
       // can't find all expected block id, data loss
@@ -95,9 +99,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
   public void readTest2() throws Exception {
     String basePath = HDFS_URI + "clientReadTest2";
     HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_1", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
     HdfsShuffleWriteHandler writeHandler2 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_2", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -107,7 +111,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
-        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2),
+        new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -118,9 +123,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest3() throws Exception {
     String basePath = HDFS_URI + "clientReadTest3";
     HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_1", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
     HdfsShuffleWriteHandler writeHandler2 =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_2", 
conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = 
Roaring64NavigableMap.bitmapOf();
@@ -130,18 +135,19 @@ public class ShuffleReadClientImplTest extends 
HdfsTestBase {
 
     // duplicate file created, it should be used in product environment
     String shuffleFolder = basePath + "/appId/0/0-1";
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.data"), fs,
-        new Path(basePath + "/test3_1.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.index"), fs,
-        new Path(basePath + "/test3_1.cp.index"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.data"), fs,
-        new Path(basePath + "/test3_2.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.index"), fs,
-        new Path(basePath + "/test3_2.cp.index"), false, conf);
+    FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi1.getId() + 
"_0.data"), fs,
+        new Path(basePath + "/" + ssi1.getId() + ".cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi1.getId() + 
"_0.index"), fs,
+        new Path(basePath + "/" + ssi1.getId() + ".cp.index"), false, conf);
+    FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi2.getId() + 
"_0.data"), fs,
+        new Path(basePath + "/" + ssi2.getId() + ".cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi2.getId() + 
"_0.index"), fs,
+        new Path(basePath + "/" + ssi2.getId() + ".cp.index"), false, conf);
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
-        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2),
+        new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -152,7 +158,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest4() throws Exception {
     String basePath = HDFS_URI + "clientReadTest4";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -161,12 +167,10 @@ public class ShuffleReadClientImplTest extends 
HdfsTestBase {
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
-        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
-    Path dataFile = new Path(basePath + "/appId/0/0-1/test1_0.data");
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new 
Configuration(), new DefaultIdHelper());
+    Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + 
"_0.data");
     // data file is deleted after readClient checkExpectedBlockIds
-    fs.delete(new Path(basePath + "/appId/0/0-1/test1_0.data"), true);
-    // sleep to wait delete operation
-    Thread.sleep(10000);
+    fs.delete(dataFile, true);
 
     assertNull(readClient.readShuffleBlockData());
     try {
@@ -189,7 +193,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest5() throws Exception {
     String basePath = HDFS_URI + "clientReadTest5";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -198,9 +202,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
-        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new 
Configuration(), new DefaultIdHelper());
     // index file is deleted after iterator initialization, it should be ok, 
all index infos are read already
-    Path indexFile = new Path(basePath + "/appId/0/0-1/test_0.index");
+    Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + 
"_0.index");
     fs.delete(indexFile, true);
     readClient.close();
 
@@ -211,7 +215,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest7() throws Exception {
     String basePath = HDFS_URI + "clientReadTest7";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData1 = Maps.newHashMap();
     Map<Long, byte[]> expectedData2 = Maps.newHashMap();
@@ -226,10 +230,10 @@ public class ShuffleReadClientImplTest extends 
HdfsTestBase {
 
     ShuffleReadClientImpl readClient1 = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 0, 100, 2, 10, 100,
-        basePath, blockIdBitmap1, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap1, taskIdBitmap, Lists.newArrayList(ssi1), new 
Configuration(), new DefaultIdHelper());
     final ShuffleReadClientImpl readClient2 = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 100,
-        basePath, blockIdBitmap2, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap2, taskIdBitmap, Lists.newArrayList(ssi1), new 
Configuration(), new DefaultIdHelper());
     TestUtils.validateResult(readClient1, expectedData1);
     readClient1.checkProcessedBlockIds();
     readClient1.close();
@@ -243,7 +247,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest8() throws Exception {
     String basePath = HDFS_URI + "clientReadTest8";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -252,7 +256,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
-        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new 
Configuration(), new DefaultIdHelper());
     // crc32 is incorrect
     try (MockedStatic<ChecksumUtils> checksumUtilsMock = 
Mockito.mockStatic(ChecksumUtils.class)) {
       checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) 
any())).thenReturn(-1L);
@@ -275,7 +279,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 1, 100, 2, 10, 1000,
         "basePath", Roaring64NavigableMap.bitmapOf(), 
Roaring64NavigableMap.bitmapOf(),
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
     assertNull(readClient.readShuffleBlockData());
     readClient.checkProcessedBlockIds();
   }
@@ -284,7 +288,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest10() throws Exception {
     String basePath = HDFS_URI + "clientReadTest10";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -298,7 +302,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         "appId", 0, 0, 100, 2, 10, 100,
-        basePath, wrongBlockIdBitmap, taskIdBitmap, Lists.newArrayList(), new 
Configuration(), new DefaultIdHelper());
+        basePath, wrongBlockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1),
+        new Configuration(), new DefaultIdHelper());
     assertNull(readClient.readShuffleBlockData());
     try {
       readClient.checkProcessedBlockIds();
@@ -312,7 +317,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest11() throws Exception {
     String basePath = HDFS_URI + "clientReadTest11";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -322,7 +327,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     // test with different indexReadLimit to validate result
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 1, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -330,7 +335,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 
0, 1, 2, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -338,7 +343,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 
0, 1, 3, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -346,7 +351,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 
0, 1, 10, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -354,7 +359,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 
0, 1, 11, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     readClient.checkProcessedBlockIds();
@@ -365,7 +370,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest12() throws Exception {
     String basePath = HDFS_URI + "clientReadTest12";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = 
Roaring64NavigableMap.bitmapOf();
@@ -377,7 +382,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -389,7 +394,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest13() throws Exception {
     String basePath = HDFS_URI + "clientReadTest13";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = 
Roaring64NavigableMap.bitmapOf();
@@ -404,7 +409,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
@@ -416,7 +421,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest14() throws Exception {
     String basePath = HDFS_URI + "clientReadTest14";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = 
Roaring64NavigableMap.bitmapOf();
@@ -428,7 +433,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -440,7 +445,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
   public void readTest15() throws Exception {
     String basePath = HDFS_URI + "clientReadTest15";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), 
conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = 
Roaring64NavigableMap.bitmapOf();
@@ -454,7 +459,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase 
{
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
         10, 1000, basePath, blockIdBitmap, taskIdBitmap,
-        Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+        Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
 
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(25, readClient.getProcessedBlockIds().getLongCardinality());
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
index 2174f35d..eb503d90 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
@@ -27,6 +27,13 @@ public class ShuffleServerInfo implements Serializable {
 
   private int port;
 
+  // Only for test
+  public ShuffleServerInfo(String host, int port) {
+    this.id = host + "-" + port;
+    this.host = host;
+    this.port = port;
+  }
+
   public ShuffleServerInfo(String id, String host, int port) {
     this.id = id;
     this.host = host;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 5319444e..fb05894e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
 
 public class RssUtils {
 
@@ -271,4 +272,15 @@ public class RssUtils {
     }
     return serverToPartitions;
   }
+
+  public static void checkProcessedBlockIds(Roaring64NavigableMap 
blockIdBitmap,
+                                            Roaring64NavigableMap 
processedBlockIds) {
+    Roaring64NavigableMap cloneBitmap;
+    cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
+    cloneBitmap.and(processedBlockIds);
+    if (!blockIdBitmap.equals(cloneBitmap)) {
+      throw new RssException("Blocks read inconsistent: expected " + 
blockIdBitmap.getLongCardinality()
+          + " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
+    }
+  }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 06e3be7b..ee58ade8 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorMetrics;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -40,7 +41,16 @@ import org.apache.uniffle.storage.util.StorageType;
 public abstract class IntegrationTestBase extends HdfsTestBase {
 
   protected static final int SHUFFLE_SERVER_PORT = 20001;
-  protected static final String LOCALHOST = "127.0.0.1";
+  protected static final String LOCALHOST;
+
+  static {
+    try {
+      LOCALHOST = RssUtils.getHostIp();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   protected static final int COORDINATOR_PORT_1 = 19999;
   protected static final int COORDINATOR_PORT_2 = 20030;
   protected static final int JETTY_PORT_1 = 19998;
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
index 5f76be5b..6e0145d8 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
@@ -135,7 +135,7 @@ public class MultiStorageFaultToleranceTest extends 
ShuffleReadWriteBase {
                                 Roaring64NavigableMap taskBitmap, Map<Long, 
byte[]> expectedData) {
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.LOCALFILE_HDFS.name(),
         appId, shuffleId, partitionId, 100, 1, 10, 1000, REMOTE_STORAGE, 
blockBitmap, taskBitmap,
-        Lists.newArrayList(new ShuffleServerInfo("test", LOCALHOST, 
SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
+        Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
     Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
     while (csb != null && csb.getByteBuffer() != null) {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
new file mode 100644
index 00000000..636ba65f
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.api.ShuffleServerClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.server.MockedShuffleServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
+
+  private List<ShuffleServerClient> shuffleServerClients;
+
+  private String remoteStoragePath = HDFS_URI + "rss/test";
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+    shuffleServers.add(createServer(0));
+    shuffleServers.add(createServer(1));
+    shuffleServers.add(createServer(2));
+    startServers();
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClients = new ArrayList<>();
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServerClients.add(new 
ShuffleServerGrpcClient(shuffleServer.getIp(), shuffleServer.getPort()));
+    }
+  }
+
+  @AfterEach
+  public void cleanEnv() throws Exception {
+    shuffleServerClients.forEach((client) -> {
+      client.close();
+    });
+    cleanCluster();
+    setupServers();
+  }
+
+  @Test
+  public void testReadFaultTolerance() throws Exception {
+    String testAppId = 
"ShuffleServerFaultToleranceTest.testReadFaultTolerance";
+    int shuffleId = 0;
+    int partitionId = 0;
+    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 
shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 0)), remoteStoragePath);
+    registerShuffle(rrsr);
+    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    Map<Long, byte[]> dataMap = Maps.newHashMap();
+    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
+    bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+    List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+        shuffleId, partitionId, 0, 3, 25,
+        expectBlockIds, dataMap, mockSSI);
+
+    RssSendShuffleDataRequest rssdr = getRssSendShuffleDataRequest(testAppId, 
shuffleId, partitionId, blocks);
+    shuffleServerClients.get(1).sendShuffleData(rssdr);
+
+    List<ShuffleServerInfo> shuffleServerInfoList = new ArrayList<>();
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServerInfoList.add(new ShuffleServerInfo(shuffleServer.getId(),
+          shuffleServer.getIp(), shuffleServer.getPort()));
+    }
+
+    CreateShuffleReadHandlerRequest request = 
mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.MEMORY_LOCALFILE);
+    ClientReadHandler clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    expectedData.clear();
+    blocks.forEach((block) -> {
+      expectedData.put(block.getBlockId(), block.getData());
+    });
+    ShuffleDataResult sdr = clientReadHandler.readShuffleData();
+    TestUtils.validateResult(expectedData, sdr);
+
+    // send data to shuffle server, and wait until flush to localfile
+    List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
+        shuffleId, partitionId, 0, 3, 25,
+        expectBlockIds, dataMap, mockSSI);
+
+    rssdr = getRssSendShuffleDataRequest(testAppId, shuffleId, partitionId, 
blocks2);
+    shuffleServerClients.get(1).sendShuffleData(rssdr);
+    RssSendCommitRequest commitRequest = new RssSendCommitRequest(testAppId, 
shuffleId);
+    shuffleServerClients.get(1).sendCommit(commitRequest);
+    waitFlush(testAppId, shuffleId);
+    request = mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.LOCALFILE);
+    clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    sdr = clientReadHandler.readShuffleData();
+    blocks2.forEach((block) -> {
+      expectedData.put(block.getBlockId(), block.getData());
+    });
+    TestUtils.validateResult(expectedData, sdr);
+
+    // send data to shuffle server, and wait until flush to hdfs
+    List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
+        shuffleId, partitionId, 0, 3, 150,
+        expectBlockIds, dataMap, mockSSI);
+    expectedData.clear();
+    blocks3.forEach((block) -> {
+      expectedData.put(block.getBlockId(), block.getData());
+    });
+    rssdr = getRssSendShuffleDataRequest(testAppId, shuffleId, partitionId, 
blocks3);
+    shuffleServerClients.get(1).sendShuffleData(rssdr);
+    shuffleServerClients.get(1).sendCommit(commitRequest);
+    waitFlush(testAppId, shuffleId);
+    request = mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.HDFS);
+    clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    sdr = clientReadHandler.readShuffleData();
+    TestUtils.validateResult(expectedData, sdr);
+  }
+
+  private CreateShuffleReadHandlerRequest mockCreateShuffleReadHandlerRequest(
+      String testAppId, int shuffleId, int partitionId, 
List<ShuffleServerInfo> shuffleServerInfoList,
+      Roaring64NavigableMap expectBlockIds, StorageType storageType) {
+    CreateShuffleReadHandlerRequest request = new 
CreateShuffleReadHandlerRequest();
+    request.setStorageType(storageType.name());
+    request.setAppId(testAppId);
+    request.setShuffleId(shuffleId);
+    request.setPartitionId(partitionId);
+    request.setIndexReadLimit(100);
+    request.setPartitionNumPerRange(1);
+    request.setPartitionNum(1);
+    request.setReadBufferSize(14 * 1024 * 1024);
+    request.setStorageBasePath(remoteStoragePath);
+    request.setShuffleServerInfoList(shuffleServerInfoList);
+    request.setHadoopConf(conf);
+    request.setExpectBlockIds(expectBlockIds);
+    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    request.setProcessBlockIds(processBlockIds);
+    request.setDistributionType(ShuffleDataDistributionType.NORMAL);
+    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+    request.setExpectTaskIds(taskIdBitmap);
+    return request;
+  }
+
+  private RssSendShuffleDataRequest getRssSendShuffleDataRequest(
+      String appId, int shuffleId, int partitionId, List<ShuffleBlockInfo> 
blocks) {
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(partitionId, blocks);
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = 
Maps.newHashMap();
+    shuffleToBlocks.put(shuffleId, partitionToBlocks);
+    return new RssSendShuffleDataRequest(
+        appId, 3, 1000, shuffleToBlocks);
+  }
+
+  private void registerShuffle(RssRegisterShuffleRequest rrsr) {
+    shuffleServerClients.forEach((client) -> {
+      client.registerShuffle(rrsr);
+    });
+  }
+
+  public static MockedShuffleServer createServer(int id) throws Exception {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
5000L);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
 20.0);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
 40.0);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 600L);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
5000L);
+    shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 1000000L);
+    shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    File dataDir1 = new File(tmpDir, id + "_1");
+    File dataDir2 = new File(tmpDir, id + "_2");
+    String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
450L);
+    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 
id);
+    shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    return new MockedShuffleServer(shuffleServerConf);
+  }
+
+  protected void waitFlush(String appId, int shuffleId) throws 
InterruptedException {
+    int retry = 0;
+    while (true) {
+      if (retry > 5) {
+        fail("Timeout for flush data");
+      }
+      ShuffleBuffer shuffleBuffer = 
shuffleServers.get(1).getShuffleBufferManager()
+          .getShuffleBuffer(appId, shuffleId, 0);
+      if (shuffleBuffer.getBlocks().size() == 0 && 
shuffleBuffer.getInFlushBlockMap().size() == 0) {
+        break;
+      }
+      Thread.sleep(1000);
+      retry++;
+    }
+  }
+
+  public static void cleanCluster() throws Exception {
+    for (CoordinatorServer coordinator : coordinators) {
+      coordinator.stopServer();
+    }
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServer.stopServer();
+    }
+    shuffleServers = Lists.newArrayList();
+    coordinators = Lists.newArrayList();
+  }
+}
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
index 41a4f7e4..90a4fef6 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
@@ -40,6 +40,7 @@ import 
org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
@@ -100,9 +101,10 @@ public class ShuffleServerWithHdfsTest extends 
ShuffleReadWriteBase {
     shuffleServerClient.sendCommit(rscr);
     RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
 
+    ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT);
     ShuffleReadClientImpl readClient = new 
ShuffleReadClientImpl(StorageType.HDFS.name(),
         appId, 0, 0, 100, 2, 10, 1000,
-        dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), 
Lists.newArrayList(),
+        dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), 
Lists.newArrayList(ssi),
         new Configuration(), new DefaultIdHelper());
     assertNull(readClient.readShuffleBlockData());
     shuffleServerClient.finishShuffle(rfsr);
@@ -132,25 +134,25 @@ public class ShuffleServerWithHdfsTest extends 
ShuffleReadWriteBase {
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
         appId, 0, 0, 100, 2, 10, 1000,
-        dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), 
Lists.newArrayList(),
+        dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), 
Lists.newArrayList(ssi),
         new Configuration(), new DefaultIdHelper());
     validateResult(readClient, expectedData, bitmaps[0]);
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
         appId, 0, 1, 100, 2, 10, 1000,
-        dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1), 
Lists.newArrayList(),
+        dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1), 
Lists.newArrayList(ssi),
         new Configuration(), new DefaultIdHelper());
     validateResult(readClient, expectedData, bitmaps[1]);
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
         appId, 0, 2, 100, 2, 10, 1000,
-        dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2), 
Lists.newArrayList(),
+        dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2), 
Lists.newArrayList(ssi),
         new Configuration(), new DefaultIdHelper());
     validateResult(readClient, expectedData, bitmaps[2]);
 
     readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
         appId, 0, 3, 100, 2, 10, 1000,
-        dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3), 
Lists.newArrayList(),
+        dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3), 
Lists.newArrayList(ssi),
         new Configuration(), new DefaultIdHelper());
     validateResult(readClient, expectedData, bitmaps[3]);
   }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
index ed2bec66..4bce65c3 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
@@ -47,6 +47,8 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.server.ShuffleServer;
@@ -60,9 +62,19 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
 
+  protected static final String LOCALHOST;
+
+  static {
+    try {
+      LOCALHOST = RssUtils.getHostIp();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static final int COORDINATOR_RPC_PROT = 19999;
   private static final int SHUFFLE_SERVER_PORT = 29999;
-  private static final String COORDINATOR_QUORUM = "localhost:" + 
COORDINATOR_RPC_PROT;
+  private static final String COORDINATOR_QUORUM = LOCALHOST + ":" + 
COORDINATOR_RPC_PROT;
 
   private ShuffleServerGrpcClient shuffleServerClient;
   private static CoordinatorServer coordinatorServer;
@@ -122,7 +134,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
   @BeforeEach
   public void beforeEach() throws Exception {
     initHadoopSecurityContext();
-    shuffleServerClient = new ShuffleServerGrpcClient("localhost", 
SHUFFLE_SERVER_PORT);
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, 
SHUFFLE_SERVER_PORT);
   }
 
   @AfterEach
@@ -212,6 +224,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
     shuffleServerClient.sendCommit(rscr);
     RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
 
+    ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT);
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
         StorageType.HDFS.name(),
         appId,
@@ -224,7 +237,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
         dataBasePath,
         bitmaps[0],
         Roaring64NavigableMap.bitmapOf(0),
-        Lists.newArrayList(),
+        Lists.newArrayList(ssi),
         new Configuration(),
         new DefaultIdHelper()
     );
@@ -265,7 +278,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
         1000,
         dataBasePath, bitmaps[0],
         Roaring64NavigableMap.bitmapOf(0),
-        Lists.newArrayList(),
+        Lists.newArrayList(ssi),
         new Configuration(),
         new DefaultIdHelper()
     );
@@ -283,7 +296,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
         dataBasePath,
         bitmaps[1],
         Roaring64NavigableMap.bitmapOf(1),
-        Lists.newArrayList(),
+        Lists.newArrayList(ssi),
         new Configuration(),
         new DefaultIdHelper()
     );
@@ -301,7 +314,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
         dataBasePath,
         bitmaps[2],
         Roaring64NavigableMap.bitmapOf(2),
-        Lists.newArrayList(),
+        Lists.newArrayList(ssi),
         new Configuration(),
         new DefaultIdHelper()
     );
@@ -319,7 +332,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends 
KerberizedHdfsBase {
         dataBasePath,
         bitmaps[3],
         Roaring64NavigableMap.bitmapOf(3),
-        Lists.newArrayList(),
+        Lists.newArrayList(ssi),
         new Configuration(),
         new DefaultIdHelper()
     );
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
index 03420fba..fb316a08 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.test;
 
 import java.io.File;
 
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -27,13 +26,10 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
-import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.handler.api.ClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,17 +74,14 @@ public class ShuffleServerWithLocalOfExceptionTest extends 
ShuffleReadWriteBase
     int shuffleId = 0;
     int partitionId = 0;
 
-    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new 
MemoryQuorumClientReadHandler(
-        testAppId, shuffleId, partitionId, 150, 
Lists.newArrayList(shuffleServerClient));
-    ClientReadHandler[] handlers = new ClientReadHandler[1];
-    handlers[0] = memoryQuorumClientReadHandler;
-    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
+    MemoryClientReadHandler memoryClientReadHandler = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, shuffleServerClient);
     shuffleServers.get(0).stopServer();
     try {
-      ShuffleDataResult sdr  = composedClientReadHandler.readShuffleData();
+      memoryClientReadHandler.readShuffleData();
       fail("Should throw connection exception directly.");
     } catch (RssException rssException) {
-      assertTrue(rssException.getMessage().contains("Failed to read shuffle 
data from HOT handler"));
+      assertTrue(rssException.getMessage().contains("Failed to read in memory 
shuffle data with"));
     }
   }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 592c980e..de74c1b5 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -44,8 +44,8 @@ import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
-import 
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,17 +114,17 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     shuffleServerClient.sendShuffleData(rssdr);
 
     // read the 1-th segment from memory
-    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new 
MemoryQuorumClientReadHandler(
-        testAppId, shuffleId, partitionId, 150, 
Lists.newArrayList(shuffleServerClient));
+    MemoryClientReadHandler memoryClientReadHandler = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, shuffleServerClient);
     Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-    LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new 
LocalFileQuorumClientReadHandler(
+    LocalFileClientReadHandler localFileClientReadHandler = new 
LocalFileClientReadHandler(
         testAppId, shuffleId, partitionId, 0, 1, 3,
-        75, expectBlockIds, processBlockIds, 
Lists.newArrayList(shuffleServerClient));
+        75, expectBlockIds, processBlockIds, shuffleServerClient);
     HdfsClientReadHandler hdfsClientReadHandler = new 
HdfsClientReadHandler(testAppId, shuffleId, partitionId, 0, 1, 3,
         500, expectBlockIds, processBlockIds, REMOTE_STORAGE, conf);
     ClientReadHandler[] handlers = new ClientReadHandler[3];
-    handlers[0] = memoryQuorumClientReadHandler;
-    handlers[1] = localFileQuorumClientReadHandler;
+    handlers[0] = memoryClientReadHandler;
+    handlers[1] = localFileClientReadHandler;
     handlers[2] = hdfsClientReadHandler;
     ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 7e0123dc..03ed356a 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -43,8 +43,8 @@ import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
-import 
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -113,37 +113,37 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
     assertEquals(3, shuffleServers.get(0).getShuffleBufferManager()
         .getShuffleBuffer(testAppId, shuffleId, 0).getBlocks().size());
     // create memory handler to read data,
-    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new 
MemoryQuorumClientReadHandler(
-        testAppId, shuffleId, partitionId, 20, 
Lists.newArrayList(shuffleServerClient));
+    MemoryClientReadHandler memoryClientReadHandler = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 20, shuffleServerClient);
     // start to read data, one block data for every call
-    ShuffleDataResult sdr  = memoryQuorumClientReadHandler.readShuffleData();
+    ShuffleDataResult sdr  = memoryClientReadHandler.readShuffleData();
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
     validateResult(expectedData, sdr);
 
-    sdr = memoryQuorumClientReadHandler.readShuffleData();
+    sdr = memoryClientReadHandler.readShuffleData();
     expectedData.clear();
     expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
     validateResult(expectedData, sdr);
 
-    sdr = memoryQuorumClientReadHandler.readShuffleData();
+    sdr = memoryClientReadHandler.readShuffleData();
     expectedData.clear();
     expectedData.put(blocks.get(2).getBlockId(), blocks.get(2).getData());
     validateResult(expectedData, sdr);
 
     // no data in cache, empty return
-    sdr = memoryQuorumClientReadHandler.readShuffleData();
+    sdr = memoryClientReadHandler.readShuffleData();
     assertEquals(0, sdr.getBufferSegments().size());
 
     // case: read with ComposedClientReadHandler
     Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-    memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
-        testAppId, shuffleId, partitionId, 50, 
Lists.newArrayList(shuffleServerClient));
-    LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new 
LocalFileQuorumClientReadHandler(
+    memoryClientReadHandler = new MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 50, shuffleServerClient);
+    LocalFileClientReadHandler localFileQuorumClientReadHandler = new 
LocalFileClientReadHandler(
         testAppId, shuffleId, partitionId, 0, 1, 3,
-        50, expectBlockIds, processBlockIds, 
Lists.newArrayList(shuffleServerClient));
+        50, expectBlockIds, processBlockIds, shuffleServerClient);
     ClientReadHandler[] handlers = new ClientReadHandler[2];
-    handlers[0] = memoryQuorumClientReadHandler;
+    handlers[0] = memoryClientReadHandler;
     handlers[1] = localFileQuorumClientReadHandler;
     ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
     // read from memory with ComposedClientReadHandler
@@ -235,14 +235,14 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
 
     // read the 1-th segment from memory
     Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new 
MemoryQuorumClientReadHandler(
-        testAppId, shuffleId, partitionId, 150, 
Lists.newArrayList(shuffleServerClient));
-    LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new 
LocalFileQuorumClientReadHandler(
+    MemoryClientReadHandler memoryClientReadHandler = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, shuffleServerClient);
+    LocalFileClientReadHandler localFileClientReadHandler = new 
LocalFileClientReadHandler(
         testAppId, shuffleId, partitionId, 0, 1, 3,
-        75, expectBlockIds, processBlockIds, 
Lists.newArrayList(shuffleServerClient));
+        75, expectBlockIds, processBlockIds, shuffleServerClient);
     ClientReadHandler[] handlers = new ClientReadHandler[2];
-    handlers[0] = memoryQuorumClientReadHandler;
-    handlers[1] = localFileQuorumClientReadHandler;
+    handlers[0] = memoryClientReadHandler;
+    handlers[1] = localFileClientReadHandler;
     ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     expectedData.clear();
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index f09be54a..34e14e1c 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -59,7 +59,7 @@ public class SparkClientWithLocalTest extends 
ShuffleReadWriteBase {
   private static File DATA_DIR2;
   private ShuffleServerGrpcClient shuffleServerClient;
   private List<ShuffleServerInfo> shuffleServerInfo =
-      Lists.newArrayList(new ShuffleServerInfo("127.0.0.1-20001", LOCALHOST, 
SHUFFLE_SERVER_PORT));
+      Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT));
 
   @BeforeAll
   public static void setupServers() throws Exception {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index ef0fbe47..3f4dd135 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -19,8 +19,9 @@ package org.apache.uniffle.storage.factory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
@@ -32,9 +33,10 @@ import 
org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
 import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.HdfsShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
-import 
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MultiReplicaClientReadHandler;
 import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.util.StorageType;
@@ -53,80 +55,89 @@ public class ShuffleHandlerFactory {
     return INSTANCE;
   }
 
+
   public ClientReadHandler 
createShuffleReadHandler(CreateShuffleReadHandlerRequest request) {
+    if (CollectionUtils.isEmpty(request.getShuffleServerInfoList())) {
+      throw new RuntimeException("Shuffle servers should not be empty!");
+    }
+    if (request.getShuffleServerInfoList().size() > 1) {
+      List<ClientReadHandler> handlers = Lists.newArrayList();
+      request.getShuffleServerInfoList().forEach((ssi) -> {
+        
handlers.add(ShuffleHandlerFactory.getInstance().createSingleReplicaClientReadHandler(request,
 ssi));
+      });
+      return new MultiReplicaClientReadHandler(handlers, 
request.getShuffleServerInfoList(),
+          request.getExpectBlockIds(), request.getProcessBlockIds());
+    } else {
+      ShuffleServerInfo serverInfo = request.getShuffleServerInfoList().get(0);
+      return createSingleReplicaClientReadHandler(request, serverInfo);
+    }
+  }
+
+  public ClientReadHandler 
createSingleReplicaClientReadHandler(CreateShuffleReadHandlerRequest request,
+                                                                
ShuffleServerInfo serverInfo) {
     String storageType = request.getStorageType();
     StorageType type = StorageType.valueOf(storageType);
 
     if (StorageType.MEMORY == type) {
       throw new UnsupportedOperationException(
-          "Doesn't support storage type for client read handler:" + 
storageType);
+          "Doesn't support storage type for client read  :" + storageType);
     }
 
     if (StorageType.HDFS == type) {
-      return getHdfsClientReadHandler(request);
+      return getHdfsClientReadHandler(request, serverInfo);
     }
     if (StorageType.LOCALFILE == type) {
-      return getLocalfileClientReaderHandler(request);
+      return getLocalfileClientReaderHandler(request, serverInfo);
     }
 
     List<ClientReadHandler> handlers = new ArrayList<>();
     if (StorageType.withMemory(type)) {
       handlers.add(
-          getMemoryClientReadHandler(request)
+          getMemoryClientReadHandler(request, serverInfo)
       );
     }
     if (StorageType.withLocalfile(type)) {
       handlers.add(
-          getLocalfileClientReaderHandler(request)
+          getLocalfileClientReaderHandler(request, serverInfo)
       );
     }
     if (StorageType.withHDFS(type)) {
       handlers.add(
-          getHdfsClientReadHandler(request)
+          getHdfsClientReadHandler(request, serverInfo)
       );
     }
     if (handlers.isEmpty()) {
       throw new RssException("This should not happen due to the unknown 
storage type: " + storageType);
     }
 
-    Callable<ClientReadHandler>[] callables =
-        handlers
-            .stream()
-            .map(x -> (Callable<ClientReadHandler>) () -> x)
-            .collect(Collectors.toList())
-            .toArray(new Callable[handlers.size()]);
-    return new ComposedClientReadHandler(callables);
+    return new ComposedClientReadHandler(handlers);
   }
 
-  private ClientReadHandler 
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request) {
-    List<ShuffleServerInfo> shuffleServerInfoList = 
request.getShuffleServerInfoList();
-    List<ShuffleServerClient> shuffleServerClients = 
shuffleServerInfoList.stream().map(
-        ssi -> ShuffleServerClientFactory.getInstance().getShuffleServerClient(
-            ClientType.GRPC.name(), ssi)).collect(
-        Collectors.toList());
-    ClientReadHandler memoryClientReadHandler = new 
MemoryQuorumClientReadHandler(
+  private ClientReadHandler 
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request, 
ShuffleServerInfo ssi) {
+    ShuffleServerClient shuffleServerClient = 
ShuffleServerClientFactory.getInstance().getShuffleServerClient(
+        ClientType.GRPC.name(), ssi);
+    ClientReadHandler memoryClientReadHandler = new MemoryClientReadHandler(
         request.getAppId(),
         request.getShuffleId(),
         request.getPartitionId(),
         request.getReadBufferSize(),
-        shuffleServerClients);
+        shuffleServerClient);
     return memoryClientReadHandler;
   }
 
-  private ClientReadHandler 
getLocalfileClientReaderHandler(CreateShuffleReadHandlerRequest request) {
-    List<ShuffleServerInfo> shuffleServerInfoList = 
request.getShuffleServerInfoList();
-    List<ShuffleServerClient> shuffleServerClients = 
shuffleServerInfoList.stream().map(
-        ssi -> 
ShuffleServerClientFactory.getInstance().getShuffleServerClient(ClientType.GRPC.name(),
 ssi)).collect(
-        Collectors.toList());
-    return new LocalFileQuorumClientReadHandler(
+  private ClientReadHandler 
getLocalfileClientReaderHandler(CreateShuffleReadHandlerRequest request,
+                                                            ShuffleServerInfo 
ssi) {
+    ShuffleServerClient shuffleServerClient = 
ShuffleServerClientFactory.getInstance().getShuffleServerClient(
+        ClientType.GRPC.name(), ssi);
+    return new LocalFileClientReadHandler(
         request.getAppId(), request.getShuffleId(), request.getPartitionId(),
         request.getIndexReadLimit(), request.getPartitionNumPerRange(), 
request.getPartitionNum(),
         request.getReadBufferSize(), request.getExpectBlockIds(), 
request.getProcessBlockIds(),
-        shuffleServerClients, request.getDistributionType(), 
request.getExpectTaskIds()
+        shuffleServerClient, request.getDistributionType(), 
request.getExpectTaskIds()
     );
   }
 
-  private ClientReadHandler 
getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request) {
+  private ClientReadHandler 
getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request, 
ShuffleServerInfo ssi) {
     return new HdfsClientReadHandler(
         request.getAppId(),
         request.getShuffleId(),
@@ -140,7 +151,8 @@ public class ShuffleHandlerFactory {
         request.getStorageBasePath(),
         request.getHadoopConf(),
         request.getDistributionType(),
-        request.getExpectTaskIds()
+        request.getExpectTaskIds(),
+        ssi.getId()
     );
   }
 
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
index c619da2c..ba018d3a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
@@ -34,5 +34,4 @@ public interface ClientReadHandler {
 
   // Display the statistics of consumed blocks
   void logConsumedBlockInfo();
-
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index aab15b5d..da4e2b2f 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -17,9 +17,10 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
-import java.util.concurrent.Callable;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,11 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
+/**
+ * Composed read handler for all storage types and one replicas.
+ * The storage types reading order is as follows: HOT -> WARM -> COLD -> FROZEN
+ * @see <a 
href="https://github.com/apache/incubator-uniffle/pull/276";>PR-276</a>
+ */
 public class ComposedClientReadHandler implements ClientReadHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ComposedClientReadHandler.class);
@@ -36,10 +42,6 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
   private ClientReadHandler warmDataReadHandler;
   private ClientReadHandler coldDataReadHandler;
   private ClientReadHandler frozenDataReadHandler;
-  private Callable<ClientReadHandler> hotHandlerCreator;
-  private Callable<ClientReadHandler> warmHandlerCreator;
-  private Callable<ClientReadHandler> coldHandlerCreator;
-  private Callable<ClientReadHandler> frozenHandlerCreator;
   private static final int HOT = 1;
   private static final int WARM = 2;
   private static final int COLD = 3;
@@ -63,34 +65,22 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
   private long frozenReadUncompressLength = 0L;
 
   public ComposedClientReadHandler(ClientReadHandler... handlers) {
-    topLevelOfHandler = handlers.length;
-    if (topLevelOfHandler > 0) {
-      this.hotDataReadHandler = handlers[0];
-    }
-    if (topLevelOfHandler > 1) {
-      this.warmDataReadHandler = handlers[1];
-    }
-    if (topLevelOfHandler > 2) {
-      this.coldDataReadHandler = handlers[2];
-    }
-    if (topLevelOfHandler > 3) {
-      this.frozenDataReadHandler = handlers[3];
-    }
+    this(Lists.newArrayList(handlers));
   }
 
-  public ComposedClientReadHandler(Callable<ClientReadHandler>... creators) {
-    topLevelOfHandler = creators.length;
+  public ComposedClientReadHandler(List<ClientReadHandler> handlers) {
+    topLevelOfHandler = handlers.size();
     if (topLevelOfHandler > 0) {
-      this.hotHandlerCreator = creators[0];
+      this.hotDataReadHandler = handlers.get(0);
     }
     if (topLevelOfHandler > 1) {
-      this.warmHandlerCreator = creators[1];
+      this.warmDataReadHandler = handlers.get(1);
     }
     if (topLevelOfHandler > 2) {
-      this.coldHandlerCreator = creators[2];
+      this.coldDataReadHandler = handlers.get(2);
     }
     if (topLevelOfHandler > 3) {
-      this.frozenHandlerCreator = creators[3];
+      this.frozenDataReadHandler = handlers.get(3);
     }
   }
 
@@ -100,27 +90,15 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
     try {
       switch (currentHandler) {
         case HOT:
-          if (hotDataReadHandler == null) {
-            hotDataReadHandler = 
createReadHandlerIfNotExist(hotHandlerCreator);
-          }
           shuffleDataResult = hotDataReadHandler.readShuffleData();
           break;
         case WARM:
-          if (warmDataReadHandler == null) {
-            warmDataReadHandler = 
createReadHandlerIfNotExist(warmHandlerCreator);
-          }
           shuffleDataResult = warmDataReadHandler.readShuffleData();
           break;
         case COLD:
-          if (coldDataReadHandler == null) {
-            coldDataReadHandler = 
createReadHandlerIfNotExist(coldHandlerCreator);
-          }
           shuffleDataResult = coldDataReadHandler.readShuffleData();
           break;
         case FROZEN:
-          if (frozenDataReadHandler == null) {
-            frozenDataReadHandler = 
createReadHandlerIfNotExist(frozenHandlerCreator);
-          }
           shuffleDataResult = frozenDataReadHandler.readShuffleData();
           break;
         default:
@@ -143,13 +121,6 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
     return shuffleDataResult;
   }
 
-  private ClientReadHandler 
createReadHandlerIfNotExist(Callable<ClientReadHandler> creator) throws 
Exception {
-    if (creator == null) {
-      throw new IllegalStateException("creator " + getCurrentHandlerName() + " 
handler doesn't exist");
-    }
-    return creator.call();
-  }
-
   private String getCurrentHandlerName() {
     String name = "UNKNOWN";
     switch (currentHandler) {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index a9208960..1001c12c 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -43,6 +43,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
   protected final int partitionNumPerRange;
   protected final int partitionNum;
   protected final int readBufferSize;
+  private final String shuffleServerId;
   protected Roaring64NavigableMap expectBlockIds;
   protected Roaring64NavigableMap processBlockIds;
   protected final String storageBasePath;
@@ -70,7 +71,8 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
       String storageBasePath,
       Configuration hadoopConf,
       ShuffleDataDistributionType distributionType,
-      Roaring64NavigableMap expectTaskIds) {
+      Roaring64NavigableMap expectTaskIds,
+      String shuffleServerId) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -84,6 +86,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
     this.readHandlerIndex = 0;
     this.distributionType = distributionType;
     this.expectTaskIds = expectTaskIds;
+    this.shuffleServerId = shuffleServerId;
   }
 
   // Only for test
@@ -101,7 +104,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
       Configuration hadoopConf) {
     this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, 
partitionNum, readBufferSize,
         expectBlockIds, processBlockIds, storageBasePath, hadoopConf, 
ShuffleDataDistributionType.NORMAL,
-        Roaring64NavigableMap.bitmapOf());
+        Roaring64NavigableMap.bitmapOf(), null);
   }
 
   protected void init(String fullShufflePath) {
@@ -119,7 +122,8 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
     try {
       // get all index files
       indexFiles = fs.listStatus(baseFolder,
-          file -> 
file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX));
+          file -> file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)
+              && (shuffleServerId == null || 
file.getName().startsWith(shuffleServerId)));
     } catch (Exception e) {
       LOG.error(failedGetIndexFileMsg, e);
       return;
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a630cf1b..7aac7d0f 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -38,7 +38,7 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
   private final int partitionNum;
   private ShuffleServerClient shuffleServerClient;
 
-  LocalFileClientReadHandler(
+  public LocalFileClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
@@ -60,6 +60,27 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
     this.partitionNum = partitionNum;
   }
 
+  /**
+   * Only for test
+   */
+  public LocalFileClientReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      ShuffleServerClient shuffleServerClient) {
+    this(
+        appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
+        partitionNum, readBufferSize, expectBlockIds, processBlockIds,
+        shuffleServerClient, ShuffleDataDistributionType.NORMAL, 
Roaring64NavigableMap.bitmapOf()
+    );
+  }
+
   @Override
   public ShuffleIndexResult readShuffleIndex() {
     ShuffleIndexResult shuffleIndexResult = null;
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
deleted file mode 100644
index 2a6afdbe..00000000
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.client.api.ShuffleServerClient;
-import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ShuffleDataDistributionType;
-import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
-
-public class LocalFileQuorumClientReadHandler extends 
AbstractClientReadHandler {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileQuorumClientReadHandler.class);
-
-  private List<LocalFileClientReadHandler> handlers = Lists.newLinkedList();
-
-  private long readBlockNum = 0L;
-  private long readLength = 0L;
-  private long readUncompressLength = 0L;
-
-  public LocalFileQuorumClientReadHandler(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
-      List<ShuffleServerClient> shuffleServerClients,
-      ShuffleDataDistributionType distributionType,
-      Roaring64NavigableMap expectTaskIds) {
-    this.appId = appId;
-    this.shuffleId = shuffleId;
-    this.partitionId = partitionId;
-    this.readBufferSize = readBufferSize;
-    for (ShuffleServerClient client: shuffleServerClients) {
-      handlers.add(new LocalFileClientReadHandler(
-          appId,
-          shuffleId,
-          partitionId,
-          indexReadLimit,
-          partitionNumPerRange,
-          partitionNum,
-          readBufferSize,
-          expectBlockIds,
-          processBlockIds,
-          client,
-          distributionType,
-          expectTaskIds
-      ));
-    }
-  }
-
-  /**
-   * Only for test
-   */
-  public LocalFileQuorumClientReadHandler(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
-      List<ShuffleServerClient> shuffleServerClients) {
-    this(
-        appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
-        partitionNum, readBufferSize, expectBlockIds, processBlockIds,
-        shuffleServerClients, ShuffleDataDistributionType.NORMAL, 
Roaring64NavigableMap.bitmapOf()
-    );
-  }
-
-  @Override
-  public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
-    ShuffleDataResult result = null;
-    for (LocalFileClientReadHandler handler : handlers) {
-      try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
-      }
-    }
-    if (!readSuccessful) {
-      throw new RssException("Failed to read all replicas for appId[" + appId 
+ "], shuffleId["
-          + shuffleId + "], partitionId[" + partitionId + "]");
-    }
-    return result;
-  }
-
-  @Override
-  public void updateConsumedBlockInfo(BufferSegment bs) {
-    if (bs == null) {
-      return;
-    }
-    readBlockNum++;
-    readLength += bs.getLength();
-    readUncompressLength += bs.getUncompressLength();
-  }
-
-  @Override
-  public void logConsumedBlockInfo() {
-    LOG.info("Client read " + readBlockNum + " blocks,"
-        + " bytes:" +  readLength + " uncompressed bytes:" + 
readUncompressLength);
-  }
-}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
deleted file mode 100644
index 5fbe8ec6..00000000
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.client.api.ShuffleServerClient;
-import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
-
-public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
-  private long lastBlockId = Constants.INVALID_BLOCK_ID;
-  private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
-
-  public MemoryQuorumClientReadHandler(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
-    this.appId = appId;
-    this.shuffleId = shuffleId;
-    this.partitionId = partitionId;
-    this.readBufferSize = readBufferSize;
-    shuffleServerClients.forEach(client ->
-        handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
-    );
-  }
-
-  @Override
-  public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
-    ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
-      try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
-      }
-    }
-
-    if (!readSuccessful) {
-      throw new RssException("Failed to read in memory shuffle data for 
appId[" + appId
-          + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + 
"]");
-    }
-
-    return result;
-  }
-}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
new file mode 100644
index 00000000..83b9fb7f
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.List;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+
+public class MultiReplicaClientReadHandler extends AbstractClientReadHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MultiReplicaClientReadHandler.class);
+
+  private final List<ClientReadHandler> handlers;
+  private final List<ShuffleServerInfo> shuffleServerInfos;
+
+  private long readBlockNum = 0L;
+  private long readLength = 0L;
+  private long readUncompressLength = 0L;
+  private final Roaring64NavigableMap blockIdBitmap;
+  private final Roaring64NavigableMap processedBlockIds;
+
+  private int readHandlerIndex;
+
+  public MultiReplicaClientReadHandler(
+      List<ClientReadHandler> handlers,
+      List<ShuffleServerInfo> shuffleServerInfos,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap processedBlockIds) {
+    this.handlers = handlers;
+    this.blockIdBitmap = blockIdBitmap;
+    this.processedBlockIds = processedBlockIds;
+    this.shuffleServerInfos = shuffleServerInfos;
+  }
+
+  @Override
+  public ShuffleDataResult readShuffleData() {
+    ClientReadHandler handler;
+    ShuffleDataResult result = null;
+    do {
+      if (readHandlerIndex >= handlers.size()) {
+        return result;
+      }
+      handler = handlers.get(readHandlerIndex);
+      try {
+        result = handler.readShuffleData();
+      } catch (Exception e) {
+        LOG.warn("Failed to read a replica from [{}] due to ",
+            shuffleServerInfos.get(readHandlerIndex).getId(), e);
+      }
+      if (result != null && !result.isEmpty()) {
+        return result;
+      } else {
+        try {
+          RssUtils.checkProcessedBlockIds(blockIdBitmap, processedBlockIds);
+          return result;
+        } catch (RssException e) {
+          LOG.warn("Finished read from [{}], but haven't finished read all the 
blocks.",
+              shuffleServerInfos.get(readHandlerIndex).getId(), e);
+        }
+        readHandlerIndex++;
+      }
+    } while (true);
+  }
+
+  @Override
+  public void updateConsumedBlockInfo(BufferSegment bs) {
+    if (bs == null) {
+      return;
+    }
+    readBlockNum++;
+    readLength += bs.getLength();
+    readUncompressLength += bs.getUncompressLength();
+  }
+
+  @Override
+  public void logConsumedBlockInfo() {
+    LOG.info("Client read " + readBlockNum + " blocks,"
+        + " bytes:" +  readLength + " uncompressed bytes:" + 
readUncompressLength);
+  }
+}

Reply via email to