This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b7d391c80 [#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253)
b7d391c80 is described below
commit b7d391c8085dd9b27c775935ea52bb458f59fd4d
Author: maobaolong <[email protected]>
AuthorDate: Tue Nov 26 10:18:58 2024 +0800
[#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253)
### What changes were proposed in this pull request?
- Introduce a factory that creates the appropriate LocalStorageManager implementation based on configuration.
- Introduce a multi-disk LocalStorageManager (MultiPartLocalStorageManager).
### Why are the changes needed?
Fix: #436
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Existing UTs and newly added UTs
- Tested on our stress-test cluster.
- new client -> old server ✓
- new client -> new server ✓
- old client -> new server ❌, so clients must be upgraded first, then the servers.
---
.../java/io/netty/util/CompositeFileRegion.java | 142 +++++++++++++
.../apache/uniffle/common/ShuffleDataSegment.java | 10 +-
.../apache/uniffle/common/ShuffleIndexResult.java | 25 ++-
.../uniffle/common/netty/MessageEncoder.java | 3 +-
.../buffer/MultiFileSegmentManagedBuffer.java | 83 ++++++++
.../netty/protocol/GetLocalShuffleDataRequest.java | 30 +++
...uest.java => GetLocalShuffleDataV2Request.java} | 92 +++------
.../protocol/GetLocalShuffleIndexResponse.java | 5 +
...se.java => GetLocalShuffleIndexV2Response.java} | 60 ++++--
.../uniffle/common/netty/protocol/Message.java | 13 +-
...tSplitter.java => AbstractSegmentSplitter.java} | 78 ++++---
.../common/segment/FixedSizeSegmentSplitter.java | 90 +-------
.../common/segment/LocalOrderSegmentSplitter.java | 120 +----------
.../org/apache/uniffle/common/util/Constants.java | 1 +
.../segment/FixedSizeSegmentSplitterTest.java | 51 +++++
.../segment/LocalOrderSegmentSplitterTest.java | 57 ++++++
...ShuffleWithRssClientTestWhenShuffleFlushed.java | 10 +-
...thLocalForMultiPartLocalStorageManagerTest.java | 226 +++++++++++++++++++++
.../client/impl/grpc/ShuffleServerGrpcClient.java | 6 +-
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 37 +++-
.../client/request/RssGetShuffleDataRequest.java | 23 ++-
.../response/RssGetShuffleIndexResponse.java | 10 +-
proto/src/main/proto/Rss.proto | 2 +
.../uniffle/server/ShuffleDataReadEvent.java | 11 +
.../apache/uniffle/server/ShuffleServerConf.java | 7 +
.../uniffle/server/ShuffleServerGrpcService.java | 16 +-
.../apache/uniffle/server/ShuffleTaskManager.java | 27 ++-
.../server/netty/ShuffleServerNettyHandler.java | 30 ++-
.../server/storage/HybridStorageManager.java | 7 +-
.../server/storage/LocalStorageManager.java | 3 +-
.../server/storage/LocalStorageManagerFactory.java | 47 +++++
.../storage/MultiPartLocalStorageManager.java | 118 +++++++++++
.../uniffle/server/storage/StorageManager.java | 4 +
.../server/storage/StorageManagerFactory.java | 2 +-
.../common/CompositeReadingViewStorage.java | 91 +++++++++
.../uniffle/storage/common/LocalStorage.java | 15 +-
.../storage/handler/api/ServerReadHandler.java | 2 +
.../impl/CompositeLocalFileServerReadHandler.java | 78 +++++++
.../handler/impl/LocalFileClientReadHandler.java | 1 +
.../handler/impl/LocalFileServerReadHandler.java | 22 +-
40 files changed, 1291 insertions(+), 364 deletions(-)
diff --git a/common/src/main/java/io/netty/util/CompositeFileRegion.java
b/common/src/main/java/io/netty/util/CompositeFileRegion.java
new file mode 100644
index 000000000..4549ca0f2
--- /dev/null
+++ b/common/src/main/java/io/netty/util/CompositeFileRegion.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.util;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+import io.netty.channel.FileRegion;
+
+import org.apache.uniffle.common.netty.protocol.AbstractFileRegion;
+
+public class CompositeFileRegion extends AbstractFileRegion {
+ private final FileRegion[] regions;
+ private long totalSize = 0;
+ private long bytesTransferred = 0;
+
+ public CompositeFileRegion(FileRegion... regions) {
+ this.regions = regions;
+ for (FileRegion region : regions) {
+ totalSize += region.count();
+ }
+ }
+
+ @Override
+ public long position() {
+ return bytesTransferred;
+ }
+
+ @Override
+ public long count() {
+ return totalSize;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position) throws
IOException {
+ long totalBytesTransferred = 0;
+
+ for (FileRegion region : regions) {
+ if (position >= region.count()) {
+ position -= region.count();
+ } else {
+ long currentBytesTransferred = region.transferTo(target, position);
+ totalBytesTransferred += currentBytesTransferred;
+ bytesTransferred += currentBytesTransferred;
+
+ if (currentBytesTransferred < region.count() - position) {
+ break;
+ }
+ position = 0;
+ }
+ }
+
+ return totalBytesTransferred;
+ }
+
+ @Override
+ public long transferred() {
+ return bytesTransferred;
+ }
+
+ @Override
+ public AbstractFileRegion retain() {
+ super.retain();
+ for (FileRegion region : regions) {
+ region.retain();
+ }
+ return this;
+ }
+
+ @Override
+ public AbstractFileRegion retain(int increment) {
+ super.retain(increment);
+ for (FileRegion region : regions) {
+ region.retain(increment);
+ }
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ boolean released = super.release();
+ for (FileRegion region : regions) {
+ if (!region.release()) {
+ released = false;
+ }
+ }
+ return released;
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = super.release(decrement);
+ for (FileRegion region : regions) {
+ if (!region.release(decrement)) {
+ released = false;
+ }
+ }
+ return released;
+ }
+
+ @Override
+ protected void deallocate() {
+ for (FileRegion region : regions) {
+ if (region instanceof AbstractReferenceCounted) {
+ ((AbstractReferenceCounted) region).deallocate();
+ }
+ }
+ }
+
+ @Override
+ public AbstractFileRegion touch() {
+ super.touch();
+ for (FileRegion region : regions) {
+ region.touch();
+ }
+ return this;
+ }
+
+ @Override
+ public AbstractFileRegion touch(Object hint) {
+ super.touch(hint);
+ for (FileRegion region : regions) {
+ region.touch(hint);
+ }
+ return this;
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java
index af7299087..532921b14 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java
@@ -27,11 +27,15 @@ import java.util.List;
public class ShuffleDataSegment {
private final long offset;
private final int length;
+
+ private final int storageId;
private final List<BufferSegment> bufferSegments;
- public ShuffleDataSegment(long offset, int length, List<BufferSegment>
bufferSegments) {
+ public ShuffleDataSegment(
+ long offset, int length, int storageId, List<BufferSegment>
bufferSegments) {
this.offset = offset;
this.length = length;
+ this.storageId = storageId;
this.bufferSegments = bufferSegments;
}
@@ -46,4 +50,8 @@ public class ShuffleDataSegment {
public List<BufferSegment> getBufferSegments() {
return bufferSegments;
}
+
+ public int getStorageId() {
+ return storageId;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index d4f863f89..2d9934546 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -30,8 +30,10 @@ import org.apache.uniffle.common.util.ByteBufUtils;
public class ShuffleIndexResult {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleIndexResult.class);
+ private static final int[] DEFAULT_STORAGE_IDS = new int[] {0};
private final ManagedBuffer buffer;
+ private final int[] storageIds;
private long dataFileLen;
private String dataFileName;
@@ -44,15 +46,28 @@ public class ShuffleIndexResult {
}
public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
- this.buffer =
- new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) :
Unpooled.EMPTY_BUFFER);
- this.dataFileLen = dataFileLen;
+ this(
+ new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) :
Unpooled.EMPTY_BUFFER),
+ dataFileLen,
+ null,
+ DEFAULT_STORAGE_IDS);
}
public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String
dataFileName) {
+ this(buffer, dataFileLen, dataFileName, DEFAULT_STORAGE_IDS);
+ }
+
+ public ShuffleIndexResult(
+ ManagedBuffer buffer, long dataFileLen, String dataFileName, int
storageId) {
+ this(buffer, dataFileLen, dataFileName, new int[] {storageId});
+ }
+
+ public ShuffleIndexResult(
+ ManagedBuffer buffer, long dataFileLen, String dataFileName, int[]
storageIds) {
this.buffer = buffer;
this.dataFileLen = dataFileLen;
this.dataFileName = dataFileName;
+ this.storageIds = storageIds;
}
public byte[] getData() {
@@ -99,4 +114,8 @@ public class ShuffleIndexResult {
public String getDataFileName() {
return dataFileName;
}
+
+ public int[] getStorageIds() {
+ return storageIds;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
index cd4002482..26c782cf9 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
@@ -86,7 +86,8 @@ public final class MessageEncoder extends
MessageToMessageEncoder<Message> {
header.writeInt(bodyLength);
in.encode(header);
if (header.writableBytes() != 0) {
- throw new RssException("header's writable bytes should be 0");
+ throw new RssException(
+ "header's writable bytes should be 0, but it is " +
header.writableBytes());
}
if (body != null) {
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java
new file mode 100644
index 000000000..319f2ede8
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.FileRegion;
+import io.netty.util.CompositeFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A wrapper of multiple {@link FileSegmentManagedBuffer}, used for combine
shuffle index files. */
+public class MultiFileSegmentManagedBuffer extends ManagedBuffer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MultiFileSegmentManagedBuffer.class);
+ private final List<ManagedBuffer> managedBuffers;
+
+ public MultiFileSegmentManagedBuffer(List<ManagedBuffer> managedBuffers) {
+ this.managedBuffers = managedBuffers;
+ }
+
+ @Override
+ public int size() {
+ return managedBuffers.stream().mapToInt(ManagedBuffer::size).sum();
+ }
+
+ @Override
+ public ByteBuf byteBuf() {
+ return Unpooled.wrappedBuffer(this.nioByteBuffer());
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() {
+ ByteBuffer merged = ByteBuffer.allocate(size());
+ for (ManagedBuffer managedBuffer : managedBuffers) {
+ ByteBuffer buffer = managedBuffer.nioByteBuffer();
+ merged.put(buffer.slice());
+ }
+ merged.flip();
+ return merged;
+ }
+
+ @Override
+ public ManagedBuffer retain() {
+ return this;
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() {
+ List<FileRegion> fileRegions = new ArrayList<>(managedBuffers.size());
+ for (ManagedBuffer managedBuffer : managedBuffers) {
+ Object object = managedBuffer.convertToNetty();
+ if (object instanceof FileRegion) {
+ fileRegions.add((FileRegion) object);
+ }
+ }
+ return new CompositeFileRegion(fileRegions.toArray(new FileRegion[0]));
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
index b96c028fb..e8fae1641 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
@@ -30,6 +30,7 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
private long offset;
private int length;
private long timestamp;
+ private int storageId;
public GetLocalShuffleDataRequest(
long requestId,
@@ -41,6 +42,30 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
long offset,
int length,
long timestamp) {
+ this(
+ requestId,
+ appId,
+ shuffleId,
+ partitionId,
+ partitionNumPerRange,
+ partitionNum,
+ offset,
+ length,
+ -1,
+ timestamp);
+ }
+
+ protected GetLocalShuffleDataRequest(
+ long requestId,
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int partitionNumPerRange,
+ int partitionNum,
+ long offset,
+ int length,
+ int storageId,
+ long timestamp) {
super(requestId);
this.appId = appId;
this.shuffleId = shuffleId;
@@ -49,6 +74,7 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
this.partitionNum = partitionNum;
this.offset = offset;
this.length = length;
+ this.storageId = storageId;
this.timestamp = timestamp;
}
@@ -132,6 +158,10 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
return timestamp;
}
+ public int getStorageId() {
+ return storageId;
+ }
+
@Override
public String getOperationType() {
return "getLocalShuffleData";
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java
similarity index 52%
copy from
common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
copy to
common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java
index b96c028fb..8dea653ed 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java
@@ -21,17 +21,9 @@ import io.netty.buffer.ByteBuf;
import org.apache.uniffle.common.util.ByteBufUtils;
-public class GetLocalShuffleDataRequest extends RequestMessage {
- private String appId;
- private int shuffleId;
- private int partitionId;
- private int partitionNumPerRange;
- private int partitionNum;
- private long offset;
- private int length;
- private long timestamp;
+public class GetLocalShuffleDataV2Request extends GetLocalShuffleDataRequest {
- public GetLocalShuffleDataRequest(
+ public GetLocalShuffleDataV2Request(
long requestId,
String appId,
int shuffleId,
@@ -40,45 +32,39 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
int partitionNum,
long offset,
int length,
+ int storageId,
long timestamp) {
- super(requestId);
- this.appId = appId;
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.partitionNumPerRange = partitionNumPerRange;
- this.partitionNum = partitionNum;
- this.offset = offset;
- this.length = length;
- this.timestamp = timestamp;
+ super(
+ requestId,
+ appId,
+ shuffleId,
+ partitionId,
+ partitionNumPerRange,
+ partitionNum,
+ offset,
+ length,
+ storageId,
+ timestamp);
}
@Override
public Type type() {
- return Type.GET_LOCAL_SHUFFLE_DATA_REQUEST;
+ return Type.GET_LOCAL_SHUFFLE_DATA_V2_REQUEST;
}
@Override
public int encodedLength() {
- return REQUEST_ID_ENCODE_LENGTH
- + ByteBufUtils.encodedLength(appId)
- + 2 * Long.BYTES
- + 5 * Integer.BYTES;
+ // add int type storageId to encoded length
+ return super.encodedLength() + Integer.BYTES;
}
@Override
public void encode(ByteBuf buf) {
- buf.writeLong(getRequestId());
- ByteBufUtils.writeLengthAndString(buf, appId);
- buf.writeInt(shuffleId);
- buf.writeInt(partitionId);
- buf.writeInt(partitionNumPerRange);
- buf.writeInt(partitionNum);
- buf.writeLong(offset);
- buf.writeInt(length);
- buf.writeLong(timestamp);
+ super.encode(buf);
+ buf.writeInt(getStorageId());
}
- public static GetLocalShuffleDataRequest decode(ByteBuf byteBuf) {
+ public static GetLocalShuffleDataV2Request decode(ByteBuf byteBuf) {
long requestId = byteBuf.readLong();
String appId = ByteBufUtils.readLengthAndString(byteBuf);
int shuffleId = byteBuf.readInt();
@@ -88,7 +74,8 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
long offset = byteBuf.readLong();
int length = byteBuf.readInt();
long timestamp = byteBuf.readLong();
- return new GetLocalShuffleDataRequest(
+ int storageId = byteBuf.readInt();
+ return new GetLocalShuffleDataV2Request(
requestId,
appId,
shuffleId,
@@ -97,43 +84,12 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
partitionNum,
offset,
length,
+ storageId,
timestamp);
}
- public String getAppId() {
- return appId;
- }
-
- public int getShuffleId() {
- return shuffleId;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public int getPartitionNumPerRange() {
- return partitionNumPerRange;
- }
-
- public int getPartitionNum() {
- return partitionNum;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
@Override
public String getOperationType() {
- return "getLocalShuffleData";
+ return "getLocalShuffleDataV2";
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
index f97373805..455dda4b1 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
@@ -24,6 +24,7 @@ import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
+import org.apache.uniffle.common.util.Constants;
public class GetLocalShuffleIndexResponse extends RpcResponse {
@@ -92,4 +93,8 @@ public class GetLocalShuffleIndexResponse extends RpcResponse
{
public long getFileLength() {
return fileLength;
}
+
+ public int[] getStorageIds() {
+ return Constants.EMPTY_INT_ARRAY;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java
similarity index 59%
copy from
common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
copy to
common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java
index f97373805..b08a5ca8e 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java
@@ -24,12 +24,13 @@ import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
+import org.apache.uniffle.common.util.Constants;
-public class GetLocalShuffleIndexResponse extends RpcResponse {
+public class GetLocalShuffleIndexV2Response extends
GetLocalShuffleIndexResponse {
- private long fileLength;
+ private final int[] storageIds;
- public GetLocalShuffleIndexResponse(
+ public GetLocalShuffleIndexV2Response(
long requestId, StatusCode statusCode, String retMessage, byte[]
indexData, long fileLength) {
this(
requestId,
@@ -39,57 +40,78 @@ public class GetLocalShuffleIndexResponse extends
RpcResponse {
fileLength);
}
- public GetLocalShuffleIndexResponse(
+ public GetLocalShuffleIndexV2Response(
long requestId,
StatusCode statusCode,
String retMessage,
ByteBuf indexData,
long fileLength) {
- this(requestId, statusCode, retMessage, new NettyManagedBuffer(indexData),
fileLength);
+ this(
+ requestId,
+ statusCode,
+ retMessage,
+ new NettyManagedBuffer(indexData),
+ fileLength,
+ Constants.EMPTY_INT_ARRAY);
}
- public GetLocalShuffleIndexResponse(
+ public GetLocalShuffleIndexV2Response(
long requestId,
StatusCode statusCode,
String retMessage,
ManagedBuffer managedBuffer,
- long fileLength) {
- super(requestId, statusCode, retMessage, managedBuffer);
- this.fileLength = fileLength;
+ long fileLength,
+ int[] storageIds) {
+ super(requestId, statusCode, retMessage, managedBuffer, fileLength);
+ this.storageIds = storageIds;
}
@Override
public int encodedLength() {
- return super.encodedLength() + Long.BYTES;
+ // super encodedLength + 4(storageIds.length) + 4 * storageIds.length
+ return super.encodedLength() + Integer.BYTES + Integer.BYTES *
storageIds.length;
}
@Override
public void encode(ByteBuf buf) {
super.encode(buf);
- buf.writeLong(fileLength);
+ buf.writeInt(storageIds.length);
+ for (int storageId : storageIds) {
+ buf.writeInt(storageId);
+ }
}
- public static GetLocalShuffleIndexResponse decode(ByteBuf byteBuf, boolean
decodeBody) {
+ public static GetLocalShuffleIndexV2Response decode(ByteBuf byteBuf, boolean
decodeBody) {
long requestId = byteBuf.readLong();
StatusCode statusCode = StatusCode.fromCode(byteBuf.readInt());
String retMessage = ByteBufUtils.readLengthAndString(byteBuf);
long fileLength = byteBuf.readLong();
+ int[] storageIds = new int[byteBuf.readInt()];
+ for (int i = 0; i < storageIds.length; i++) {
+ storageIds[i] = byteBuf.readInt();
+ }
if (decodeBody) {
NettyManagedBuffer nettyManagedBuffer = new NettyManagedBuffer(byteBuf);
- return new GetLocalShuffleIndexResponse(
- requestId, statusCode, retMessage, nettyManagedBuffer, fileLength);
+ return new GetLocalShuffleIndexV2Response(
+ requestId, statusCode, retMessage, nettyManagedBuffer, fileLength,
storageIds);
} else {
- return new GetLocalShuffleIndexResponse(
- requestId, statusCode, retMessage, NettyManagedBuffer.EMPTY_BUFFER,
fileLength);
+ return new GetLocalShuffleIndexV2Response(
+ requestId,
+ statusCode,
+ retMessage,
+ NettyManagedBuffer.EMPTY_BUFFER,
+ fileLength,
+ storageIds);
}
}
@Override
public Type type() {
- return Type.GET_LOCAL_SHUFFLE_INDEX_RESPONSE;
+ return Type.GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE;
}
- public long getFileLength() {
- return fileLength;
+ @Override
+ public int[] getStorageIds() {
+ return storageIds;
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
index e7f18ab2f..2ad0b0d77 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
@@ -64,7 +64,10 @@ public abstract class Message implements Encodable {
GET_SHUFFLE_RESULT_FOR_MULTI_PART_RESPONSE(19),
REQUIRE_BUFFER_RESPONSE(20),
GET_SORTED_SHUFFLE_DATA_REQUEST(21),
- GET_SORTED_SHUFFLE_DATA_RESPONSE(22);
+ GET_SORTED_SHUFFLE_DATA_RESPONSE(22),
+ GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE(23),
+ GET_LOCAL_SHUFFLE_DATA_V2_REQUEST(24),
+ ;
private final byte id;
@@ -138,6 +141,10 @@ public abstract class Message implements Encodable {
return GET_SORTED_SHUFFLE_DATA_REQUEST;
case 22:
return GET_SORTED_SHUFFLE_DATA_RESPONSE;
+ case 23:
+ return GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE;
+ case 24:
+ return GET_LOCAL_SHUFFLE_DATA_V2_REQUEST;
case -1:
throw new IllegalArgumentException("User type messages cannot be
decoded.");
default:
@@ -154,12 +161,16 @@ public abstract class Message implements Encodable {
return SendShuffleDataRequestV1.decode(in);
case GET_LOCAL_SHUFFLE_DATA_REQUEST:
return GetLocalShuffleDataRequest.decode(in);
+ case GET_LOCAL_SHUFFLE_DATA_V2_REQUEST:
+ return GetLocalShuffleDataV2Request.decode(in);
case GET_LOCAL_SHUFFLE_DATA_RESPONSE:
return GetLocalShuffleDataResponse.decode(in, true);
case GET_LOCAL_SHUFFLE_INDEX_REQUEST:
return GetLocalShuffleIndexRequest.decode(in);
case GET_LOCAL_SHUFFLE_INDEX_RESPONSE:
return GetLocalShuffleIndexResponse.decode(in, true);
+ case GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE:
+ return GetLocalShuffleIndexV2Response.decode(in, true);
case GET_MEMORY_SHUFFLE_DATA_REQUEST:
return GetMemoryShuffleDataRequest.decode(in);
case GET_MEMORY_SHUFFLE_DATA_RESPONSE:
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java
similarity index 57%
copy from
common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
copy to
common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java
index 04763d842..48f64fad4 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java
@@ -21,6 +21,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,34 +31,36 @@ import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
-public class FixedSizeSegmentSplitter implements SegmentSplitter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
+public abstract class AbstractSegmentSplitter implements SegmentSplitter {
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSegmentSplitter.class);
- private int readBufferSize;
+ protected int readBufferSize;
- public FixedSizeSegmentSplitter(int readBufferSize) {
+ public AbstractSegmentSplitter(int readBufferSize) {
this.readBufferSize = readBufferSize;
}
- @Override
- public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult)
{
+ protected List<ShuffleDataSegment> splitCommon(
+ ShuffleIndexResult shuffleIndexResult, Predicate<Long> taskFilter) {
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
return Lists.newArrayList();
}
ByteBuffer indexData = shuffleIndexResult.getIndexData();
long dataFileLen = shuffleIndexResult.getDataFileLen();
- return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
- }
+ int[] storageIds = shuffleIndexResult.getStorageIds();
- private static List<ShuffleDataSegment> transIndexDataToSegments(
- ByteBuffer indexData, int readBufferSize, long dataFileLen) {
List<BufferSegment> bufferSegments = Lists.newArrayList();
List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
int bufferOffset = 0;
long fileOffset = -1;
long totalLength = 0;
+ int storageIndex = 0;
+ long preOffset = -1;
+ int preStorageId = -1;
+ int currentStorageId = 0;
+
while (indexData.hasRemaining()) {
try {
final long offset = indexData.getLong();
@@ -67,19 +70,21 @@ public class FixedSizeSegmentSplitter implements
SegmentSplitter {
final long blockId = indexData.getLong();
final long taskAttemptId = indexData.getLong();
- // The index file is written, read and parsed sequentially, so these
parsed index segments
- // index a continuous shuffle data in the corresponding data file and
the first segment's
- // offset field is the offset of these shuffle data in the data file.
- if (fileOffset == -1) {
- fileOffset = offset;
+ if (storageIds.length == 0) {
+ currentStorageId = -1;
+ } else if (preOffset > offset) {
+ storageIndex++;
+ if (storageIndex >= storageIds.length) {
+ LOGGER.warn("storageIds length {} is not enough.",
storageIds.length);
+ }
+ currentStorageId = storageIds[storageIndex];
+ } else {
+ currentStorageId = storageIds[storageIndex];
}
+ preOffset = offset;
totalLength += length;
- // If ShuffleServer is flushing the file at this time, the length in
the index file record
- // may be greater
- // than the length in the actual data file, and it needs to be
returned at this time to
- // avoid EOFException
if (dataFileLen != -1 && totalLength > dataFileLen) {
LOGGER.info(
"Abort inconsistent data, the data length: {}(bytes) recorded in
index file is greater than "
@@ -91,16 +96,30 @@ public class FixedSizeSegmentSplitter implements
SegmentSplitter {
break;
}
- bufferSegments.add(
- new BufferSegment(blockId, bufferOffset, length, uncompressLength,
crc, taskAttemptId));
- bufferOffset += length;
+ boolean storageChanged = preStorageId != -1 && currentStorageId !=
preStorageId;
+
+ if (bufferOffset >= readBufferSize
+ || storageChanged
+ || (taskFilter != null && !taskFilter.test(taskAttemptId))) {
+ if (bufferOffset > 0) {
+ ShuffleDataSegment sds =
+ new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId,
bufferSegments);
+ dataFileSegments.add(sds);
+ bufferSegments = Lists.newArrayList();
+ bufferOffset = 0;
+ fileOffset = -1;
+ }
+ }
- if (bufferOffset >= readBufferSize) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
- dataFileSegments.add(sds);
- bufferSegments = Lists.newArrayList();
- bufferOffset = 0;
- fileOffset = -1;
+ if (taskFilter == null || taskFilter.test(taskAttemptId)) {
+ if (fileOffset == -1) {
+ fileOffset = offset;
+ }
+ bufferSegments.add(
+ new BufferSegment(
+ blockId, bufferOffset, length, uncompressLength, crc,
taskAttemptId));
+ preStorageId = currentStorageId;
+ bufferOffset += length;
}
} catch (BufferUnderflowException ue) {
throw new RssException("Read index data under flow", ue);
@@ -108,7 +127,10 @@ public class FixedSizeSegmentSplitter implements
SegmentSplitter {
}
if (bufferOffset > 0) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
+ // Tag with preStorageId (the storage the buffered blocks were read from): currentStorageId
+ // may already point to the next storage when the loop stopped early on inconsistent length.
+ ShuffleDataSegment sds =
+ new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments);
dataFileSegments.add(sds);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 04763d842..dd548ebba 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -17,101 +17,19 @@
package org.apache.uniffle.common.segment;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
import java.util.List;
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
-import org.apache.uniffle.common.exception.RssException;
-
-public class FixedSizeSegmentSplitter implements SegmentSplitter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
-
- private int readBufferSize;
+public class FixedSizeSegmentSplitter extends AbstractSegmentSplitter {
public FixedSizeSegmentSplitter(int readBufferSize) {
- this.readBufferSize = readBufferSize;
+ super(readBufferSize);
}
@Override
public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult)
{
- if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
- return Lists.newArrayList();
- }
-
- ByteBuffer indexData = shuffleIndexResult.getIndexData();
- long dataFileLen = shuffleIndexResult.getDataFileLen();
- return transIndexDataToSegments(indexData, readBufferSize, dataFileLen);
- }
-
- private static List<ShuffleDataSegment> transIndexDataToSegments(
- ByteBuffer indexData, int readBufferSize, long dataFileLen) {
- List<BufferSegment> bufferSegments = Lists.newArrayList();
- List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
- int bufferOffset = 0;
- long fileOffset = -1;
- long totalLength = 0;
-
- while (indexData.hasRemaining()) {
- try {
- final long offset = indexData.getLong();
- final int length = indexData.getInt();
- final int uncompressLength = indexData.getInt();
- final long crc = indexData.getLong();
- final long blockId = indexData.getLong();
- final long taskAttemptId = indexData.getLong();
-
- // The index file is written, read and parsed sequentially, so these
parsed index segments
- // index a continuous shuffle data in the corresponding data file and
the first segment's
- // offset field is the offset of these shuffle data in the data file.
- if (fileOffset == -1) {
- fileOffset = offset;
- }
-
- totalLength += length;
-
- // If ShuffleServer is flushing the file at this time, the length in
the index file record
- // may be greater
- // than the length in the actual data file, and it needs to be
returned at this time to
- // avoid EOFException
- if (dataFileLen != -1 && totalLength > dataFileLen) {
- LOGGER.info(
- "Abort inconsistent data, the data length: {}(bytes) recorded in
index file is greater than "
- + "the real data file length: {}(bytes). Block id: {}"
- + "This may happen when the data is flushing, please
ignore.",
- totalLength,
- dataFileLen,
- blockId);
- break;
- }
-
- bufferSegments.add(
- new BufferSegment(blockId, bufferOffset, length, uncompressLength,
crc, taskAttemptId));
- bufferOffset += length;
-
- if (bufferOffset >= readBufferSize) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
- dataFileSegments.add(sds);
- bufferSegments = Lists.newArrayList();
- bufferOffset = 0;
- fileOffset = -1;
- }
- } catch (BufferUnderflowException ue) {
- throw new RssException("Read index data under flow", ue);
- }
- }
-
- if (bufferOffset > 0) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
- dataFileSegments.add(sds);
- }
-
- return dataFileSegments;
+ // For FixedSizeSegmentSplitter, we do not filter by taskAttemptId, so
pass null for the filter.
+ return splitCommon(shuffleIndexResult, null);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 366968c34..c371b0ad2 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -17,20 +17,12 @@
package org.apache.uniffle.common.segment;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
-import com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
-import org.apache.uniffle.common.exception.RssException;
/**
* {@class LocalOrderSegmentSplitter} will be initialized only when the {@class
@@ -46,122 +38,16 @@ import org.apache.uniffle.common.exception.RssException;
* <p>Last but not least, this split strategy depends on LOCAL_ORDER of index
file, which must be
* guaranteed by the shuffle server.
*/
-public class LocalOrderSegmentSplitter implements SegmentSplitter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LocalOrderSegmentSplitter.class);
-
+public class LocalOrderSegmentSplitter extends AbstractSegmentSplitter {
private Roaring64NavigableMap expectTaskIds;
- private int readBufferSize;
public LocalOrderSegmentSplitter(Roaring64NavigableMap expectTaskIds, int
readBufferSize) {
+ super(readBufferSize);
this.expectTaskIds = expectTaskIds;
- this.readBufferSize = readBufferSize;
}
@Override
public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult)
{
- if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
- return Lists.newArrayList();
- }
-
- ByteBuffer indexData = shuffleIndexResult.getIndexData();
- long dataFileLen = shuffleIndexResult.getDataFileLen();
-
- List<BufferSegment> bufferSegments = Lists.newArrayList();
-
- List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
- int bufferOffset = 0;
- long fileOffset = -1;
- long totalLen = 0;
-
- long lastExpectedBlockIndex = -1;
-
- List<Long> indexTaskIds = new ArrayList<>();
-
- /**
- * One ShuffleDataSegment should meet following requirements:
- *
- * <p>1. taskId in [startMapIndex, endMapIndex) taskIds bitmap. Attention:
the index in the
- * range is not the map task id, which means the required task ids are not
continuous. 2.
- * ShuffleDataSegment size should < readBufferSize 3. Single
shuffleDataSegment's blocks should
- * be continuous
- */
- int index = 0;
- while (indexData.hasRemaining()) {
- try {
- long offset = indexData.getLong();
- int length = indexData.getInt();
- int uncompressLength = indexData.getInt();
- long crc = indexData.getLong();
- long blockId = indexData.getLong();
- long taskAttemptId = indexData.getLong();
-
- totalLen += length;
- indexTaskIds.add(taskAttemptId);
-
- // If ShuffleServer is flushing the file at this time, the length in
the index file record
- // may be greater
- // than the length in the actual data file, and it needs to be
returned at this time to
- // avoid EOFException
- if (dataFileLen != -1 && totalLen > dataFileLen) {
- LOGGER.info(
- "Abort inconsistent data, the data length: {}(bytes) recorded in
index file is greater than "
- + "the real data file length: {}(bytes). Block id: {}. This
should not happen. "
- + "This may happen when the data is flushing, please
ignore.",
- totalLen,
- dataFileLen,
- blockId);
- break;
- }
-
- boolean conditionOfDiscontinuousBlocks =
- lastExpectedBlockIndex != -1
- && bufferSegments.size() > 0
- && expectTaskIds.contains(taskAttemptId)
- && index - lastExpectedBlockIndex != 1;
-
- boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;
-
- if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
- dataFileSegments.add(sds);
- bufferSegments = Lists.newArrayList();
- bufferOffset = 0;
- fileOffset = -1;
- }
-
- if (expectTaskIds.contains(taskAttemptId)) {
- if (fileOffset == -1) {
- fileOffset = offset;
- }
- bufferSegments.add(
- new BufferSegment(
- blockId, bufferOffset, length, uncompressLength, crc,
taskAttemptId));
- bufferOffset += length;
- lastExpectedBlockIndex = index;
- }
- index++;
- } catch (BufferUnderflowException ue) {
- throw new RssException("Read index data under flow", ue);
- }
- }
-
- if (bufferOffset > 0) {
- ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset,
bufferOffset, bufferSegments);
- dataFileSegments.add(sds);
- }
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "Index file task-ids sequence: {}, expected task-ids: {}",
- indexTaskIds,
- getExpectedTaskIds(expectTaskIds));
- }
- return dataFileSegments;
- }
-
- private List<Long> getExpectedTaskIds(Roaring64NavigableMap expectTaskIds) {
- List<Long> taskIds = new ArrayList<>();
- expectTaskIds.forEach(value -> taskIds.add(value));
- return taskIds;
+ return splitCommon(shuffleIndexResult, taskAttemptId ->
expectTaskIds.contains(taskAttemptId));
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 79ceb2f10..c96eedc1d 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -93,4 +93,5 @@ public final class Constants {
public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
+ public static final int[] EMPTY_INT_ARRAY = new int[0];
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
index b681f63ca..6834f8b15 100644
---
a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
@@ -20,13 +20,16 @@ package org.apache.uniffle.common.segment;
import java.nio.ByteBuffer;
import java.util.List;
+import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import static
org.apache.uniffle.common.segment.LocalOrderSegmentSplitterTest.generateData;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,4 +93,52 @@ public class FixedSizeSegmentSplitterTest {
assertTrue(e.getMessage().contains("Read index data under flow"));
}
}
+
+ @Test
+ @DisplayName("Test splitting with storage ID changes")
+ void testSplitContainsStorageId() {
+ SegmentSplitter splitter = new FixedSizeSegmentSplitter(50);
+ int[] storageIds = new int[] {1, 2, 3};
+ byte[] data0 =
+ generateData(Pair.of(32, 0), Pair.of(16, 0), Pair.of(10, 0),
Pair.of(32, 6), Pair.of(6, 0));
+ byte[] data1 =
+ generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0),
Pair.of(32, 6), Pair.of(6, 0));
+ byte[] data2 =
+ generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0),
Pair.of(32, 6), Pair.of(6, 0));
+
+ ByteBuffer dataCombined =
+ ByteBuffer.allocate(data0.length + data1.length + data2.length)
+ .put(data0)
+ .put(data1)
+ .put(data2);
+ dataCombined.flip();
+ List<ShuffleDataSegment> shuffleDataSegments =
+ splitter.split(
+ new ShuffleIndexResult(
+ new NettyManagedBuffer(Unpooled.wrappedBuffer(dataCombined)),
-1L, "", storageIds));
+ assertEquals(6, shuffleDataSegments.size(), "Expected 6 segments");
+ assertSegment(shuffleDataSegments.get(0), 0, 58, 3, storageIds[0]);
+ // split while previous data segments over read buffer size
+ assertSegment(shuffleDataSegments.get(1), 58, 38, 2, storageIds[0]);
+ // split while storage id changed, which offset less than previous offset
+ assertSegment(shuffleDataSegments.get(2), 0, 58, 3, storageIds[1]);
+ // split while previous data segments over read buffer size
+ assertSegment(shuffleDataSegments.get(3), 58, 38, 2, storageIds[1]);
+ // split while storage id changed, which offset less than previous offset
+ assertSegment(shuffleDataSegments.get(4), 0, 58, 3, storageIds[2]);
+ // split while previous data segments over read buffer size
+ assertSegment(shuffleDataSegments.get(5), 58, 38, 2, storageIds[2]);
+ }
+
+ private void assertSegment(
+ ShuffleDataSegment segment,
+ int expectedOffset,
+ int expectedLength,
+ int expectedSize,
+ int expectedStorageId) {
+ assertEquals(expectedOffset, segment.getOffset(), "Incorrect offset");
+ assertEquals(expectedLength, segment.getLength(), "Incorrect length");
+ assertEquals(expectedSize, segment.getBufferSegments().size(), "Incorrect
buffer segment size");
+ assertEquals(expectedStorageId, segment.getStorageId(), "Incorrect storage
ID");
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
index 9e0ff1e5c..3ee5815f4 100644
---
a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java
@@ -20,7 +20,9 @@ package org.apache.uniffle.common.segment;
import java.nio.ByteBuffer;
import java.util.List;
+import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -29,6 +31,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -375,4 +378,58 @@ public class LocalOrderSegmentSplitterTest {
}
return byteBuffer.array();
}
+
+ @Test
+ @DisplayName("Test splitting with storage ID changes")
+ void testSplitContainsStorageId() {
+ Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1);
+ SegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 50);
+ // Generate 3 batches of index entries with storage IDs 1, 2, 3 to simulate 3 flush events
+ // writing 3 index files.
+ int[] storageIds = new int[] {1, 2, 3};
+ byte[] data0 =
+ generateData(Pair.of(32, 0), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0));
+ byte[] data1 =
+ generateData(Pair.of(32, 1), Pair.of(26, 1), Pair.of(10, 1), Pair.of(32, 0), Pair.of(6, 1));
+ byte[] data2 =
+ generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0));
+ ByteBuffer dataCombined =
+ ByteBuffer.allocate(data0.length + data1.length + data2.length)
+ .put(data0)
+ .put(data1)
+ .put(data2);
+ dataCombined.flip();
+ List<ShuffleDataSegment> shuffleDataSegments =
+ splitter.split(
+ new ShuffleIndexResult(
+ new NettyManagedBuffer(Unpooled.wrappedBuffer(dataCombined)), -1L, "", storageIds));
+ assertEquals(4, shuffleDataSegments.size(), "Expected 4 segments");
+ assertEquals(
+ 5,
+ shuffleDataSegments.stream().mapToInt(s -> s.getBufferSegments().size()).sum(),
+ "Incorrect total size of buffer segments");
+ // The first data segment comes from parts 1-2 of data1 (data0 has no block of taskId 1);
+ // it is closed because its buffered size (58) reached readBufferSize (50).
+ assertSegment(shuffleDataSegments.get(0), 0, 58, 2, storageIds[1]);
+ // The next segment is part 3 of data1; it is closed because part 4 belongs to taskId 0,
+ // which makes the following expected blocks discontinuous.
+ assertSegment(shuffleDataSegments.get(1), 58, 10, 1, storageIds[1]);
+ // This segment is part 5 of data1; it is closed because the storage ID changes after it.
+ assertSegment(shuffleDataSegments.get(2), 100, 6, 1, storageIds[1]);
+ // The last segment is part 1 of data2, the only block of data2
+ // that belongs to taskId 1.
+ assertSegment(shuffleDataSegments.get(3), 0, 32, 1, storageIds[2]);
+ }
+
+ private void assertSegment(
+ ShuffleDataSegment segment,
+ int expectedOffset,
+ int expectedLength,
+ int expectedSize,
+ int expectedStorageId) {
+ assertEquals(expectedOffset, segment.getOffset(), "Incorrect offset");
+ assertEquals(expectedLength, segment.getLength(), "Incorrect length");
+ assertEquals(expectedSize, segment.getBufferSegments().size(), "Incorrect
buffer segment size");
+ assertEquals(expectedStorageId, segment.getStorageId(), "Incorrect storage
ID");
+ }
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
index 85499f3b0..2ab9f7b8f 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.io.IntWritable;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
@@ -68,9 +69,11 @@ import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
+import org.apache.uniffle.server.storage.MultiPartLocalStorageManager;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED;
+import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -88,8 +91,14 @@ public class
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
public static void setupServers(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setBoolean(COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, false);
- createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
+ // Skip before starting any server; constant-first equals stays safe if the config value is null.
+ Assumptions.assumeFalse(
+ MultiPartLocalStorageManager.class
+ .getName()
+ .equals(shuffleServerConf.get(SERVER_LOCAL_STORAGE_MANAGER_CLASS)),
+ MultiPartLocalStorageManager.class.getName() + " is not working with remote merge feature");
+ createCoordinatorServer(coordinatorConf);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE,
"1k");
shuffleServerConf.set(
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
new file mode 100644
index 000000000..bebee47ea
--- /dev/null
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
+import org.apache.uniffle.client.request.RssFinishShuffleRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.storage.MultiPartLocalStorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class SparkClientWithLocalForMultiPartLocalStorageManagerTest extends
ShuffleReadWriteBase {
+
+ private static File GRPC_DATA_DIR1;
+ private static File GRPC_DATA_DIR2;
+ private static File NETTY_DATA_DIR1;
+ private static File NETTY_DATA_DIR2;
+ private ShuffleServerGrpcClient grpcShuffleServerClient;
+ private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
+ private static ShuffleServerConf grpcShuffleServerConfig;
+ private static ShuffleServerConf nettyShuffleServerConfig;
+
+ @BeforeAll
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ createCoordinatorServer(coordinatorConf);
+
+ GRPC_DATA_DIR1 = new File(tmpDir, "data1");
+ GRPC_DATA_DIR2 = new File(tmpDir, "data2");
+ String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," +
GRPC_DATA_DIR2.getAbsolutePath();
+ ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(grpcBasePath, ServerType.GRPC);
+ grpcShuffleServerConf.set(
+ ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
+ MultiPartLocalStorageManager.class.getName());
+ createShuffleServer(grpcShuffleServerConf);
+
+ NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1");
+ NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2");
+ String nettyBasePath =
+ NETTY_DATA_DIR1.getAbsolutePath() + "," +
NETTY_DATA_DIR2.getAbsolutePath();
+ ShuffleServerConf nettyShuffleServerConf =
+ buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY);
+ nettyShuffleServerConf.set(
+ ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
+ MultiPartLocalStorageManager.class.getName());
+ createShuffleServer(nettyShuffleServerConf);
+
+ startServers();
+
+ grpcShuffleServerConfig = grpcShuffleServerConf;
+ nettyShuffleServerConfig = nettyShuffleServerConf;
+ }
+
+ private static ShuffleServerConf buildShuffleServerConf(String basePath,
ServerType serverType)
+ throws Exception {
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
+ shuffleServerConf.setString("rss.storage.basePath", basePath);
+ return shuffleServerConf;
+ }
+
+ @BeforeEach
+ public void createClient() throws Exception {
+ grpcShuffleServerClient =
+ new ShuffleServerGrpcClient(
+ LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+ nettyShuffleServerClient =
+ new ShuffleServerGrpcNettyClient(
+ LOCALHOST,
+
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
+
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+ }
+
+ @AfterEach
+ public void closeClient() {
+ grpcShuffleServerClient.close();
+ nettyShuffleServerClient.close();
+ }
+
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean
isNettyMode) {
+ List<ShuffleServerInfo> shuffleServerInfo =
+ isNettyMode
+ ? Lists.newArrayList(
+ new ShuffleServerInfo(
+ LOCALHOST,
+
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
+
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)))
+ : Lists.newArrayList(
+ new ShuffleServerInfo(
+ LOCALHOST,
+
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
+ return ShuffleClientFactory.newReadBuilder()
+ .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
+ .storageType(StorageType.LOCALFILE.name())
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .shuffleServerInfoList(shuffleServerInfo);
+ }
+
+ private static Stream<Arguments> isNettyModeProvider() {
+ return Stream.of(Arguments.of(true), Arguments.of(false));
+ }
+
+ @ParameterizedTest
+ @MethodSource("isNettyModeProvider")
+ public void testClientRemoteReadFromMultipleDisk(boolean isNettyMode) {
+ String testAppId = "testClientRemoteReadFromMultipleDisk_appId";
+ registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)),
isNettyMode);
+
+ // Send shuffle data
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+ List<ShuffleBlockInfo> blocks =
+ createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData,
mockSSI);
+ sendTestData(testAppId, blocks, isNettyMode);
+
+ List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers :
grpcShuffleServers;
+ // Mark one storage reaching high watermark, it should switch another
storage for next writing
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(testAppId, 0, 0,
0, 0);
+ LocalStorage storage1 =
+ (LocalStorage)
shuffleServer.getStorageManager().selectStorage(readEvent);
+ storage1.getMetaData().setSize(20 * 1024 * 1024);
+
+ blocks = createShuffleBlockList(0, 0, 0, 3, 25, blockIdBitmap,
expectedData, mockSSI);
+ sendTestData(testAppId, blocks, isNettyMode);
+
+ readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, 1);
+ LocalStorage storage2 =
+ (LocalStorage)
shuffleServer.getStorageManager().selectStorage(readEvent);
+ assertNotEquals(storage1, storage2);
+
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ // unexpected taskAttemptId should be filtered
+ ShuffleReadClientImpl readClient =
+ baseReadBuilder(isNettyMode)
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
+
+ validateResult(readClient, expectedData);
+ readClient.checkProcessedBlockIds();
+ readClient.close();
+ }
+
+ protected void registerApp(
+ String testAppId, List<PartitionRange> partitionRanges, boolean
isNettyMode) {
+ ShuffleServerGrpcClient shuffleServerClient =
+ isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient;
+ RssRegisterShuffleRequest rrsr =
+ new RssRegisterShuffleRequest(testAppId, 0, partitionRanges, "");
+ shuffleServerClient.registerShuffle(rrsr);
+ }
+
+ protected void sendTestData(
+ String testAppId, List<ShuffleBlockInfo> blocks, boolean isNettyMode) {
+ ShuffleServerGrpcClient shuffleServerClient =
+ isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient;
+ Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+ partitionToBlocks.put(0, blocks);
+
+ Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks =
Maps.newHashMap();
+ shuffleToBlocks.put(0, partitionToBlocks);
+
+ RssSendShuffleDataRequest rssdr =
+ new RssSendShuffleDataRequest(testAppId, 3, 1000, shuffleToBlocks);
+ shuffleServerClient.sendShuffleData(rssdr);
+ RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0);
+ shuffleServerClient.sendCommit(rscr);
+ RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0);
+ shuffleServerClient.finishShuffle(rfsr);
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index c7471991a..56f02721d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -923,6 +923,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
.setOffset(request.getOffset())
.setLength(request.getLength())
.setTimestamp(start)
+ .setStorageId(request.getStorageId())
.build();
String requestInfo =
"appId["
@@ -931,6 +932,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
+ request.getShuffleId()
+ "], partitionId["
+ request.getPartitionId()
+ + "], storageId["
+ + request.getStorageId()
+ "]";
int retry = 0;
GetLocalShuffleDataResponse rpcResponse;
@@ -1016,7 +1019,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
StatusCode.SUCCESS,
new NettyManagedBuffer(
Unpooled.wrappedBuffer(rpcResponse.getIndexData().toByteArray())),
- rpcResponse.getDataFileLen());
+ rpcResponse.getDataFileLen(),
+
rpcResponse.getStorageIdsList().stream().mapToInt(Integer::intValue).toArray());
break;
default:
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index a89ea9c4c..fbc4e363b 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.netty.client.TransportConf;
import org.apache.uniffle.common.netty.client.TransportContext;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataResponse;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataV2Request;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
@@ -350,7 +351,8 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
return new RssGetShuffleIndexResponse(
StatusCode.SUCCESS,
getLocalShuffleIndexResponse.body(),
- getLocalShuffleIndexResponse.getFileLength());
+ getLocalShuffleIndexResponse.getFileLength(),
+ getLocalShuffleIndexResponse.getStorageIds());
default:
String msg =
"Can't get shuffle index from "
@@ -369,17 +371,30 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
@Override
public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest
request) {
TransportClient transportClient = getTransportClient();
+ // Construct old version or v2 get shuffle data request to compatible with
old server
GetLocalShuffleDataRequest getLocalShuffleIndexRequest =
- new GetLocalShuffleDataRequest(
- requestId(),
- request.getAppId(),
- request.getShuffleId(),
- request.getPartitionId(),
- request.getPartitionNumPerRange(),
- request.getPartitionNum(),
- request.getOffset(),
- request.getLength(),
- System.currentTimeMillis());
+ request.storageIdSpecified()
+ ? new GetLocalShuffleDataV2Request(
+ requestId(),
+ request.getAppId(),
+ request.getShuffleId(),
+ request.getPartitionId(),
+ request.getPartitionNumPerRange(),
+ request.getPartitionNum(),
+ request.getOffset(),
+ request.getLength(),
+ request.getStorageId(),
+ System.currentTimeMillis())
+ : new GetLocalShuffleDataRequest(
+ requestId(),
+ request.getAppId(),
+ request.getShuffleId(),
+ request.getPartitionId(),
+ request.getPartitionNumPerRange(),
+ request.getPartitionNum(),
+ request.getOffset(),
+ request.getLength(),
+ System.currentTimeMillis());
String requestInfo =
"appId["
+ request.getAppId()
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java
index 580192217..c245e48b7 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java
@@ -28,6 +28,7 @@ public class RssGetShuffleDataRequest extends
RetryableRequest {
private final int partitionNum;
private final long offset;
private final int length;
+ private final int storageId;
public RssGetShuffleDataRequest(
String appId,
@@ -37,6 +38,7 @@ public class RssGetShuffleDataRequest extends
RetryableRequest {
int partitionNum,
long offset,
int length,
+ int storageId,
int retryMax,
long retryIntervalMax) {
this.appId = appId;
@@ -46,6 +48,7 @@ public class RssGetShuffleDataRequest extends
RetryableRequest {
this.partitionNum = partitionNum;
this.offset = offset;
this.length = length;
+ this.storageId = storageId;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
}
@@ -59,7 +62,17 @@ public class RssGetShuffleDataRequest extends
RetryableRequest {
int partitionNum,
long offset,
int length) {
- this(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum,
offset, length, 1, 0);
+ this(
+ appId,
+ shuffleId,
+ partitionId,
+ partitionNumPerRange,
+ partitionNum,
+ offset,
+ length,
+ -1,
+ 1,
+ 0);
}
public String getAppId() {
@@ -90,6 +103,14 @@ public class RssGetShuffleDataRequest extends
RetryableRequest {
return length;
}
+  /**
+   * Sentinel storage id meaning "no specific local storage requested". For such requests
+   * {@code ShuffleServerGrpcNettyClient} sends the legacy (v1) get-data message, which servers
+   * without multi-part storage support understand.
+   */
+  public static final int UNSPECIFIED_STORAGE_ID = -1;
+
+  /**
+   * Returns the id of the local storage that holds the requested data segment.
+   *
+   * @return the storage id, or {@link #UNSPECIFIED_STORAGE_ID} if none was given
+   */
+  public int getStorageId() {
+    return storageId;
+  }
+
+  /**
+   * Tells whether this request targets a specific local storage.
+   *
+   * @return {@code true} unless the storage id equals {@link #UNSPECIFIED_STORAGE_ID}
+   */
+  public boolean storageIdSpecified() {
+    return storageId != UNSPECIFIED_STORAGE_ID;
+  }
+
@Override
public String operationType() {
return "GetShuffleData";
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
index 4d3667ab1..7c42ac813 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java
@@ -20,13 +20,19 @@ package org.apache.uniffle.client.response;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.Constants;
public class RssGetShuffleIndexResponse extends ClientResponse {
private final ShuffleIndexResult shuffleIndexResult;
- public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data,
long dataFileLen) {
+  /**
+   * Creates an index response that also carries the storage ids reported by the server; they
+   * are handed unchanged to the wrapped {@link ShuffleIndexResult}.
+   */
+ public RssGetShuffleIndexResponse(
+ StatusCode statusCode, ManagedBuffer data, long dataFileLen, int[]
storageIds) {
super(statusCode);
- this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen, null);
+ this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen, null,
storageIds);
+ }
+
+  /** Creates an index response without storage ids ({@code Constants.EMPTY_INT_ARRAY}). */
+ public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data,
long dataFileLen) {
+ this(statusCode, data, dataFileLen, Constants.EMPTY_INT_ARRAY);
}
public ShuffleIndexResult getShuffleIndexResult() {
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 1312480de..c68a1a87e 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -86,6 +86,7 @@ message GetLocalShuffleDataRequest {
int64 offset = 6;
int32 length = 7;
int64 timestamp = 8;
+ int32 storageId = 9;
}
message GetLocalShuffleDataResponse {
@@ -124,6 +125,7 @@ message GetLocalShuffleIndexResponse {
StatusCode status = 2;
string retMsg = 3;
int64 dataFileLen = 4;
+ repeated int32 storageIds = 5;
}
message ReportShuffleResultRequest {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java
index 11a8aeff7..cc121c47d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java
@@ -23,6 +23,7 @@ public class ShuffleDataReadEvent {
private int shuffleId;
private int partitionId;
private int startPartition;
+ private int storageId;
public ShuffleDataReadEvent(
String appId, int shuffleId, int partitionId, int startPartitionOfRange)
{
@@ -32,6 +33,12 @@ public class ShuffleDataReadEvent {
this.startPartition = startPartitionOfRange;
}
+  /**
+   * Creates a read event that additionally names the local storage to read from.
+   *
+   * <p>NOTE(review): the 4-argument constructor leaves {@code storageId} at its field default
+   * of 0, which is also a valid storage id; confirm that every caller relying on storage-id
+   * routing uses this constructor.
+   */
+  public ShuffleDataReadEvent(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int startPartitionOfRange,
+      int storageId) {
+    this(appId, shuffleId, partitionId, startPartitionOfRange);
+    this.storageId = storageId;
+  }
+
public String getAppId() {
return appId;
}
@@ -47,4 +54,8 @@ public class ShuffleDataReadEvent {
public int getStartPartition() {
return startPartition;
}
+
+  /**
+   * Returns the local storage id this read targets; 0 if the event was built without an
+   * explicit storage id.
+   */
+  public int getStorageId() {
+    return this.storageId;
+  }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ca0a3d6c7..b5951c73d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -27,6 +27,7 @@ import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
+import org.apache.uniffle.server.storage.LocalStorageManager;
public class ShuffleServerConf extends RssBaseConf {
@@ -759,6 +760,12 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription(
"A list of metrics will report to coordinator and dashboard,
format in \"displayName:metricsName\", separated by ','");
+  /**
+   * Fully qualified class name of the {@link LocalStorageManager} implementation used for local
+   * file storage. The class is instantiated reflectively through a constructor taking a
+   * {@code ShuffleServerConf}. Defaults to {@link LocalStorageManager} itself.
+   */
+  public static final ConfigOption<String> SERVER_LOCAL_STORAGE_MANAGER_CLASS =
+      ConfigOptions.key("rss.server.localStorageManagerClass")
+          .stringType()
+          .defaultValue(LocalStorageManager.class.getName())
+          .withDescription("The class of local storage manager implementation");
+
public ShuffleServerConf() {}
public ShuffleServerConf(String fileName) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 0c38960d0..79206492a 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -1083,6 +1084,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
int partitionNum = request.getPartitionNum();
long offset = request.getOffset();
int length = request.getLength();
+ int storageId = request.getStorageId();
auditContext.withAppId(appId).withShuffleId(shuffleId);
auditContext.withArgs(
@@ -1095,7 +1097,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ ", offset="
+ offset
+ ", length="
- + length);
+ + length
+ + ", storageId="
+ + storageId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
@@ -1144,7 +1148,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
Storage storage =
shuffleServer
.getStorageManager()
- .selectStorage(new ShuffleDataReadEvent(appId, shuffleId,
partitionId, range[0]));
+ .selectStorage(
+ new ShuffleDataReadEvent(appId, shuffleId, partitionId,
range[0], storageId));
if (storage != null) {
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
@@ -1163,7 +1168,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
partitionNum,
storageType,
offset,
- length);
+ length,
+ storageId);
long readTime = System.currentTimeMillis() - start;
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
@@ -1300,6 +1306,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
+ builder.addAllStorageIds(
+ Arrays.stream(shuffleIndexResult.getStorageIds())
+ .boxed()
+ .collect(Collectors.toList()));
auditContext.withReturnValue("len=" +
shuffleIndexResult.getDataFileLen());
reply = builder.build();
} catch (FileNotFoundException indexFileNotFoundException) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index d6a9a3fe9..3f753eda8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -694,7 +694,8 @@ public class ShuffleTaskManager {
int partitionNum,
String storageType,
long offset,
- int length) {
+ int length,
+ int storageId) {
refreshAppId(appId);
CreateShuffleReadHandlerRequest request = new
CreateShuffleReadHandlerRequest();
@@ -709,12 +710,24 @@ public class ShuffleTaskManager {
ShuffleStorageUtils.getPartitionRange(partitionId,
partitionNumPerRange, partitionNum);
Storage storage =
storageManager.selectStorage(
- new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
+ new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0],
storageId));
if (storage == null) {
throw new FileNotFoundException("No such data stored in current storage
manager.");
}
- return storage.getOrCreateReadHandler(request).getShuffleData(offset,
length);
+ // only one partition part in one storage
+ try {
+ return storage.getOrCreateReadHandler(request).getShuffleData(offset,
length);
+ } catch (FileNotFoundException e) {
+ LOG.warn(
+ "shuffle file not found {}-{}-{} in {}",
+ appId,
+ shuffleId,
+ partitionId,
+ storage.getStoragePath(),
+ e);
+ throw e;
+ }
}
public ShuffleIndexResult getShuffleIndex(
@@ -736,12 +749,16 @@ public class ShuffleTaskManager {
int[] range =
ShuffleStorageUtils.getPartitionRange(partitionId,
partitionNumPerRange, partitionNum);
Storage storage =
- storageManager.selectStorage(
+ storageManager.selectStorageById(
new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
if (storage == null) {
throw new FileNotFoundException("No such data in current storage
manager.");
}
- return storage.getOrCreateReadHandler(request).getShuffleIndex();
+ ShuffleIndexResult result =
storage.getOrCreateReadHandler(request).getShuffleIndex();
+ if (result == null) {
+ throw new FileNotFoundException("No such data in current storage
manager.");
+ }
+ return result;
}
public void checkResourceStatus() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index b5137d818..ab6ce2834 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -53,6 +53,7 @@ import
org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse;
+import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexV2Response;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataRequest;
@@ -552,7 +553,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
String msg = "OK";
- GetLocalShuffleIndexResponse response;
+ GetLocalShuffleIndexV2Response response;
int[] range =
ShuffleStorageUtils.getPartitionRange(partitionId,
partitionNumPerRange, partitionNum);
Storage storage =
@@ -587,8 +588,13 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
auditContext.withStatusCode(status);
auditContext.withReturnValue("len=" + data.size());
response =
- new GetLocalShuffleIndexResponse(
- req.getRequestId(), status, msg, data,
shuffleIndexResult.getDataFileLen());
+ new GetLocalShuffleIndexV2Response(
+ req.getRequestId(),
+ status,
+ msg,
+ data,
+ shuffleIndexResult.getDataFileLen(),
+ shuffleIndexResult.getStorageIds());
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
start, assumedFileSize, data.size(), requestInfo, req,
response, client);
@@ -605,7 +611,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
requestInfo,
indexFileNotFoundException);
response =
- new GetLocalShuffleIndexResponse(
+ new GetLocalShuffleIndexV2Response(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
} catch (Exception e) {
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
@@ -616,7 +622,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
msg = "Error happened when get shuffle index for " + requestInfo +
", " + e.getMessage();
LOG.error(msg, e);
response =
- new GetLocalShuffleIndexResponse(
+ new GetLocalShuffleIndexV2Response(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
} else {
@@ -624,7 +630,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
msg = "Can't require memory to get shuffle index";
LOG.warn("{} for {}", msg, requestInfo);
response =
- new GetLocalShuffleIndexResponse(
+ new GetLocalShuffleIndexV2Response(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
auditContext.withStatusCode(response.getStatusCode());
@@ -642,6 +648,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
int partitionNum = req.getPartitionNum();
long offset = req.getOffset();
int length = req.getLength();
+ int storageId = req.getStorageId();
auditContext.withAppId(appId);
auditContext.withShuffleId(shuffleId);
auditContext.withArgs(
@@ -656,7 +663,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ ", offset="
+ offset
+ ", length="
- + length);
+ + length
+ + ", storageId="
+ + storageId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
auditContext.withStatusCode(status);
@@ -699,7 +708,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
Storage storage =
shuffleServer
.getStorageManager()
- .selectStorage(new ShuffleDataReadEvent(appId, shuffleId,
partitionId, range[0]));
+ .selectStorage(
+ new ShuffleDataReadEvent(
+ appId, shuffleId, partitionId, range[0],
req.getStorageId()));
if (storage != null) {
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
@@ -719,7 +730,8 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
partitionNum,
storageType,
offset,
- length);
+ length,
+ storageId);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
index 05b83e558..29693ad18 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
@@ -46,7 +46,7 @@ public class HybridStorageManager implements StorageManager {
private final StorageManagerSelector storageManagerSelector;
HybridStorageManager(ShuffleServerConf conf) {
- warmStorageManager = new LocalStorageManager(conf);
+ warmStorageManager = LocalStorageManagerFactory.get(conf);
coldStorageManager = new HadoopStorageManager(conf);
try {
@@ -115,6 +115,11 @@ public class HybridStorageManager implements
StorageManager {
return warmStorageManager.selectStorage(event);
}
+  /**
+   * Index reads are always served by the warm (local) tier, so the lookup is delegated to the
+   * warm storage manager.
+   */
+  @Override
+  public Storage selectStorageById(ShuffleDataReadEvent event) {
+    return this.warmStorageManager.selectStorageById(event);
+  }
+
@Override
public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
throw new UnsupportedOperationException();
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 622b5205a..598a2a9c6 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -94,7 +94,7 @@ public class LocalStorageManager extends SingleStorageManager
{
private boolean isStorageAuditLogEnabled;
@VisibleForTesting
- LocalStorageManager(ShuffleServerConf conf) {
+ public LocalStorageManager(ShuffleServerConf conf) {
super(conf);
storageBasePaths = RssUtils.getConfiguredLocalDirs(conf);
if (CollectionUtils.isEmpty(storageBasePaths)) {
@@ -136,6 +136,7 @@ public class LocalStorageManager extends
SingleStorageManager {
.ratio(ratio)
.lowWaterMarkOfWrite(lowWaterMarkOfWrite)
.highWaterMarkOfWrite(highWaterMarkOfWrite)
+ .setId(idx)
.localStorageMedia(storageType);
if (isDiskCapacityWatermarkCheckEnabled) {
builder.enableDiskCapacityWatermarkCheck();
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java
new file mode 100644
index 000000000..acdcd4377
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+/**
+ * Creates the {@link LocalStorageManager} implementation configured by
+ * {@link ShuffleServerConf#SERVER_LOCAL_STORAGE_MANAGER_CLASS}.
+ */
+public class LocalStorageManagerFactory {
+
+  /**
+   * Instantiates the configured local storage manager through its {@code (ShuffleServerConf)}
+   * constructor.
+   *
+   * @param conf shuffle server configuration; passed to the manager's constructor
+   * @return a new local storage manager
+   * @throws IllegalStateException if the configured class name is blank, the class cannot be
+   *     instantiated, or it is not a {@link LocalStorageManager}
+   */
+  public static LocalStorageManager get(ShuffleServerConf conf) {
+    String className = conf.get(ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS);
+    if (StringUtils.isBlank(className)) {
+      throw new IllegalStateException(
+          "Configuration error: "
+              + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS.toString()
+              + " should not set to empty");
+    }
+
+    Object instance;
+    try {
+      instance = RssUtils.getConstructor(className, ShuffleServerConf.class).newInstance(conf);
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Configuration error: "
+              + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS.toString()
+              + " is failed to create instance of "
+              + className,
+          e);
+    }
+    // Checked explicitly so that a class of the wrong type is reported as such, instead of as a
+    // ClassCastException disguised as an instantiation failure.
+    if (!(instance instanceof LocalStorageManager)) {
+      throw new IllegalStateException(
+          "Configuration error: "
+              + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS.toString()
+              + " must name a subclass of "
+              + LocalStorageManager.class.getName()
+              + ", but got "
+              + className);
+    }
+    return (LocalStorageManager) instance;
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java
new file mode 100644
index 000000000..5bca9a9e1
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.common.CompositeReadingViewStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+/**
+ * A {@link LocalStorageManager} that lets the data of one partition be spread over several local
+ * disks instead of pinning the whole partition to a single disk.
+ *
+ * <p>Writes: each flush event goes to the storage chosen by its event id modulo the number of
+ * storages, with a fallback over healthy storages when that one is corrupted.
+ *
+ * <p>Reads: data reads are routed by the storage id carried in the read event, while
+ * {@link #selectStorageById(ShuffleDataReadEvent)} returns a {@link CompositeReadingViewStorage}
+ * spanning all storages (used for index reads).
+ */
+public class MultiPartLocalStorageManager extends LocalStorageManager {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiPartLocalStorageManager.class);
+
+  /** Local storages keyed by {@link LocalStorage#getId()}; filled once in the constructor. */
+  private final Map<Integer, LocalStorage> idToStorages;
+
+  /** Read-only view over all local storages. */
+  private final CompositeReadingViewStorage compositeStorage;
+
+  public MultiPartLocalStorageManager(ShuffleServerConf conf) {
+    super(conf);
+    idToStorages = new ConcurrentSkipListMap<>();
+    for (LocalStorage storage : getStorages()) {
+      idToStorages.put(storage.getId(), storage);
+    }
+    compositeStorage = new CompositeReadingViewStorage(getStorages());
+  }
+
+  /**
+   * Chooses the local storage a flush event is written to.
+   *
+   * <p>The preferred storage is picked by event id modulo the number of storages, so successive
+   * flushes of one partition may land on different disks. When that storage is healthy it is
+   * returned, and it becomes the event's under-storage only if none is set yet. When it is
+   * corrupted, a healthy writable storage is picked via
+   * {@link ShuffleStorageUtils#getStorageIndex} and always replaces the event's under-storage.
+   *
+   * @return the selected storage, or {@code null} if no healthy writable storage exists
+   */
+  @Override
+  public Storage selectStorage(ShuffleDataFlushEvent event) {
+    List<LocalStorage> storages = getStorages();
+    if (storages.isEmpty()) {
+      return null;
+    }
+    if (storages.size() == 1) {
+      LocalStorage onlyStorage = storages.get(0);
+      if (event.getUnderStorage() == null) {
+        event.setUnderStorage(onlyStorage);
+      }
+      return onlyStorage;
+    }
+    String appId = event.getAppId();
+    int shuffleId = event.getShuffleId();
+    int partitionId = event.getStartPartition();
+
+    // TODO(baoloongmao): extend to support select storage by free space
+    // Event ids are expected to be non-negative; floorMod keeps the index in range regardless.
+    int preferredIndex = (int) Math.floorMod(event.getEventId(), (long) storages.size());
+    LocalStorage storage = storages.get(preferredIndex);
+    if (storage != null && !storage.isCorrupted()) {
+      if (event.getUnderStorage() == null) {
+        event.setUnderStorage(storage);
+      }
+      return storage;
+    }
+    if (storage != null && storage.containsWriteHandler(appId, shuffleId, partitionId)) {
+      LOG.error(
+          "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost",
+          storage.getBasePath(),
+          event);
+    }
+
+    // TODO(baoloongmao): update health storages and store it as member of this class.
+    List<LocalStorage> candidates =
+        storages.stream()
+            .filter(x -> x.canWrite() && !x.isCorrupted())
+            .collect(Collectors.toList());
+    if (candidates.isEmpty()) {
+      return null;
+    }
+    // The preferred storage is unusable here, so the fallback always becomes the under-storage.
+    LocalStorage selectedStorage =
+        candidates.get(
+            ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId));
+    event.setUnderStorage(selectedStorage);
+    return selectedStorage;
+  }
+
+  /**
+   * Returns the storage whose id matches the event's storage id.
+   *
+   * @return the matching storage (the only one, if there is just one), or {@code null} when no
+   *     local storage has that id; callers such as {@code ShuffleTaskManager#getShuffleData}
+   *     treat {@code null} as "data not found"
+   */
+  @Override
+  public Storage selectStorage(ShuffleDataReadEvent event) {
+    List<LocalStorage> storages = getStorages();
+    if (storages.size() == 1) {
+      return storages.get(0);
+    }
+    // Map lookup: an unknown id (e.g. an unspecified one) yields null, never an index error.
+    return idToStorages.get(event.getStorageId());
+  }
+
+  /**
+   * Returns a read-only view over all local storages. Despite the method name, the event's
+   * storage id is not used: a partition's parts may live on several disks, so reads through this
+   * storage consult every one of them.
+   */
+  @Override
+  public Storage selectStorageById(ShuffleDataReadEvent event) {
+    return compositeStorage;
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 70425a22d..4d8ca6822 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -36,6 +36,10 @@ public interface StorageManager {
Storage selectStorage(ShuffleDataReadEvent event);
+  /**
+   * Selects the storage used for index reads of the event's partition. The default delegates to
+   * {@link #selectStorage(ShuffleDataReadEvent)}; managers that split a partition across several
+   * storages override it to return a view spanning all of them.
+   *
+   * <p>NOTE(review): despite the name, implementations are not required to use the event's
+   * storage id.
+   */
+  default Storage selectStorageById(ShuffleDataReadEvent event) {
+    Storage selected = selectStorage(event);
+    return selected;
+  }
+
boolean write(Storage storage, ShuffleWriteHandler handler,
ShuffleDataFlushEvent event);
void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java
index f21d6f6fe..e99434b0d 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java
@@ -33,7 +33,7 @@ public class StorageManagerFactory {
public StorageManager createStorageManager(ShuffleServerConf conf) {
StorageType type =
StorageType.valueOf(conf.get(ShuffleServerConf.RSS_STORAGE_TYPE).name());
if (StorageType.LOCALFILE.equals(type) ||
StorageType.MEMORY_LOCALFILE.equals(type)) {
- return new LocalStorageManager(conf);
+ return LocalStorageManagerFactory.get(conf);
} else if (StorageType.HDFS.equals(type) ||
StorageType.MEMORY_HDFS.equals(type)) {
return new HadoopStorageManager(conf);
} else if (StorageType.LOCALFILE_HDFS.equals(type)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java
new file mode 100644
index 000000000..3d32e7922
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.FileNotFoundException;
+import org.apache.uniffle.storage.handler.api.ServerReadHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import
org.apache.uniffle.storage.handler.impl.CompositeLocalFileServerReadHandler;
+import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
+import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
+
+/**
+ * Read-only {@link Storage} that presents several {@link LocalStorage}s as one. Every read
+ * handler request is forwarded to each underlying storage, and the handlers obtained are combined
+ * in a {@link CompositeLocalFileServerReadHandler}. Writing is not supported.
+ */
+public class CompositeReadingViewStorage extends AbstractStorage {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompositeReadingViewStorage.class);
+  private final List<LocalStorage> localStorages;
+
+  /**
+   * @param localStorages storages to read from; the list is kept by reference, not copied
+   */
+  public CompositeReadingViewStorage(List<LocalStorage> localStorages) {
+    super();
+    this.localStorages = localStorages;
+  }
+
+  /** This view cannot write, so no write handler is ever created; always returns null. */
+  @Override
+  ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) {
+    return null;
+  }
+
+  /**
+   * Builds a new composite read handler on every call. It is deliberately not cached: this class
+   * is only a wrapper, and each underlying handler is obtained through the owning storage's own
+   * {@code getOrCreateReadHandler}.
+   */
+  @Override
+  public ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest request) {
+    return newReadHandler(request);
+  }
+
+  /**
+   * Collects a read handler from every underlying storage that can provide one and wraps them in
+   * a {@link CompositeLocalFileServerReadHandler}. Storages that fail are skipped, so the result
+   * may wrap fewer handlers than there are storages (possibly none).
+   */
+  @Override
+  protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) {
+    List<ServerReadHandler> handlers = new ArrayList<>();
+    for (LocalStorage storage : localStorages) {
+      try {
+        handlers.add(storage.getOrCreateReadHandler(request));
+      } catch (FileNotFoundException ignored) {
+        // Presumably this storage holds no part of the requested partition; skip it silently.
+      } catch (Exception e) {
+        // Best effort: one failing storage must not prevent reading the parts on the others.
+        LOG.error("Failed to create read handler for storage: {}", storage, e);
+      }
+    }
+    return new CompositeLocalFileServerReadHandler(handlers);
+  }
+
+  /** Always {@code false}: this storage is a read-only view. */
+  @Override
+  public boolean canWrite() {
+    return false;
+  }
+
+  /** No-op: this view performs no writes. */
+  @Override
+  public void updateWriteMetrics(StorageWriteMetrics metrics) {}
+
+  /** No-op: read metrics are not tracked for the view itself. */
+  @Override
+  public void updateReadMetrics(StorageReadMetrics metrics) {}
+
+  /** No-op: the view owns no metadata. */
+  @Override
+  public void createMetadataIfNotExist(String shuffleKey) {}
+
+  /** The view has no single storage path; always returns {@code null}. */
+  @Override
+  public String getStoragePath() {
+    return null;
+  }
+
+  /** The view has no single storage host; always returns {@code null}. */
+  @Override
+  public String getStorageHost() {
+    return null;
+  }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index e748608de..a87fd1d87 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -46,6 +46,7 @@ public class LocalStorage extends AbstractStorage {
public static final String STORAGE_HOST = "local";
private final long diskCapacity;
+ private final int id;
private volatile long diskAvailableBytes;
private volatile long serviceUsedBytes;
// for test cases
@@ -68,6 +69,7 @@ public class LocalStorage extends AbstractStorage {
this.capacity = builder.capacity;
this.media = builder.media;
this.enableDiskCapacityCheck = builder.enableDiskCapacityWatermarkCheck;
+ this.id = builder.id;
File baseFolder = new File(basePath);
try {
@@ -149,7 +151,8 @@ public class LocalStorage extends AbstractStorage {
request.getPartitionId(),
request.getPartitionNumPerRange(),
request.getPartitionNum(),
- basePath);
+ basePath,
+ id);
}
// only for tests.
@@ -282,6 +285,10 @@ public class LocalStorage extends AbstractStorage {
isSpaceEnough = false;
}
+  /** Returns the id assigned through {@link Builder#setId(int)}, or 0 if it was never set. */
+  public int getId() {
+    return this.id;
+  }
+
public static class Builder {
private long capacity;
private double ratio;
@@ -290,6 +297,7 @@ public class LocalStorage extends AbstractStorage {
private String basePath;
private StorageMedia media;
private boolean enableDiskCapacityWatermarkCheck;
+ private int id;
private Builder() {}
@@ -328,6 +336,11 @@ public class LocalStorage extends AbstractStorage {
return this;
}
+    /**
+     * Sets the id of the storage being built; it defaults to 0 when this is not called.
+     *
+     * @param id the storage id
+     * @return this builder
+     */
+    public Builder setId(int id) {
+      this.id = id;
+      return this;
+    }
+
public LocalStorage build() {
return new LocalStorage(this);
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java
index b16a4d327..267cd3030 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java
@@ -25,4 +25,6 @@ public interface ServerReadHandler {
ShuffleDataResult getShuffleData(long offset, int length);
ShuffleIndexResult getShuffleIndex();
+
+ /**
+ * Returns the id of the storage this handler reads from. Handlers that are not bound to a
+ * single storage may return 0.
+ */
+ int getStorageId();
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java
new file mode 100644
index 000000000..deb9953b1
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.MultiFileSegmentManagedBuffer;
+import org.apache.uniffle.storage.handler.api.ServerReadHandler;
+
+/**
+ * Combines the read handlers of several local storages into a single {@link ServerReadHandler}.
+ *
+ * <p>Only {@link #getShuffleIndex()} is served: it concatenates the index files of the wrapped
+ * handlers and records which storage each part comes from.
+ */
+public class CompositeLocalFileServerReadHandler implements ServerReadHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CompositeLocalFileServerReadHandler.class);
+  private final List<ServerReadHandler> handlers;
+
+  /** Creates a composite over {@code handlers} (kept by reference, combined in list order). */
+  public CompositeLocalFileServerReadHandler(List<ServerReadHandler> handlers) {
+    this.handlers = handlers;
+  }
+
+  /**
+   * Not served by this composite; data is expected to be fetched from the individual storages
+   * identified by the storage ids carried in the {@link #getShuffleIndex()} result.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public ShuffleDataResult getShuffleData(long offset, int length) {
+    return null;
+  }
+
+  /**
+   * Concatenates the index files of the wrapped handlers into one result.
+   *
+   * <p>The i-th storage id of the result belongs to the i-th index buffer. Handlers that return
+   * {@code null} are skipped so that ids and buffers stay aligned. The data file length is the
+   * sum over the included handlers; the data file name comes from the first included handler.
+   *
+   * @return the combined index, or {@code null} if there are no handlers or none returned an
+   *     index; callers must handle {@code null}
+   */
+  @Override
+  public ShuffleIndexResult getShuffleIndex() {
+    if (handlers.isEmpty()) {
+      // caller should handle the null return
+      return null;
+    }
+    List<Integer> storageIds = new ArrayList<>(handlers.size());
+    List<ManagedBuffer> managedBuffers = new ArrayList<>(handlers.size());
+    String dataFileName = "";
+    long length = 0;
+    for (ServerReadHandler handler : handlers) {
+      ShuffleIndexResult result = handler.getShuffleIndex();
+      if (result == null) {
+        // getShuffleIndex() may return null (this class does so, e.g. when it has no handlers).
+        LOG.warn(
+            "No shuffle index from read handler of storage {}, skipping it",
+            handler.getStorageId());
+        continue;
+      }
+      if (managedBuffers.isEmpty()) {
+        // Use the first data file name as the data file name of the combined result.
+        // TODO: This cannot work for remote merge feature.
+        dataFileName = result.getDataFileName();
+      }
+      storageIds.add(handler.getStorageId());
+      managedBuffers.add(result.getManagedBuffer());
+      length += result.getDataFileLen();
+    }
+    if (managedBuffers.isEmpty()) {
+      return null;
+    }
+    MultiFileSegmentManagedBuffer mergedManagedBuffer =
+        new MultiFileSegmentManagedBuffer(managedBuffers);
+    int[] ids = storageIds.stream().mapToInt(Integer::intValue).toArray();
+    return new ShuffleIndexResult(mergedManagedBuffer, length, dataFileName, ids);
+  }
+
+  /**
+   * Returns 0: this composite is not bound to one storage. The ids of the wrapped storages are
+   * reported in the {@link #getShuffleIndex()} result.
+   */
+  @Override
+  public int getStorageId() {
+    return 0;
+  }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index 56c68d062..4772dcf21 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -157,6 +157,7 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
partitionNum,
shuffleDataSegment.getOffset(),
expectedLength,
+ shuffleDataSegment.getStorageId(),
retryMax,
retryIntervalMax);
try {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index 4eb7e2d52..f9da2a6e8 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public class LocalFileServerReadHandler implements ServerReadHandler {
private static final Logger LOG =
LoggerFactory.getLogger(LocalFileServerReadHandler.class);
+ private final int storageId;
private String indexFileName = "";
private String dataFileName = "";
private String appId;
@@ -48,13 +49,25 @@ public class LocalFileServerReadHandler implements
ServerReadHandler {
int partitionId,
int partitionNumPerRange,
int partitionNum,
- String path) {
+ String path,
+ int storageId) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
+ this.storageId = storageId;
init(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum,
path);
}
+  /** Same as the full constructor with {@code storageId} set to 0. */
+  public LocalFileServerReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int partitionNumPerRange,
+      int partitionNum,
+      String path) {
+    this(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, path, 0);
+  }
+
private void init(
String appId,
int shuffleId,
@@ -150,7 +163,7 @@ public class LocalFileServerReadHandler implements
ServerReadHandler {
// get dataFileSize for read segment generation in
DataSkippableReadHandler#readShuffleData
long dataFileSize = new File(dataFileName).length();
return new ShuffleIndexResult(
- new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize,
dataFileName);
+ new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize,
dataFileName, storageId);
}
public String getDataFileName() {
@@ -160,4 +173,9 @@ public class LocalFileServerReadHandler implements
ServerReadHandler {
public String getIndexFileName() {
return indexFileName;
}
+
+  /** Returns the storage id given at construction (0 when the six-argument constructor is used). */
+  @Override
+  public int getStorageId() {
+    return this.storageId;
+  }
}