This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ad513416 [ISSUE-124] Add fallback mechanism for blocks read inconsistent (#276)
ad513416 is described below
commit ad513416c11d81e086436985b5b17f9448b5fa3c
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Nov 29 00:03:46 2022 +0800
[ISSUE-124] Add fallback mechanism for blocks read inconsistent (#276)
### What changes were proposed in this pull request?
Add fallback mechanism for blocks read inconsistent
### Why are the changes needed?
When the data in the first server is damaged, the application will fail. #124 #129
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests were already added, including the new ShuffleServerFaultToleranceTest integration test.
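For context, here is a minimal sketch of the fallback idea (hypothetical names and signature, not the committed code; the real logic lives in the new MultiReplicaClientReadHandler): try each replica's read handler in turn and only give up when every replica fails.

    // Hypothetical sketch of reading with fallback across replicas.
    ShuffleDataResult readWithFallback(List<ClientReadHandler> replicaHandlers) {
      for (ClientReadHandler handler : replicaHandlers) {
        try {
          ShuffleDataResult result = handler.readShuffleData();
          if (result != null) {
            return result;  // this replica served readable data
          }
        } catch (Exception e) {
          // data on this replica is damaged or unreachable; fall back to the next one
        }
      }
      return null;  // all replicas exhausted; the caller reports the failure
    }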
---
.../shuffle/reader/RssShuffleDataIteratorTest.java | 73 +++---
.../spark/shuffle/reader/RssShuffleReaderTest.java | 14 +-
.../spark/shuffle/reader/RssShuffleReaderTest.java | 17 +-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 9 +-
.../java/org/apache/uniffle/client/TestUtils.java | 27 +++
.../client/impl/ShuffleReadClientImplTest.java | 101 ++++----
.../apache/uniffle/common/ShuffleServerInfo.java | 7 +
.../org/apache/uniffle/common/util/RssUtils.java | 12 +
.../apache/uniffle/test/IntegrationTestBase.java | 12 +-
.../test/MultiStorageFaultToleranceTest.java | 2 +-
.../test/ShuffleServerFaultToleranceTest.java | 254 +++++++++++++++++++++
.../uniffle/test/ShuffleServerWithHdfsTest.java | 12 +-
.../test/ShuffleServerWithKerberizedHdfsTest.java | 27 ++-
.../ShuffleServerWithLocalOfExceptionTest.java | 17 +-
.../test/ShuffleServerWithMemLocalHdfsTest.java | 16 +-
.../uniffle/test/ShuffleServerWithMemoryTest.java | 38 +--
.../uniffle/test/SparkClientWithLocalTest.java | 2 +-
.../storage/factory/ShuffleHandlerFactory.java | 80 ++++---
.../storage/handler/api/ClientReadHandler.java | 1 -
.../handler/impl/ComposedClientReadHandler.java | 57 ++---
.../handler/impl/HdfsClientReadHandler.java | 10 +-
.../handler/impl/LocalFileClientReadHandler.java | 23 +-
.../impl/LocalFileQuorumClientReadHandler.java | 134 -----------
.../impl/MemoryQuorumClientReadHandler.java | 75 ------
.../impl/MultiReplicaClientReadHandler.java | 104 +++++++++
25 files changed, 683 insertions(+), 441 deletions(-)
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 78c3375b..77d6b31c 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -18,6 +18,7 @@
package org.apache.spark.shuffle.reader;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
@@ -58,11 +60,14 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
   private static final Serializer KRYO_SERIALIZER = new KryoSerializer(new SparkConf(false));
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
+  private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
+  private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
+
@Test
public void readTest1() throws Exception {
String basePath = HDFS_URI + "readTest1";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -70,12 +75,13 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
validateResult(rssShuffleDataIterator, expectedData, 10);
blockIdBitmap.add(ClientUtils.getBlockId(0, 0, Constants.MAX_SEQUENCE_NO));
- rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
taskIdBitmap);
+ rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
taskIdBitmap, Lists.newArrayList(ssi1));
int recNum = 0;
try {
// can't find all expected block id, data loss
@@ -91,10 +97,10 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
}
   private RssShuffleDataIterator getDataIterator(String basePath, Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap) {
+      Roaring64NavigableMap taskIdBitmap, List<ShuffleServerInfo> serverInfos) {
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
         StorageType.HDFS.name(), "appId", 0, 1, 100, 2,
-        10, 10000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(),
+        10, 10000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(serverInfos),
         new Configuration(), new DefaultIdHelper());
     return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient,
         new ShuffleReadMetrics(), new RssConf());
@@ -104,9 +110,9 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
public void readTest2() throws Exception {
String basePath = HDFS_URI + "readTest2";
HdfsShuffleWriteHandler writeHandler1 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_1",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
HdfsShuffleWriteHandler writeHandler2 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_2",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -116,7 +122,8 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
writeTestData(writeHandler2, 2, 5, expectedData,
blockIdBitmap, "key2", KRYO_SERIALIZER, 0);
- RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath,
blockIdBitmap, taskIdBitmap);
+ RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath,
blockIdBitmap,
+ taskIdBitmap, Lists.newArrayList(ssi1, ssi2));
validateResult(rssShuffleDataIterator, expectedData, 20);
assertEquals(20,
rssShuffleDataIterator.getShuffleReadMetrics().recordsRead());
@@ -127,9 +134,9 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
public void readTest3() throws Exception {
String basePath = HDFS_URI + "readTest3";
HdfsShuffleWriteHandler writeHandler1 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_1",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
HdfsShuffleWriteHandler writeHandler2 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_2",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -141,16 +148,19 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
     // duplicate file created, it should be used in product environment
     String shuffleFolder = basePath + "/appId/0/0-1";
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.data"), fs,
-        new Path(shuffleFolder + "/test3_1_0.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.index"), fs,
-        new Path(shuffleFolder + "/test3_1_0.cp.index"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.data"), fs,
-        new Path(shuffleFolder + "/test3_2_0.cp.data"), false, conf);
-    FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.index"), fs,
-        new Path(shuffleFolder + "/test3_2_0.cp.index"), false, conf);
-
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+    String ssi1Prefix = shuffleFolder + "/" + ssi1.getId();
+    String ssi2Prefix = shuffleFolder + "/" + ssi2.getId();
+    FileUtil.copy(fs, new Path(ssi1Prefix + "_0.data"), fs,
+        new Path(ssi1Prefix + "_0.cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(ssi1Prefix + "_0.index"), fs,
+        new Path(ssi1Prefix + "_0.cp.index"), false, conf);
+    FileUtil.copy(fs, new Path(ssi2Prefix + "_0.data"), fs,
+        new Path(ssi2Prefix + "_0.cp.data"), false, conf);
+    FileUtil.copy(fs, new Path(ssi2Prefix + "_0.index"), fs,
+        new Path(ssi2Prefix + "_0.cp.index"), false, conf);
+
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1, ssi2));
     validateResult(rssShuffleDataIterator, expectedData, 20);
   }
@@ -159,7 +169,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
public void readTest4() throws Exception {
String basePath = HDFS_URI + "readTest4";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -167,12 +177,11 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
     writeTestData(writeHandler, 2, 5, expectedData,
         blockIdBitmap, "key", KRYO_SERIALIZER, 0);
-    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+    RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap,
+        taskIdBitmap, Lists.newArrayList(ssi1));
     // data file is deleted after iterator initialization
-    Path dataFile = new Path(basePath + "/appId/0/0-1/test1_0.data");
+    Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + "_0.data");
     fs.delete(dataFile, true);
-    // sleep to wait delete operation
-    Thread.sleep(10000);
     try {
       fs.listStatus(dataFile);
       fail("Index file should be deleted");
@@ -194,7 +203,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
public void readTest5() throws Exception {
String basePath = HDFS_URI + "readTest5";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -202,12 +211,11 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- final RssShuffleDataIterator rssShuffleDataIterator =
getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+ final RssShuffleDataIterator rssShuffleDataIterator =
getDataIterator(basePath, blockIdBitmap,
+ taskIdBitmap, Lists.newArrayList(ssi1));
// index file is deleted after iterator initialization, it should be ok,
all index infos are read already
- Path indexFile = new Path(basePath + "/appId/0/0-1/test.index");
+ Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() +
".index");
fs.delete(indexFile, true);
- // sleep to wait delete operation
- Thread.sleep(10000);
try {
fs.listStatus(indexFile);
fail("Index file should be deleted");
@@ -221,7 +229,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
public void readTest7() throws Exception {
String basePath = HDFS_URI + "readTest7";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -229,7 +237,8 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath,
blockIdBitmap, taskIdBitmap);
+ RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath,
blockIdBitmap,
+ taskIdBitmap, Lists.newArrayList(ssi1));
// crc32 is incorrect
try (MockedStatic<ChecksumUtils> checksumUtilsMock =
Mockito.mockStatic(ChecksumUtils.class)) {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index ce33f47c..fa94faf8 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -17,8 +17,11 @@
package org.apache.spark.shuffle.reader;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -31,6 +34,7 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import scala.Option;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -46,10 +50,10 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
   @Test
   public void readTest() throws Exception {
-
+    ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
     String basePath = HDFS_URI + "readTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -57,13 +61,17 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- TaskContext contextMock = mock(TaskContext.class);
RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getShuffleId()).thenReturn(1);
when(handleMock.getDependency()).thenReturn(dependencyMock);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+ partitionToServers.put(0, Lists.newArrayList(ssi));
+ partitionToServers.put(1, Lists.newArrayList(ssi));
+ when(handleMock.getPartitionToServers()).thenReturn(partitionToServers);
when(dependencyMock.serializer()).thenReturn(KRYO_SERIALIZER);
+ TaskContext contextMock = mock(TaskContext.class);
when(contextMock.taskAttemptId()).thenReturn(1L);
when(contextMock.attemptNumber()).thenReturn(1);
when(contextMock.taskMetrics()).thenReturn(new TaskMetrics());
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index 213061a4..c0988e2e 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -17,8 +17,11 @@
package org.apache.spark.shuffle.reader;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -33,6 +36,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import scala.Option;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -42,19 +46,18 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-
public class RssShuffleReaderTest extends AbstractRssReaderTest {
private static final Serializer KRYO_SERIALIZER = new KryoSerializer(new
SparkConf(false));
@Test
public void readTest() throws Exception {
-
+ ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
String basePath = HDFS_URI + "readTest1";
     HdfsShuffleWriteHandler writeHandler =
-        new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, ssi.getId(), conf);
     final HdfsShuffleWriteHandler writeHandler1 =
-        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test", conf);
+        new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), conf);
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap =
Roaring64NavigableMap.bitmapOf(0);
@@ -63,13 +66,17 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- TaskContext contextMock = mock(TaskContext.class);
RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getDependency()).thenReturn(dependencyMock);
when(handleMock.getShuffleId()).thenReturn(1);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+ partitionToServers.put(0, Lists.newArrayList(ssi));
+ partitionToServers.put(1, Lists.newArrayList(ssi));
+ when(handleMock.getPartitionToServers()).thenReturn(partitionToServers);
when(dependencyMock.serializer()).thenReturn(KRYO_SERIALIZER);
+ TaskContext contextMock = mock(TaskContext.class);
when(contextMock.attemptNumber()).thenReturn(1);
when(contextMock.taskAttemptId()).thenReturn(1L);
when(contextMock.taskMetrics()).thenReturn(new TaskMetrics());
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 99387f59..8a57cbb0 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -47,7 +47,6 @@ import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 public class ShuffleReadClientImpl implements ShuffleReadClient {
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleReadClientImpl.class);
-
   private int shuffleId;
   private int partitionId;
   private byte[] readBuffer;
@@ -232,13 +231,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   @Override
   public void checkProcessedBlockIds() {
-    Roaring64NavigableMap cloneBitmap;
-    cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
-    cloneBitmap.and(processedBlockIds);
-    if (!blockIdBitmap.equals(cloneBitmap)) {
-      throw new RssException("Blocks read inconsistent: expected " + blockIdBitmap.getLongCardinality()
-          + " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
-    }
+    RssUtils.checkProcessedBlockIds(blockIdBitmap, processedBlockIds);
   }
@Override
diff --git a/client/src/test/java/org/apache/uniffle/client/TestUtils.java b/client/src/test/java/org/apache/uniffle/client/TestUtils.java
index cbb441df..284d22e8 100644
--- a/client/src/test/java/org/apache/uniffle/client/TestUtils.java
+++ b/client/src/test/java/org/apache/uniffle/client/TestUtils.java
@@ -18,12 +18,16 @@
package org.apache.uniffle.client;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestUtils {
@@ -55,6 +59,29 @@ public class TestUtils {
assertEquals(expectedData.size(), blockNum);
}
+ public static void validateResult(
+ Map<Long, byte[]> expectedData,
+ ShuffleDataResult sdr) {
+ byte[] buffer = sdr.getData();
+ List<BufferSegment> bufferSegments = sdr.getBufferSegments();
+ assertEquals(expectedData.size(), bufferSegments.size());
+ for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
+ BufferSegment bs = findBufferSegment(entry.getKey(), bufferSegments);
+ assertNotNull(bs);
+ byte[] data = new byte[bs.getLength()];
+ System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
+ }
+ }
+
+  private static BufferSegment findBufferSegment(long blockId, List<BufferSegment> bufferSegments) {
+ for (BufferSegment bs : bufferSegments) {
+ if (bs.getBlockId() == blockId) {
+ return bs;
+ }
+ }
+ return null;
+ }
+
public static boolean compareByte(byte[] expected, ByteBuffer buffer) {
int start = buffer.position();
for (int i = 0; i < expected.length; i++) {
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 2589c195..e2f41536 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -37,6 +37,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HdfsTestBase;
@@ -54,11 +55,14 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
   private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
+  private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
+  private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
+
@Test
public void readTest1() throws Exception {
String basePath = HDFS_URI + "clientReadTest1";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -68,7 +72,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -78,7 +82,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId",
0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
try {
// can't find all expected block id, data loss
@@ -95,9 +99,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
public void readTest2() throws Exception {
String basePath = HDFS_URI + "clientReadTest2";
HdfsShuffleWriteHandler writeHandler1 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_1",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
HdfsShuffleWriteHandler writeHandler2 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test2_2",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -107,7 +111,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2),
+ new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -118,9 +123,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest3() throws Exception {
String basePath = HDFS_URI + "clientReadTest3";
HdfsShuffleWriteHandler writeHandler1 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_1",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
HdfsShuffleWriteHandler writeHandler2 =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_2",
conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap =
Roaring64NavigableMap.bitmapOf();
@@ -130,18 +135,19 @@ public class ShuffleReadClientImplTest extends
HdfsTestBase {
// duplicate file created, it should be used in product environment
String shuffleFolder = basePath + "/appId/0/0-1";
- FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.data"), fs,
- new Path(basePath + "/test3_1.cp.data"), false, conf);
- FileUtil.copy(fs, new Path(shuffleFolder + "/test3_1_0.index"), fs,
- new Path(basePath + "/test3_1.cp.index"), false, conf);
- FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.data"), fs,
- new Path(basePath + "/test3_2.cp.data"), false, conf);
- FileUtil.copy(fs, new Path(shuffleFolder + "/test3_2_0.index"), fs,
- new Path(basePath + "/test3_2.cp.index"), false, conf);
+ FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi1.getId() +
"_0.data"), fs,
+ new Path(basePath + "/" + ssi1.getId() + ".cp.data"), false, conf);
+ FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi1.getId() +
"_0.index"), fs,
+ new Path(basePath + "/" + ssi1.getId() + ".cp.index"), false, conf);
+ FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi2.getId() +
"_0.data"), fs,
+ new Path(basePath + "/" + ssi2.getId() + ".cp.data"), false, conf);
+ FileUtil.copy(fs, new Path(shuffleFolder + "/" + ssi2.getId() +
"_0.index"), fs,
+ new Path(basePath + "/" + ssi2.getId() + ".cp.index"), false, conf);
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1, ssi2),
+ new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -152,7 +158,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest4() throws Exception {
String basePath = HDFS_URI + "clientReadTest4";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -161,12 +167,10 @@ public class ShuffleReadClientImplTest extends
HdfsTestBase {
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
- Path dataFile = new Path(basePath + "/appId/0/0-1/test1_0.data");
+ basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new
Configuration(), new DefaultIdHelper());
+ Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() +
"_0.data");
// data file is deleted after readClient checkExpectedBlockIds
- fs.delete(new Path(basePath + "/appId/0/0-1/test1_0.data"), true);
- // sleep to wait delete operation
- Thread.sleep(10000);
+ fs.delete(dataFile, true);
assertNull(readClient.readShuffleBlockData());
try {
@@ -189,7 +193,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest5() throws Exception {
String basePath = HDFS_URI + "clientReadTest5";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -198,9 +202,9 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new
Configuration(), new DefaultIdHelper());
// index file is deleted after iterator initialization, it should be ok,
all index infos are read already
- Path indexFile = new Path(basePath + "/appId/0/0-1/test_0.index");
+ Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() +
"_0.index");
fs.delete(indexFile, true);
readClient.close();
@@ -211,7 +215,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest7() throws Exception {
String basePath = HDFS_URI + "clientReadTest7";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
@@ -226,10 +230,10 @@ public class ShuffleReadClientImplTest extends
HdfsTestBase {
ShuffleReadClientImpl readClient1 = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 0, 100, 2, 10, 100,
- basePath, blockIdBitmap1, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap1, taskIdBitmap, Lists.newArrayList(ssi1), new
Configuration(), new DefaultIdHelper());
final ShuffleReadClientImpl readClient2 = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 100,
- basePath, blockIdBitmap2, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap2, taskIdBitmap, Lists.newArrayList(ssi1), new
Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient1, expectedData1);
readClient1.checkProcessedBlockIds();
readClient1.close();
@@ -243,7 +247,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest8() throws Exception {
String basePath = HDFS_URI + "clientReadTest8";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -252,7 +256,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
- basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1), new
Configuration(), new DefaultIdHelper());
// crc32 is incorrect
try (MockedStatic<ChecksumUtils> checksumUtilsMock =
Mockito.mockStatic(ChecksumUtils.class)) {
checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer)
any())).thenReturn(-1L);
@@ -275,7 +279,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 1000,
"basePath", Roaring64NavigableMap.bitmapOf(),
Roaring64NavigableMap.bitmapOf(),
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
readClient.checkProcessedBlockIds();
}
@@ -284,7 +288,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest10() throws Exception {
String basePath = HDFS_URI + "clientReadTest10";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -298,7 +302,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 0, 100, 2, 10, 100,
- basePath, wrongBlockIdBitmap, taskIdBitmap, Lists.newArrayList(), new
Configuration(), new DefaultIdHelper());
+ basePath, wrongBlockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1),
+ new Configuration(), new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
try {
readClient.checkProcessedBlockIds();
@@ -312,7 +317,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest11() throws Exception {
String basePath = HDFS_URI + "clientReadTest11";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -322,7 +327,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
// test with different indexReadLimit to validate result
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 1, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -330,7 +335,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId",
0, 1, 2, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -338,7 +343,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId",
0, 1, 3, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -346,7 +351,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId",
0, 1, 10, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -354,7 +359,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId",
0, 1, 11, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -365,7 +370,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest12() throws Exception {
String basePath = HDFS_URI + "clientReadTest12";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap =
Roaring64NavigableMap.bitmapOf();
@@ -377,7 +382,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -389,7 +394,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest13() throws Exception {
String basePath = HDFS_URI + "clientReadTest13";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap =
Roaring64NavigableMap.bitmapOf();
@@ -404,7 +409,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
@@ -416,7 +421,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest14() throws Exception {
String basePath = HDFS_URI + "clientReadTest14";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap =
Roaring64NavigableMap.bitmapOf();
@@ -428,7 +433,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -440,7 +445,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
public void readTest15() throws Exception {
String basePath = HDFS_URI + "clientReadTest15";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(),
conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap =
Roaring64NavigableMap.bitmapOf();
@@ -454,7 +459,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase
{
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
10, 1000, basePath, blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ Lists.newArrayList(ssi1), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(25, readClient.getProcessedBlockIds().getLongCardinality());
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
index 2174f35d..eb503d90 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleServerInfo.java
@@ -27,6 +27,13 @@ public class ShuffleServerInfo implements Serializable {
private int port;
+ // Only for test
+ public ShuffleServerInfo(String host, int port) {
+ this.id = host + "-" + port;
+ this.host = host;
+ this.port = port;
+ }
+
public ShuffleServerInfo(String id, String host, int port) {
this.id = id;
this.host = host;
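A usage note on the test-only constructor added above: it derives the server id from host and port, so tests can rebuild the id a write handler used without spelling it out. A small sketch with assumed values:

    ShuffleServerInfo ssi = new ShuffleServerInfo("host1", 19999);
    // id is derived as host + "-" + port
    // ssi.getId() -> "host1-19999"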
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 5319444e..fb05894e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
public class RssUtils {
@@ -271,4 +272,15 @@ public class RssUtils {
}
return serverToPartitions;
}
+
+  public static void checkProcessedBlockIds(Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap processedBlockIds) {
+    Roaring64NavigableMap cloneBitmap;
+    cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
+    cloneBitmap.and(processedBlockIds);
+    if (!blockIdBitmap.equals(cloneBitmap)) {
+      throw new RssException("Blocks read inconsistent: expected " + blockIdBitmap.getLongCardinality()
+          + " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
+    }
+  }
}
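The extracted helper intersects the expected block ids with the processed set and throws when any expected block was never read. A usage sketch with assumed values:

    Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf(1L, 2L, 3L);
    Roaring64NavigableMap processed = Roaring64NavigableMap.bitmapOf(1L, 2L);
    // throws RssException: "Blocks read inconsistent: expected 3 blocks, actual 2 blocks"
    RssUtils.checkProcessedBlockIds(expected, processed);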
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 06e3be7b..ee58ade8 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -27,6 +27,7 @@ import java.util.Map;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorMetrics;
import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -40,7 +41,16 @@ import org.apache.uniffle.storage.util.StorageType;
public abstract class IntegrationTestBase extends HdfsTestBase {
protected static final int SHUFFLE_SERVER_PORT = 20001;
- protected static final String LOCALHOST = "127.0.0.1";
+ protected static final String LOCALHOST;
+
+ static {
+ try {
+ LOCALHOST = RssUtils.getHostIp();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
protected static final int COORDINATOR_PORT_1 = 19999;
protected static final int COORDINATOR_PORT_2 = 20030;
protected static final int JETTY_PORT_1 = 19998;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
index 5f76be5b..6e0145d8 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
@@ -135,7 +135,7 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
       Roaring64NavigableMap taskBitmap, Map<Long, byte[]> expectedData) {
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE_HDFS.name(),
         appId, shuffleId, partitionId, 100, 1, 10, 1000, REMOTE_STORAGE, blockBitmap, taskBitmap,
-        Lists.newArrayList(new ShuffleServerInfo("test", LOCALHOST, SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
+        Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
while (csb != null && csb.getByteBuffer() != null) {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
new file mode 100644
index 00000000..636ba65f
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.api.ShuffleServerClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.server.MockedShuffleServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
+
+ private List<ShuffleServerClient> shuffleServerClients;
+
+ private String remoteStoragePath = HDFS_URI + "rss/test";
+
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ createCoordinatorServer(coordinatorConf);
+ shuffleServers.add(createServer(0));
+ shuffleServers.add(createServer(1));
+ shuffleServers.add(createServer(2));
+ startServers();
+ }
+
+ @BeforeEach
+ public void createClient() {
+ shuffleServerClients = new ArrayList<>();
+ for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServerClients.add(new ShuffleServerGrpcClient(shuffleServer.getIp(), shuffleServer.getPort()));
+ }
+ }
+
+ @AfterEach
+ public void cleanEnv() throws Exception {
+ shuffleServerClients.forEach((client) -> {
+ client.close();
+ });
+ cleanCluster();
+ setupServers();
+ }
+
+ @Test
+ public void testReadFaultTolerance() throws Exception {
+    String testAppId = "ShuffleServerFaultToleranceTest.testReadFaultTolerance";
+ int shuffleId = 0;
+ int partitionId = 0;
+    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, shuffleId,
+ Lists.newArrayList(new PartitionRange(0, 0)), remoteStoragePath);
+ registerShuffle(rrsr);
+ Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ Map<Long, byte[]> dataMap = Maps.newHashMap();
+ Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
+ bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+ List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+ shuffleId, partitionId, 0, 3, 25,
+ expectBlockIds, dataMap, mockSSI);
+
+    RssSendShuffleDataRequest rssdr = getRssSendShuffleDataRequest(testAppId, shuffleId, partitionId, blocks);
+ shuffleServerClients.get(1).sendShuffleData(rssdr);
+
+    List<ShuffleServerInfo> shuffleServerInfoList = new ArrayList<>();
+    for (ShuffleServer shuffleServer : shuffleServers) {
+      shuffleServerInfoList.add(new ShuffleServerInfo(shuffleServer.getId(),
+          shuffleServer.getIp(), shuffleServer.getPort()));
+    }
+
+    CreateShuffleReadHandlerRequest request = mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, expectBlockIds, StorageType.MEMORY_LOCALFILE);
+    ClientReadHandler clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ expectedData.clear();
+ blocks.forEach((block) -> {
+ expectedData.put(block.getBlockId(), block.getData());
+ });
+ ShuffleDataResult sdr = clientReadHandler.readShuffleData();
+ TestUtils.validateResult(expectedData, sdr);
+
+ // send data to shuffle server, and wait until flush to localfile
+ List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
+ shuffleId, partitionId, 0, 3, 25,
+ expectBlockIds, dataMap, mockSSI);
+
+    rssdr = getRssSendShuffleDataRequest(testAppId, shuffleId, partitionId, blocks2);
+    shuffleServerClients.get(1).sendShuffleData(rssdr);
+    RssSendCommitRequest commitRequest = new RssSendCommitRequest(testAppId, shuffleId);
+    shuffleServerClients.get(1).sendCommit(commitRequest);
+    waitFlush(testAppId, shuffleId);
+    request = mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, expectBlockIds, StorageType.LOCALFILE);
+    clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ sdr = clientReadHandler.readShuffleData();
+ blocks2.forEach((block) -> {
+ expectedData.put(block.getBlockId(), block.getData());
+ });
+ TestUtils.validateResult(expectedData, sdr);
+
+ // send data to shuffle server, and wait until flush to hdfs
+ List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
+ shuffleId, partitionId, 0, 3, 150,
+ expectBlockIds, dataMap, mockSSI);
+ expectedData.clear();
+ blocks3.forEach((block) -> {
+ expectedData.put(block.getBlockId(), block.getData());
+ });
+    rssdr = getRssSendShuffleDataRequest(testAppId, shuffleId, partitionId, blocks3);
+    shuffleServerClients.get(1).sendShuffleData(rssdr);
+    shuffleServerClients.get(1).sendCommit(commitRequest);
+    waitFlush(testAppId, shuffleId);
+    request = mockCreateShuffleReadHandlerRequest(
+        testAppId, shuffleId, partitionId, shuffleServerInfoList, expectBlockIds, StorageType.HDFS);
+    clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+ sdr = clientReadHandler.readShuffleData();
+ TestUtils.validateResult(expectedData, sdr);
+ }
+
+  private CreateShuffleReadHandlerRequest mockCreateShuffleReadHandlerRequest(
+      String testAppId, int shuffleId, int partitionId, List<ShuffleServerInfo> shuffleServerInfoList,
+      Roaring64NavigableMap expectBlockIds, StorageType storageType) {
+    CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
+ request.setStorageType(storageType.name());
+ request.setAppId(testAppId);
+ request.setShuffleId(shuffleId);
+ request.setPartitionId(partitionId);
+ request.setIndexReadLimit(100);
+ request.setPartitionNumPerRange(1);
+ request.setPartitionNum(1);
+ request.setReadBufferSize(14 * 1024 * 1024);
+ request.setStorageBasePath(remoteStoragePath);
+ request.setShuffleServerInfoList(shuffleServerInfoList);
+ request.setHadoopConf(conf);
+ request.setExpectBlockIds(expectBlockIds);
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ request.setProcessBlockIds(processBlockIds);
+ request.setDistributionType(ShuffleDataDistributionType.NORMAL);
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ request.setExpectTaskIds(taskIdBitmap);
+ return request;
+ }
+
+  private RssSendShuffleDataRequest getRssSendShuffleDataRequest(
+      String appId, int shuffleId, int partitionId, List<ShuffleBlockInfo> blocks) {
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(partitionId, blocks);
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
+ shuffleToBlocks.put(shuffleId, partitionToBlocks);
+ return new RssSendShuffleDataRequest(
+ appId, 3, 1000, shuffleToBlocks);
+ }
+
+ private void registerShuffle(RssRegisterShuffleRequest rrsr) {
+ shuffleServerClients.forEach((client) -> {
+ client.registerShuffle(rrsr);
+ });
+ }
+
+  public static MockedShuffleServer createServer(int id) throws Exception {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 40.0);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 600L);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
+    shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 1000000L);
+    shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    File dataDir1 = new File(tmpDir, id + "_1");
+    File dataDir2 = new File(tmpDir, id + "_2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+    shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 450L);
+    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + id);
+    shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    return new MockedShuffleServer(shuffleServerConf);
+  }
+
+ protected void waitFlush(String appId, int shuffleId) throws
InterruptedException {
+ int retry = 0;
+ while (true) {
+ if (retry > 5) {
+        fail("Timed out waiting for shuffle data to flush");
+ }
+ ShuffleBuffer shuffleBuffer =
shuffleServers.get(1).getShuffleBufferManager()
+ .getShuffleBuffer(appId, shuffleId, 0);
+ if (shuffleBuffer.getBlocks().size() == 0 &&
shuffleBuffer.getInFlushBlockMap().size() == 0) {
+ break;
+ }
+ Thread.sleep(1000);
+ retry++;
+ }
+ }
+
+ public static void cleanCluster() throws Exception {
+ for (CoordinatorServer coordinator : coordinators) {
+ coordinator.stopServer();
+ }
+ for (ShuffleServer shuffleServer : shuffleServers) {
+ shuffleServer.stopServer();
+ }
+ shuffleServers = Lists.newArrayList();
+ coordinators = Lists.newArrayList();
+ }
+}
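Taken together, the helpers above outline how the fault-tolerance tests
exercise the new fallback: start several servers, register the shuffle on
each replica, write the same blocks everywhere, break one replica, and
verify that reads still complete through the others. A minimal sketch of
that flow, reusing the helper and field names defined above (the concrete
ids and assertions are illustrative only, not the full test):

    List<ShuffleServerInfo> replicas = Lists.newArrayList(
        new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT),
        new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT + 1));
    registerShuffle(new RssRegisterShuffleRequest(
        testAppId, shuffleId, Lists.newArrayList(new PartitionRange(0, 0)),
        remoteStoragePath));
    // ... send the same blocks to every replica, then wait for flush ...
    shuffleServers.get(0).stopServer();  // lose/damage the first replica
    CreateShuffleReadHandlerRequest request = mockCreateShuffleReadHandlerRequest(
        testAppId, shuffleId, partitionId, replicas, expectBlockIds,
        StorageType.MEMORY_LOCALFILE_HDFS);
    ClientReadHandler handler =
        ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
    // Despite the dead first server, the read falls back and still validates.
    TestUtils.validateResult(expectedData, handler.readShuffleData());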
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
index 41a4f7e4..90a4fef6 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
@@ -40,6 +40,7 @@ import
org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -100,9 +101,10 @@ public class ShuffleServerWithHdfsTest extends
ShuffleReadWriteBase {
shuffleServerClient.sendCommit(rscr);
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
+ ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST,
SHUFFLE_SERVER_PORT);
ShuffleReadClientImpl readClient = new
ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 0, 100, 2, 10, 1000,
- dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0),
Lists.newArrayList(),
+ dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0),
Lists.newArrayList(ssi),
new Configuration(), new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
shuffleServerClient.finishShuffle(rfsr);
@@ -132,25 +134,25 @@ public class ShuffleServerWithHdfsTest extends
ShuffleReadWriteBase {
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 0, 100, 2, 10, 1000,
- dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0),
Lists.newArrayList(),
+ dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0),
Lists.newArrayList(ssi),
new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[0]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 1, 100, 2, 10, 1000,
- dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1),
Lists.newArrayList(),
+ dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1),
Lists.newArrayList(ssi),
new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[1]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 2, 100, 2, 10, 1000,
- dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2),
Lists.newArrayList(),
+ dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2),
Lists.newArrayList(ssi),
new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[2]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 3, 100, 2, 10, 1000,
- dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3),
Lists.newArrayList(),
+ dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3),
Lists.newArrayList(ssi),
new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[3]);
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
index ed2bec66..4bce65c3 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
@@ -47,6 +47,8 @@ import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.ShuffleServer;
@@ -60,9 +62,19 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
+ protected static final String LOCALHOST;
+
+ static {
+ try {
+ LOCALHOST = RssUtils.getHostIp();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static final int COORDINATOR_RPC_PROT = 19999;
private static final int SHUFFLE_SERVER_PORT = 29999;
- private static final String COORDINATOR_QUORUM = "localhost:" +
COORDINATOR_RPC_PROT;
+ private static final String COORDINATOR_QUORUM = LOCALHOST + ":" +
COORDINATOR_RPC_PROT;
private ShuffleServerGrpcClient shuffleServerClient;
private static CoordinatorServer coordinatorServer;
@@ -122,7 +134,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
@BeforeEach
public void beforeEach() throws Exception {
initHadoopSecurityContext();
- shuffleServerClient = new ShuffleServerGrpcClient("localhost",
SHUFFLE_SERVER_PORT);
+ shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST,
SHUFFLE_SERVER_PORT);
}
@AfterEach
@@ -212,6 +224,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
shuffleServerClient.sendCommit(rscr);
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
+ ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST,
SHUFFLE_SERVER_PORT);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(),
appId,
@@ -224,7 +237,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
dataBasePath,
bitmaps[0],
Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(),
+ Lists.newArrayList(ssi),
new Configuration(),
new DefaultIdHelper()
);
@@ -265,7 +278,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
1000,
dataBasePath, bitmaps[0],
Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(),
+ Lists.newArrayList(ssi),
new Configuration(),
new DefaultIdHelper()
);
@@ -283,7 +296,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
dataBasePath,
bitmaps[1],
Roaring64NavigableMap.bitmapOf(1),
- Lists.newArrayList(),
+ Lists.newArrayList(ssi),
new Configuration(),
new DefaultIdHelper()
);
@@ -301,7 +314,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
dataBasePath,
bitmaps[2],
Roaring64NavigableMap.bitmapOf(2),
- Lists.newArrayList(),
+ Lists.newArrayList(ssi),
new Configuration(),
new DefaultIdHelper()
);
@@ -319,7 +332,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
dataBasePath,
bitmaps[3],
Roaring64NavigableMap.bitmapOf(3),
- Lists.newArrayList(),
+ Lists.newArrayList(ssi),
new Configuration(),
new DefaultIdHelper()
);
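A side note on the localhost-to-host-IP switch in this file: the server id
embedded in ShuffleServerInfo now has to line up between writer and reader,
presumably because the per-replica HDFS file filter (see the
HdfsClientReadHandler change below) matches files by that id. A hedged
illustration, mirroring the static block above (in a context declared to
throw Exception, since RssUtils.getHostIp() may throw):

    // Derive the host the same way the server does, so the client-side
    // ShuffleServerInfo id matches what the server registered.
    ShuffleServerInfo ssi = new ShuffleServerInfo(RssUtils.getHostIp(), SHUFFLE_SERVER_PORT);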
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
index 03420fba..fb316a08 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.test;
import java.io.File;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -27,13 +26,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
-import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.handler.api.ClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,17 +74,14 @@ public class ShuffleServerWithLocalOfExceptionTest extends
ShuffleReadWriteBase
int shuffleId = 0;
int partitionId = 0;
- MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new
MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 150,
Lists.newArrayList(shuffleServerClient));
- ClientReadHandler[] handlers = new ClientReadHandler[1];
- handlers[0] = memoryQuorumClientReadHandler;
- ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
+ MemoryClientReadHandler memoryClientReadHandler = new
MemoryClientReadHandler(
+ testAppId, shuffleId, partitionId, 150, shuffleServerClient);
shuffleServers.get(0).stopServer();
try {
- ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
+ memoryClientReadHandler.readShuffleData();
fail("Should throw connection exception directly.");
} catch (RssException rssException) {
- assertTrue(rssException.getMessage().contains("Failed to read shuffle
data from HOT handler"));
+ assertTrue(rssException.getMessage().contains("Failed to read in memory
shuffle data with"));
}
}
}
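The rewritten assertion reflects the single-replica semantics introduced
here: MemoryClientReadHandler talks to exactly one server and surfaces a
dead connection as an RssException itself, rather than hiding it behind the
removed quorum wrapper. A sketch of the caller's view, assuming the test's
scaffolding above:

    MemoryClientReadHandler handler = new MemoryClientReadHandler(
        testAppId, shuffleId, partitionId, 150, shuffleServerClient);
    try {
      handler.readShuffleData();
    } catch (RssException e) {
      // With one replica there is nothing to fall back to; with several,
      // MultiReplicaClientReadHandler catches this and tries the next server.
    }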
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 592c980e..de74c1b5 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -44,8 +44,8 @@ import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
-import
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,17 +114,17 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
shuffleServerClient.sendShuffleData(rssdr);
// read the 1-th segment from memory
- MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new
MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 150,
Lists.newArrayList(shuffleServerClient));
+ MemoryClientReadHandler memoryClientReadHandler = new
MemoryClientReadHandler(
+ testAppId, shuffleId, partitionId, 150, shuffleServerClient);
Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new
LocalFileQuorumClientReadHandler(
+ LocalFileClientReadHandler localFileClientReadHandler = new
LocalFileClientReadHandler(
testAppId, shuffleId, partitionId, 0, 1, 3,
- 75, expectBlockIds, processBlockIds,
Lists.newArrayList(shuffleServerClient));
+ 75, expectBlockIds, processBlockIds, shuffleServerClient);
HdfsClientReadHandler hdfsClientReadHandler = new
HdfsClientReadHandler(testAppId, shuffleId, partitionId, 0, 1, 3,
500, expectBlockIds, processBlockIds, REMOTE_STORAGE, conf);
ClientReadHandler[] handlers = new ClientReadHandler[3];
- handlers[0] = memoryQuorumClientReadHandler;
- handlers[1] = localFileQuorumClientReadHandler;
+ handlers[0] = memoryClientReadHandler;
+ handlers[1] = localFileClientReadHandler;
handlers[2] = hdfsClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
Map<Long, byte[]> expectedData = Maps.newHashMap();
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 7e0123dc..03ed356a 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -43,8 +43,8 @@ import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
-import
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -113,37 +113,37 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
assertEquals(3, shuffleServers.get(0).getShuffleBufferManager()
.getShuffleBuffer(testAppId, shuffleId, 0).getBlocks().size());
// create memory handler to read data,
- MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new
MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 20,
Lists.newArrayList(shuffleServerClient));
+ MemoryClientReadHandler memoryClientReadHandler = new
MemoryClientReadHandler(
+ testAppId, shuffleId, partitionId, 20, shuffleServerClient);
// start to read data, one block data for every call
- ShuffleDataResult sdr = memoryQuorumClientReadHandler.readShuffleData();
+ ShuffleDataResult sdr = memoryClientReadHandler.readShuffleData();
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
validateResult(expectedData, sdr);
- sdr = memoryQuorumClientReadHandler.readShuffleData();
+ sdr = memoryClientReadHandler.readShuffleData();
expectedData.clear();
expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
validateResult(expectedData, sdr);
- sdr = memoryQuorumClientReadHandler.readShuffleData();
+ sdr = memoryClientReadHandler.readShuffleData();
expectedData.clear();
expectedData.put(blocks.get(2).getBlockId(), blocks.get(2).getData());
validateResult(expectedData, sdr);
// no data in cache, empty return
- sdr = memoryQuorumClientReadHandler.readShuffleData();
+ sdr = memoryClientReadHandler.readShuffleData();
assertEquals(0, sdr.getBufferSegments().size());
// case: read with ComposedClientReadHandler
Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 50,
Lists.newArrayList(shuffleServerClient));
- LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new
LocalFileQuorumClientReadHandler(
+ memoryClientReadHandler = new MemoryClientReadHandler(
+ testAppId, shuffleId, partitionId, 50, shuffleServerClient);
+ LocalFileClientReadHandler localFileQuorumClientReadHandler = new
LocalFileClientReadHandler(
testAppId, shuffleId, partitionId, 0, 1, 3,
- 50, expectBlockIds, processBlockIds,
Lists.newArrayList(shuffleServerClient));
+ 50, expectBlockIds, processBlockIds, shuffleServerClient);
ClientReadHandler[] handlers = new ClientReadHandler[2];
- handlers[0] = memoryQuorumClientReadHandler;
+ handlers[0] = memoryClientReadHandler;
handlers[1] = localFileQuorumClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
// read from memory with ComposedClientReadHandler
@@ -235,14 +235,14 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
// read the 1-th segment from memory
Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new
MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 150,
Lists.newArrayList(shuffleServerClient));
- LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new
LocalFileQuorumClientReadHandler(
+ MemoryClientReadHandler memoryClientReadHandler = new
MemoryClientReadHandler(
+ testAppId, shuffleId, partitionId, 150, shuffleServerClient);
+ LocalFileClientReadHandler localFileClientReadHandler = new
LocalFileClientReadHandler(
testAppId, shuffleId, partitionId, 0, 1, 3,
- 75, expectBlockIds, processBlockIds,
Lists.newArrayList(shuffleServerClient));
+ 75, expectBlockIds, processBlockIds, shuffleServerClient);
ClientReadHandler[] handlers = new ClientReadHandler[2];
- handlers[0] = memoryQuorumClientReadHandler;
- handlers[1] = localFileQuorumClientReadHandler;
+ handlers[0] = memoryClientReadHandler;
+ handlers[1] = localFileClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new
ComposedClientReadHandler(handlers);
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index f09be54a..34e14e1c 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -59,7 +59,7 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
private static File DATA_DIR2;
private ShuffleServerGrpcClient shuffleServerClient;
private List<ShuffleServerInfo> shuffleServerInfo =
- Lists.newArrayList(new ShuffleServerInfo("127.0.0.1-20001", LOCALHOST,
SHUFFLE_SERVER_PORT));
+ Lists.newArrayList(new ShuffleServerInfo(LOCALHOST,
SHUFFLE_SERVER_PORT));
@BeforeAll
public static void setupServers() throws Exception {
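The two-argument ShuffleServerInfo constructor used above appears to be
part of this change (the diffstat shows ShuffleServerInfo.java growing by
7 lines, though its body is not included in this excerpt); presumably it
derives the server id from host and port instead of requiring an explicit
one:

    // Presumed convenience constructor; the exact id derivation lives in
    // ShuffleServerInfo and is not shown in this diff.
    ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);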
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index ef0fbe47..3f4dd135 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -19,8 +19,9 @@ package org.apache.uniffle.storage.factory;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
@@ -32,9 +33,10 @@ import
org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HdfsShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
-import
org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler;
-import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MultiReplicaClientReadHandler;
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;
@@ -53,80 +55,89 @@ public class ShuffleHandlerFactory {
return INSTANCE;
}
+
public ClientReadHandler
createShuffleReadHandler(CreateShuffleReadHandlerRequest request) {
+ if (CollectionUtils.isEmpty(request.getShuffleServerInfoList())) {
+ throw new RuntimeException("Shuffle servers should not be empty!");
+ }
+ if (request.getShuffleServerInfoList().size() > 1) {
+ List<ClientReadHandler> handlers = Lists.newArrayList();
+ request.getShuffleServerInfoList().forEach((ssi) -> {
+
handlers.add(ShuffleHandlerFactory.getInstance().createSingleReplicaClientReadHandler(request,
ssi));
+ });
+ return new MultiReplicaClientReadHandler(handlers,
request.getShuffleServerInfoList(),
+ request.getExpectBlockIds(), request.getProcessBlockIds());
+ } else {
+ ShuffleServerInfo serverInfo = request.getShuffleServerInfoList().get(0);
+ return createSingleReplicaClientReadHandler(request, serverInfo);
+ }
+ }
+
+ public ClientReadHandler
createSingleReplicaClientReadHandler(CreateShuffleReadHandlerRequest request,
+
ShuffleServerInfo serverInfo) {
String storageType = request.getStorageType();
StorageType type = StorageType.valueOf(storageType);
if (StorageType.MEMORY == type) {
throw new UnsupportedOperationException(
- "Doesn't support storage type for client read handler:" +
storageType);
+          "Doesn't support storage type for client read: " + storageType);
}
if (StorageType.HDFS == type) {
- return getHdfsClientReadHandler(request);
+ return getHdfsClientReadHandler(request, serverInfo);
}
if (StorageType.LOCALFILE == type) {
- return getLocalfileClientReaderHandler(request);
+ return getLocalfileClientReaderHandler(request, serverInfo);
}
List<ClientReadHandler> handlers = new ArrayList<>();
if (StorageType.withMemory(type)) {
handlers.add(
- getMemoryClientReadHandler(request)
+ getMemoryClientReadHandler(request, serverInfo)
);
}
if (StorageType.withLocalfile(type)) {
handlers.add(
- getLocalfileClientReaderHandler(request)
+ getLocalfileClientReaderHandler(request, serverInfo)
);
}
if (StorageType.withHDFS(type)) {
handlers.add(
- getHdfsClientReadHandler(request)
+ getHdfsClientReadHandler(request, serverInfo)
);
}
if (handlers.isEmpty()) {
throw new RssException("This should not happen due to the unknown
storage type: " + storageType);
}
- Callable<ClientReadHandler>[] callables =
- handlers
- .stream()
- .map(x -> (Callable<ClientReadHandler>) () -> x)
- .collect(Collectors.toList())
- .toArray(new Callable[handlers.size()]);
- return new ComposedClientReadHandler(callables);
+ return new ComposedClientReadHandler(handlers);
}
- private ClientReadHandler
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request) {
- List<ShuffleServerInfo> shuffleServerInfoList =
request.getShuffleServerInfoList();
- List<ShuffleServerClient> shuffleServerClients =
shuffleServerInfoList.stream().map(
- ssi -> ShuffleServerClientFactory.getInstance().getShuffleServerClient(
- ClientType.GRPC.name(), ssi)).collect(
- Collectors.toList());
- ClientReadHandler memoryClientReadHandler = new
MemoryQuorumClientReadHandler(
+ private ClientReadHandler
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request,
ShuffleServerInfo ssi) {
+ ShuffleServerClient shuffleServerClient =
ShuffleServerClientFactory.getInstance().getShuffleServerClient(
+ ClientType.GRPC.name(), ssi);
+ ClientReadHandler memoryClientReadHandler = new MemoryClientReadHandler(
request.getAppId(),
request.getShuffleId(),
request.getPartitionId(),
request.getReadBufferSize(),
- shuffleServerClients);
+ shuffleServerClient);
return memoryClientReadHandler;
}
- private ClientReadHandler
getLocalfileClientReaderHandler(CreateShuffleReadHandlerRequest request) {
- List<ShuffleServerInfo> shuffleServerInfoList =
request.getShuffleServerInfoList();
- List<ShuffleServerClient> shuffleServerClients =
shuffleServerInfoList.stream().map(
- ssi ->
ShuffleServerClientFactory.getInstance().getShuffleServerClient(ClientType.GRPC.name(),
ssi)).collect(
- Collectors.toList());
- return new LocalFileQuorumClientReadHandler(
+ private ClientReadHandler
getLocalfileClientReaderHandler(CreateShuffleReadHandlerRequest request,
+ ShuffleServerInfo
ssi) {
+ ShuffleServerClient shuffleServerClient =
ShuffleServerClientFactory.getInstance().getShuffleServerClient(
+ ClientType.GRPC.name(), ssi);
+ return new LocalFileClientReadHandler(
request.getAppId(), request.getShuffleId(), request.getPartitionId(),
request.getIndexReadLimit(), request.getPartitionNumPerRange(),
request.getPartitionNum(),
request.getReadBufferSize(), request.getExpectBlockIds(),
request.getProcessBlockIds(),
- shuffleServerClients, request.getDistributionType(),
request.getExpectTaskIds()
+ shuffleServerClient, request.getDistributionType(),
request.getExpectTaskIds()
);
}
- private ClientReadHandler
getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request) {
+ private ClientReadHandler
getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request,
ShuffleServerInfo ssi) {
return new HdfsClientReadHandler(
request.getAppId(),
request.getShuffleId(),
@@ -140,7 +151,8 @@ public class ShuffleHandlerFactory {
request.getStorageBasePath(),
request.getHadoopConf(),
request.getDistributionType(),
- request.getExpectTaskIds()
+ request.getExpectTaskIds(),
+ ssi.getId()
);
}
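In short, the factory now branches on replica count: one server yields the
composed single-replica handler directly, while several servers yield one
composed handler per replica wrapped in a MultiReplicaClientReadHandler. A
hedged usage sketch (request population abbreviated; see
mockCreateShuffleReadHandlerRequest above for the full set of fields):

    CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
    // ... set app/shuffle/partition ids, storage type, block id bitmaps, etc. ...
    request.setShuffleServerInfoList(Lists.newArrayList(replica1, replica2));
    // Two or more servers -> MultiReplicaClientReadHandler over per-replica
    // handlers; exactly one server -> that replica's handler, unwrapped.
    ClientReadHandler handler =
        ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);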
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
index c619da2c..ba018d3a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
@@ -34,5 +34,4 @@ public interface ClientReadHandler {
// Display the statistics of consumed blocks
void logConsumedBlockInfo();
-
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index aab15b5d..da4e2b2f 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -17,9 +17,10 @@
package org.apache.uniffle.storage.handler.impl;
-import java.util.concurrent.Callable;
+import java.util.List;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,11 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+/**
+ * Composed read handler for all storage types of a single replica.
+ * Storage tiers are read in order: HOT -> WARM -> COLD -> FROZEN.
+ * @see <a
href="https://github.com/apache/incubator-uniffle/pull/276">PR-276</a>
+ */
public class ComposedClientReadHandler implements ClientReadHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ComposedClientReadHandler.class);
@@ -36,10 +42,6 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
private ClientReadHandler warmDataReadHandler;
private ClientReadHandler coldDataReadHandler;
private ClientReadHandler frozenDataReadHandler;
- private Callable<ClientReadHandler> hotHandlerCreator;
- private Callable<ClientReadHandler> warmHandlerCreator;
- private Callable<ClientReadHandler> coldHandlerCreator;
- private Callable<ClientReadHandler> frozenHandlerCreator;
private static final int HOT = 1;
private static final int WARM = 2;
private static final int COLD = 3;
@@ -63,34 +65,22 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
private long frozenReadUncompressLength = 0L;
public ComposedClientReadHandler(ClientReadHandler... handlers) {
- topLevelOfHandler = handlers.length;
- if (topLevelOfHandler > 0) {
- this.hotDataReadHandler = handlers[0];
- }
- if (topLevelOfHandler > 1) {
- this.warmDataReadHandler = handlers[1];
- }
- if (topLevelOfHandler > 2) {
- this.coldDataReadHandler = handlers[2];
- }
- if (topLevelOfHandler > 3) {
- this.frozenDataReadHandler = handlers[3];
- }
+ this(Lists.newArrayList(handlers));
}
- public ComposedClientReadHandler(Callable<ClientReadHandler>... creators) {
- topLevelOfHandler = creators.length;
+ public ComposedClientReadHandler(List<ClientReadHandler> handlers) {
+ topLevelOfHandler = handlers.size();
if (topLevelOfHandler > 0) {
- this.hotHandlerCreator = creators[0];
+ this.hotDataReadHandler = handlers.get(0);
}
if (topLevelOfHandler > 1) {
- this.warmHandlerCreator = creators[1];
+ this.warmDataReadHandler = handlers.get(1);
}
if (topLevelOfHandler > 2) {
- this.coldHandlerCreator = creators[2];
+ this.coldDataReadHandler = handlers.get(2);
}
if (topLevelOfHandler > 3) {
- this.frozenHandlerCreator = creators[3];
+ this.frozenDataReadHandler = handlers.get(3);
}
}
@@ -100,27 +90,15 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
try {
switch (currentHandler) {
case HOT:
- if (hotDataReadHandler == null) {
- hotDataReadHandler =
createReadHandlerIfNotExist(hotHandlerCreator);
- }
shuffleDataResult = hotDataReadHandler.readShuffleData();
break;
case WARM:
- if (warmDataReadHandler == null) {
- warmDataReadHandler =
createReadHandlerIfNotExist(warmHandlerCreator);
- }
shuffleDataResult = warmDataReadHandler.readShuffleData();
break;
case COLD:
- if (coldDataReadHandler == null) {
- coldDataReadHandler =
createReadHandlerIfNotExist(coldHandlerCreator);
- }
shuffleDataResult = coldDataReadHandler.readShuffleData();
break;
case FROZEN:
- if (frozenDataReadHandler == null) {
- frozenDataReadHandler =
createReadHandlerIfNotExist(frozenHandlerCreator);
- }
shuffleDataResult = frozenDataReadHandler.readShuffleData();
break;
default:
@@ -143,13 +121,6 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
return shuffleDataResult;
}
- private ClientReadHandler
createReadHandlerIfNotExist(Callable<ClientReadHandler> creator) throws
Exception {
- if (creator == null) {
- throw new IllegalStateException("creator " + getCurrentHandlerName() + "
handler doesn't exist");
- }
- return creator.call();
- }
-
private String getCurrentHandlerName() {
String name = "UNKNOWN";
switch (currentHandler) {
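The new list constructor keeps the positional tier mapping: index 0 is HOT
(memory), 1 WARM (local file), 2 COLD (HDFS), 3 FROZEN. A minimal sketch
mirroring the integration tests above:

    // memoryHandler, localFileHandler and hdfsHandler as built in the tests above.
    ComposedClientReadHandler composed = new ComposedClientReadHandler(
        Lists.newArrayList(memoryHandler, localFileHandler, hdfsHandler));
    // Drains the HOT tier first, then falls through WARM and COLD in order.
    ShuffleDataResult sdr = composed.readShuffleData();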
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index a9208960..1001c12c 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -43,6 +43,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
protected final int partitionNumPerRange;
protected final int partitionNum;
protected final int readBufferSize;
+ private final String shuffleServerId;
protected Roaring64NavigableMap expectBlockIds;
protected Roaring64NavigableMap processBlockIds;
protected final String storageBasePath;
@@ -70,7 +71,8 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
String storageBasePath,
Configuration hadoopConf,
ShuffleDataDistributionType distributionType,
- Roaring64NavigableMap expectTaskIds) {
+ Roaring64NavigableMap expectTaskIds,
+ String shuffleServerId) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -84,6 +86,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
this.readHandlerIndex = 0;
this.distributionType = distributionType;
this.expectTaskIds = expectTaskIds;
+ this.shuffleServerId = shuffleServerId;
}
// Only for test
@@ -101,7 +104,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
Configuration hadoopConf) {
this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
partitionNum, readBufferSize,
expectBlockIds, processBlockIds, storageBasePath, hadoopConf,
ShuffleDataDistributionType.NORMAL,
- Roaring64NavigableMap.bitmapOf());
+ Roaring64NavigableMap.bitmapOf(), null);
}
protected void init(String fullShufflePath) {
@@ -119,7 +122,8 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
try {
// get all index files
indexFiles = fs.listStatus(baseFolder,
- file ->
file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX));
+ file -> file.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)
+ && (shuffleServerId == null ||
file.getName().startsWith(shuffleServerId)));
} catch (Exception e) {
LOG.error(failedGetIndexFileMsg, e);
return;
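The extra predicate implies that index files on HDFS are prefixed with the
id of the server that wrote them, so each per-replica handler only reads
its own replica's files; the test-only constructor passes null and keeps
the old read-everything behavior. The effective filter, spelled out:

    // Equivalent PathFilter to the lambda above (org.apache.hadoop.fs.PathFilter).
    PathFilter filter = path ->
        path.getName().endsWith(Constants.SHUFFLE_INDEX_FILE_SUFFIX)
            && (shuffleServerId == null || path.getName().startsWith(shuffleServerId));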
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a630cf1b..7aac7d0f 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -38,7 +38,7 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
private final int partitionNum;
private ShuffleServerClient shuffleServerClient;
- LocalFileClientReadHandler(
+ public LocalFileClientReadHandler(
String appId,
int shuffleId,
int partitionId,
@@ -60,6 +60,27 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
this.partitionNum = partitionNum;
}
+ /**
+ * Only for test
+ */
+ public LocalFileClientReadHandler(
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int indexReadLimit,
+ int partitionNumPerRange,
+ int partitionNum,
+ int readBufferSize,
+ Roaring64NavigableMap expectBlockIds,
+ Roaring64NavigableMap processBlockIds,
+ ShuffleServerClient shuffleServerClient) {
+ this(
+ appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
+ partitionNum, readBufferSize, expectBlockIds, processBlockIds,
+ shuffleServerClient, ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf()
+ );
+ }
+
@Override
public ShuffleIndexResult readShuffleIndex() {
ShuffleIndexResult shuffleIndexResult = null;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
deleted file mode 100644
index 2a6afdbe..00000000
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.client.api.ShuffleServerClient;
-import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.ShuffleDataDistributionType;
-import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
-
-public class LocalFileQuorumClientReadHandler extends
AbstractClientReadHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LocalFileQuorumClientReadHandler.class);
-
- private List<LocalFileClientReadHandler> handlers = Lists.newLinkedList();
-
- private long readBlockNum = 0L;
- private long readLength = 0L;
- private long readUncompressLength = 0L;
-
- public LocalFileQuorumClientReadHandler(
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
- List<ShuffleServerClient> shuffleServerClients,
- ShuffleDataDistributionType distributionType,
- Roaring64NavigableMap expectTaskIds) {
- this.appId = appId;
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.readBufferSize = readBufferSize;
- for (ShuffleServerClient client: shuffleServerClients) {
- handlers.add(new LocalFileClientReadHandler(
- appId,
- shuffleId,
- partitionId,
- indexReadLimit,
- partitionNumPerRange,
- partitionNum,
- readBufferSize,
- expectBlockIds,
- processBlockIds,
- client,
- distributionType,
- expectTaskIds
- ));
- }
- }
-
- /**
- * Only for test
- */
- public LocalFileQuorumClientReadHandler(
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
- List<ShuffleServerClient> shuffleServerClients) {
- this(
- appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
- partitionNum, readBufferSize, expectBlockIds, processBlockIds,
- shuffleServerClients, ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf()
- );
- }
-
- @Override
- public ShuffleDataResult readShuffleData() {
- boolean readSuccessful = false;
- ShuffleDataResult result = null;
- for (LocalFileClientReadHandler handler : handlers) {
- try {
- result = handler.readShuffleData();
- readSuccessful = true;
- break;
- } catch (Exception e) {
- LOG.warn("Failed to read a replica due to ", e);
- }
- }
- if (!readSuccessful) {
- throw new RssException("Failed to read all replicas for appId[" + appId
+ "], shuffleId["
- + shuffleId + "], partitionId[" + partitionId + "]");
- }
- return result;
- }
-
- @Override
- public void updateConsumedBlockInfo(BufferSegment bs) {
- if (bs == null) {
- return;
- }
- readBlockNum++;
- readLength += bs.getLength();
- readUncompressLength += bs.getUncompressLength();
- }
-
- @Override
- public void logConsumedBlockInfo() {
- LOG.info("Client read " + readBlockNum + " blocks,"
- + " bytes:" + readLength + " uncompressed bytes:" +
readUncompressLength);
- }
-}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
deleted file mode 100644
index 5fbe8ec6..00000000
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.client.api.ShuffleServerClient;
-import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
-
-public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
- private long lastBlockId = Constants.INVALID_BLOCK_ID;
- private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
-
- public MemoryQuorumClientReadHandler(
- String appId,
- int shuffleId,
- int partitionId,
- int readBufferSize,
- List<ShuffleServerClient> shuffleServerClients) {
- this.appId = appId;
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.readBufferSize = readBufferSize;
- shuffleServerClients.forEach(client ->
- handlers.add(new MemoryClientReadHandler(
- appId, shuffleId, partitionId, readBufferSize, client))
- );
- }
-
- @Override
- public ShuffleDataResult readShuffleData() {
- boolean readSuccessful = false;
- ShuffleDataResult result = null;
-
- for (MemoryClientReadHandler handler: handlers) {
- try {
- result = handler.readShuffleData();
- readSuccessful = true;
- break;
- } catch (Exception e) {
- LOG.warn("Failed to read a replica due to ", e);
- }
- }
-
- if (!readSuccessful) {
- throw new RssException("Failed to read in memory shuffle data for
appId[" + appId
- + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId +
"]");
- }
-
- return result;
- }
-}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
new file mode 100644
index 00000000..83b9fb7f
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.List;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+
+public class MultiReplicaClientReadHandler extends AbstractClientReadHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MultiReplicaClientReadHandler.class);
+
+ private final List<ClientReadHandler> handlers;
+ private final List<ShuffleServerInfo> shuffleServerInfos;
+
+ private long readBlockNum = 0L;
+ private long readLength = 0L;
+ private long readUncompressLength = 0L;
+ private final Roaring64NavigableMap blockIdBitmap;
+ private final Roaring64NavigableMap processedBlockIds;
+
+ private int readHandlerIndex;
+
+ public MultiReplicaClientReadHandler(
+ List<ClientReadHandler> handlers,
+ List<ShuffleServerInfo> shuffleServerInfos,
+ Roaring64NavigableMap blockIdBitmap,
+ Roaring64NavigableMap processedBlockIds) {
+ this.handlers = handlers;
+ this.blockIdBitmap = blockIdBitmap;
+ this.processedBlockIds = processedBlockIds;
+ this.shuffleServerInfos = shuffleServerInfos;
+ }
+
+ @Override
+ public ShuffleDataResult readShuffleData() {
+ ClientReadHandler handler;
+ ShuffleDataResult result = null;
+ do {
+ if (readHandlerIndex >= handlers.size()) {
+ return result;
+ }
+ handler = handlers.get(readHandlerIndex);
+ try {
+ result = handler.readShuffleData();
+ } catch (Exception e) {
+ LOG.warn("Failed to read a replica from [{}] due to ",
+ shuffleServerInfos.get(readHandlerIndex).getId(), e);
+ }
+ if (result != null && !result.isEmpty()) {
+ return result;
+ } else {
+ try {
+ RssUtils.checkProcessedBlockIds(blockIdBitmap, processedBlockIds);
+ return result;
+ } catch (RssException e) {
+          LOG.warn("Finished reading from [{}], but haven't finished reading all the
blocks.",
+ shuffleServerInfos.get(readHandlerIndex).getId(), e);
+ }
+ readHandlerIndex++;
+ }
+ } while (true);
+ }
+
+ @Override
+ public void updateConsumedBlockInfo(BufferSegment bs) {
+ if (bs == null) {
+ return;
+ }
+ readBlockNum++;
+ readLength += bs.getLength();
+ readUncompressLength += bs.getUncompressLength();
+ }
+
+ @Override
+ public void logConsumedBlockInfo() {
+ LOG.info("Client read " + readBlockNum + " blocks,"
+ + " bytes:" + readLength + " uncompressed bytes:" +
readUncompressLength);
+ }
+}
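MultiReplicaClientReadHandler is the heart of the fallback: it keeps
reading from the current replica until that replica returns an empty
result, then uses RssUtils.checkProcessedBlockIds (the RssUtils addition in
this commit) to decide whether all expected blocks were seen; only if
blocks are still missing does it advance to the next replica. A hedged
sketch of a consuming loop built on the methods above:

    // multiReplicaHandler as produced by ShuffleHandlerFactory for 2+ replicas.
    ShuffleDataResult sdr;
    while ((sdr = multiReplicaHandler.readShuffleData()) != null && !sdr.isEmpty()) {
      for (BufferSegment bs : sdr.getBufferSegments()) {
        multiReplicaHandler.updateConsumedBlockInfo(bs);
        // ... decompress and hand the segment to the reader ...
      }
    }
    multiReplicaHandler.logConsumedBlockInfo();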