This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit d65389794afcd3efc076c4593b2f0184a7a3ac06 Author: maobaolong <[email protected]> AuthorDate: Tue Nov 26 10:18:58 2024 +0800 [#436] feat(client,server): Introduce multi-part LocalStorageManager (#2253) ### What changes were proposed in this pull request? - Introduce a factory to create specific LocalStorageManager by config. - Introduce a multiple-disk LocalStorageManager. ### Why are the changes needed? Fix: #436 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Existing UTs and newly added UTs - Tested on our pressure test cluster. - new client -> old server ✓ - new client -> new server ✓ - old client -> new server ❌, so we have to upgrade the client first, then upgrade the servers --- .../java/io/netty/util/CompositeFileRegion.java | 142 +++++++++++++ .../apache/uniffle/common/ShuffleDataSegment.java | 10 +- .../apache/uniffle/common/ShuffleIndexResult.java | 25 ++- .../uniffle/common/netty/MessageEncoder.java | 3 +- .../buffer/MultiFileSegmentManagedBuffer.java | 83 ++++++++ .../netty/protocol/GetLocalShuffleDataRequest.java | 30 +++ ...uest.java => GetLocalShuffleDataV2Request.java} | 92 +++------ .../protocol/GetLocalShuffleIndexResponse.java | 5 + ...se.java => GetLocalShuffleIndexV2Response.java} | 60 ++++-- .../uniffle/common/netty/protocol/Message.java | 13 +- ...tSplitter.java => AbstractSegmentSplitter.java} | 78 ++++--- .../common/segment/FixedSizeSegmentSplitter.java | 90 +------- .../common/segment/LocalOrderSegmentSplitter.java | 120 +---------- .../org/apache/uniffle/common/util/Constants.java | 1 + .../segment/FixedSizeSegmentSplitterTest.java | 51 +++++ .../segment/LocalOrderSegmentSplitterTest.java | 57 ++++++ ...ShuffleWithRssClientTestWhenShuffleFlushed.java | 10 +- ...thLocalForMultiPartLocalStorageManagerTest.java | 226 +++++++++++++++++++++ .../client/impl/grpc/ShuffleServerGrpcClient.java | 6 +- .../impl/grpc/ShuffleServerGrpcNettyClient.java | 37 +++- .../client/request/RssGetShuffleDataRequest.java | 
23 ++- .../response/RssGetShuffleIndexResponse.java | 10 +- proto/src/main/proto/Rss.proto | 2 + .../uniffle/server/ShuffleDataReadEvent.java | 11 + .../apache/uniffle/server/ShuffleServerConf.java | 7 + .../uniffle/server/ShuffleServerGrpcService.java | 16 +- .../apache/uniffle/server/ShuffleTaskManager.java | 27 ++- .../server/netty/ShuffleServerNettyHandler.java | 30 ++- .../server/storage/HybridStorageManager.java | 7 +- .../server/storage/LocalStorageManager.java | 3 +- .../server/storage/LocalStorageManagerFactory.java | 47 +++++ .../storage/MultiPartLocalStorageManager.java | 118 +++++++++++ .../uniffle/server/storage/StorageManager.java | 4 + .../server/storage/StorageManagerFactory.java | 2 +- .../common/CompositeReadingViewStorage.java | 91 +++++++++ .../uniffle/storage/common/LocalStorage.java | 15 +- .../storage/handler/api/ServerReadHandler.java | 2 + .../impl/CompositeLocalFileServerReadHandler.java | 78 +++++++ .../handler/impl/LocalFileClientReadHandler.java | 1 + .../handler/impl/LocalFileServerReadHandler.java | 22 +- 40 files changed, 1291 insertions(+), 364 deletions(-) diff --git a/common/src/main/java/io/netty/util/CompositeFileRegion.java b/common/src/main/java/io/netty/util/CompositeFileRegion.java new file mode 100644 index 000000000..4549ca0f2 --- /dev/null +++ b/common/src/main/java/io/netty/util/CompositeFileRegion.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.util; + +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +import io.netty.channel.FileRegion; + +import org.apache.uniffle.common.netty.protocol.AbstractFileRegion; + +public class CompositeFileRegion extends AbstractFileRegion { + private final FileRegion[] regions; + private long totalSize = 0; + private long bytesTransferred = 0; + + public CompositeFileRegion(FileRegion... regions) { + this.regions = regions; + for (FileRegion region : regions) { + totalSize += region.count(); + } + } + + @Override + public long position() { + return bytesTransferred; + } + + @Override + public long count() { + return totalSize; + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + long totalBytesTransferred = 0; + + for (FileRegion region : regions) { + if (position >= region.count()) { + position -= region.count(); + } else { + long currentBytesTransferred = region.transferTo(target, position); + totalBytesTransferred += currentBytesTransferred; + bytesTransferred += currentBytesTransferred; + + if (currentBytesTransferred < region.count() - position) { + break; + } + position = 0; + } + } + + return totalBytesTransferred; + } + + @Override + public long transferred() { + return bytesTransferred; + } + + @Override + public AbstractFileRegion retain() { + super.retain(); + for (FileRegion region : regions) { + region.retain(); + } + return this; + } + + @Override + public AbstractFileRegion retain(int increment) { + super.retain(increment); + for 
(FileRegion region : regions) { + region.retain(increment); + } + return this; + } + + @Override + public boolean release() { + boolean released = super.release(); + for (FileRegion region : regions) { + if (!region.release()) { + released = false; + } + } + return released; + } + + @Override + public boolean release(int decrement) { + boolean released = super.release(decrement); + for (FileRegion region : regions) { + if (!region.release(decrement)) { + released = false; + } + } + return released; + } + + @Override + protected void deallocate() { + for (FileRegion region : regions) { + if (region instanceof AbstractReferenceCounted) { + ((AbstractReferenceCounted) region).deallocate(); + } + } + } + + @Override + public AbstractFileRegion touch() { + super.touch(); + for (FileRegion region : regions) { + region.touch(); + } + return this; + } + + @Override + public AbstractFileRegion touch(Object hint) { + super.touch(hint); + for (FileRegion region : regions) { + region.touch(hint); + } + return this; + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java index af7299087..532921b14 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataSegment.java @@ -27,11 +27,15 @@ import java.util.List; public class ShuffleDataSegment { private final long offset; private final int length; + + private final int storageId; private final List<BufferSegment> bufferSegments; - public ShuffleDataSegment(long offset, int length, List<BufferSegment> bufferSegments) { + public ShuffleDataSegment( + long offset, int length, int storageId, List<BufferSegment> bufferSegments) { this.offset = offset; this.length = length; + this.storageId = storageId; this.bufferSegments = bufferSegments; } @@ -46,4 +50,8 @@ public class ShuffleDataSegment { public List<BufferSegment> getBufferSegments() { 
return bufferSegments; } + + public int getStorageId() { + return storageId; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java index d4f863f89..2d9934546 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java @@ -30,8 +30,10 @@ import org.apache.uniffle.common.util.ByteBufUtils; public class ShuffleIndexResult { private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexResult.class); + private static final int[] DEFAULT_STORAGE_IDS = new int[] {0}; private final ManagedBuffer buffer; + private final int[] storageIds; private long dataFileLen; private String dataFileName; @@ -44,15 +46,28 @@ public class ShuffleIndexResult { } public ShuffleIndexResult(ByteBuffer data, long dataFileLen) { - this.buffer = - new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER); - this.dataFileLen = dataFileLen; + this( + new NettyManagedBuffer(data != null ? 
Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER), + dataFileLen, + null, + DEFAULT_STORAGE_IDS); } public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String dataFileName) { + this(buffer, dataFileLen, dataFileName, DEFAULT_STORAGE_IDS); + } + + public ShuffleIndexResult( + ManagedBuffer buffer, long dataFileLen, String dataFileName, int storageId) { + this(buffer, dataFileLen, dataFileName, new int[] {storageId}); + } + + public ShuffleIndexResult( + ManagedBuffer buffer, long dataFileLen, String dataFileName, int[] storageIds) { this.buffer = buffer; this.dataFileLen = dataFileLen; this.dataFileName = dataFileName; + this.storageIds = storageIds; } public byte[] getData() { @@ -99,4 +114,8 @@ public class ShuffleIndexResult { public String getDataFileName() { return dataFileName; } + + public int[] getStorageIds() { + return storageIds; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java index cd4002482..26c782cf9 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java @@ -86,7 +86,8 @@ public final class MessageEncoder extends MessageToMessageEncoder<Message> { header.writeInt(bodyLength); in.encode(header); if (header.writableBytes() != 0) { - throw new RssException("header's writable bytes should be 0"); + throw new RssException( + "header's writable bytes should be 0, but it is " + header.writableBytes()); } if (body != null) { diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java new file mode 100644 index 000000000..319f2ede8 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/MultiFileSegmentManagedBuffer.java @@ -0,0 +1,83 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.netty.buffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.FileRegion; +import io.netty.util.CompositeFileRegion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A wrapper of multiple {@link FileSegmentManagedBuffer}, used for combine shuffle index files. 
*/ +public class MultiFileSegmentManagedBuffer extends ManagedBuffer { + + private static final Logger LOG = LoggerFactory.getLogger(MultiFileSegmentManagedBuffer.class); + private final List<ManagedBuffer> managedBuffers; + + public MultiFileSegmentManagedBuffer(List<ManagedBuffer> managedBuffers) { + this.managedBuffers = managedBuffers; + } + + @Override + public int size() { + return managedBuffers.stream().mapToInt(ManagedBuffer::size).sum(); + } + + @Override + public ByteBuf byteBuf() { + return Unpooled.wrappedBuffer(this.nioByteBuffer()); + } + + @Override + public ByteBuffer nioByteBuffer() { + ByteBuffer merged = ByteBuffer.allocate(size()); + for (ManagedBuffer managedBuffer : managedBuffers) { + ByteBuffer buffer = managedBuffer.nioByteBuffer(); + merged.put(buffer.slice()); + } + merged.flip(); + return merged; + } + + @Override + public ManagedBuffer retain() { + return this; + } + + @Override + public ManagedBuffer release() { + return this; + } + + @Override + public Object convertToNetty() { + List<FileRegion> fileRegions = new ArrayList<>(managedBuffers.size()); + for (ManagedBuffer managedBuffer : managedBuffers) { + Object object = managedBuffer.convertToNetty(); + if (object instanceof FileRegion) { + fileRegions.add((FileRegion) object); + } + } + return new CompositeFileRegion(fileRegions.toArray(new FileRegion[0])); + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java index b96c028fb..e8fae1641 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java @@ -30,6 +30,7 @@ public class GetLocalShuffleDataRequest extends RequestMessage { private long offset; private int length; private long timestamp; + private int storageId; public 
GetLocalShuffleDataRequest( long requestId, @@ -41,6 +42,30 @@ public class GetLocalShuffleDataRequest extends RequestMessage { long offset, int length, long timestamp) { + this( + requestId, + appId, + shuffleId, + partitionId, + partitionNumPerRange, + partitionNum, + offset, + length, + -1, + timestamp); + } + + protected GetLocalShuffleDataRequest( + long requestId, + String appId, + int shuffleId, + int partitionId, + int partitionNumPerRange, + int partitionNum, + long offset, + int length, + int storageId, + long timestamp) { super(requestId); this.appId = appId; this.shuffleId = shuffleId; @@ -49,6 +74,7 @@ public class GetLocalShuffleDataRequest extends RequestMessage { this.partitionNum = partitionNum; this.offset = offset; this.length = length; + this.storageId = storageId; this.timestamp = timestamp; } @@ -132,6 +158,10 @@ public class GetLocalShuffleDataRequest extends RequestMessage { return timestamp; } + public int getStorageId() { + return storageId; + } + @Override public String getOperationType() { return "getLocalShuffleData"; diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java similarity index 52% copy from common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java copy to common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java index b96c028fb..8dea653ed 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataV2Request.java @@ -21,17 +21,9 @@ import io.netty.buffer.ByteBuf; import org.apache.uniffle.common.util.ByteBufUtils; -public class GetLocalShuffleDataRequest extends RequestMessage { - private String appId; - private int shuffleId; - private int partitionId; - private int 
partitionNumPerRange; - private int partitionNum; - private long offset; - private int length; - private long timestamp; +public class GetLocalShuffleDataV2Request extends GetLocalShuffleDataRequest { - public GetLocalShuffleDataRequest( + public GetLocalShuffleDataV2Request( long requestId, String appId, int shuffleId, @@ -40,45 +32,39 @@ public class GetLocalShuffleDataRequest extends RequestMessage { int partitionNum, long offset, int length, + int storageId, long timestamp) { - super(requestId); - this.appId = appId; - this.shuffleId = shuffleId; - this.partitionId = partitionId; - this.partitionNumPerRange = partitionNumPerRange; - this.partitionNum = partitionNum; - this.offset = offset; - this.length = length; - this.timestamp = timestamp; + super( + requestId, + appId, + shuffleId, + partitionId, + partitionNumPerRange, + partitionNum, + offset, + length, + storageId, + timestamp); } @Override public Type type() { - return Type.GET_LOCAL_SHUFFLE_DATA_REQUEST; + return Type.GET_LOCAL_SHUFFLE_DATA_V2_REQUEST; } @Override public int encodedLength() { - return REQUEST_ID_ENCODE_LENGTH - + ByteBufUtils.encodedLength(appId) - + 2 * Long.BYTES - + 5 * Integer.BYTES; + // add int type storageId to encoded length + return super.encodedLength() + Integer.BYTES; } @Override public void encode(ByteBuf buf) { - buf.writeLong(getRequestId()); - ByteBufUtils.writeLengthAndString(buf, appId); - buf.writeInt(shuffleId); - buf.writeInt(partitionId); - buf.writeInt(partitionNumPerRange); - buf.writeInt(partitionNum); - buf.writeLong(offset); - buf.writeInt(length); - buf.writeLong(timestamp); + super.encode(buf); + buf.writeInt(getStorageId()); } - public static GetLocalShuffleDataRequest decode(ByteBuf byteBuf) { + public static GetLocalShuffleDataV2Request decode(ByteBuf byteBuf) { long requestId = byteBuf.readLong(); String appId = ByteBufUtils.readLengthAndString(byteBuf); int shuffleId = byteBuf.readInt(); @@ -88,7 +74,8 @@ public class GetLocalShuffleDataRequest extends 
RequestMessage { long offset = byteBuf.readLong(); int length = byteBuf.readInt(); long timestamp = byteBuf.readLong(); - return new GetLocalShuffleDataRequest( + int storageId = byteBuf.readInt(); + return new GetLocalShuffleDataV2Request( requestId, appId, shuffleId, @@ -97,43 +84,12 @@ public class GetLocalShuffleDataRequest extends RequestMessage { partitionNum, offset, length, + storageId, timestamp); } - public String getAppId() { - return appId; - } - - public int getShuffleId() { - return shuffleId; - } - - public int getPartitionId() { - return partitionId; - } - - public int getPartitionNumPerRange() { - return partitionNumPerRange; - } - - public int getPartitionNum() { - return partitionNum; - } - - public long getOffset() { - return offset; - } - - public int getLength() { - return length; - } - - public long getTimestamp() { - return timestamp; - } - @Override public String getOperationType() { - return "getLocalShuffleData"; + return "getLocalShuffleDataV2"; } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java index f97373805..455dda4b1 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java @@ -24,6 +24,7 @@ import org.apache.uniffle.common.netty.buffer.ManagedBuffer; import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; +import org.apache.uniffle.common.util.Constants; public class GetLocalShuffleIndexResponse extends RpcResponse { @@ -92,4 +93,8 @@ public class GetLocalShuffleIndexResponse extends RpcResponse { public long getFileLength() { return fileLength; } + + public int[] getStorageIds() { + return 
Constants.EMPTY_INT_ARRAY; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java similarity index 59% copy from common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java copy to common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java index f97373805..b08a5ca8e 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexV2Response.java @@ -24,12 +24,13 @@ import org.apache.uniffle.common.netty.buffer.ManagedBuffer; import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; +import org.apache.uniffle.common.util.Constants; -public class GetLocalShuffleIndexResponse extends RpcResponse { +public class GetLocalShuffleIndexV2Response extends GetLocalShuffleIndexResponse { - private long fileLength; + private final int[] storageIds; - public GetLocalShuffleIndexResponse( + public GetLocalShuffleIndexV2Response( long requestId, StatusCode statusCode, String retMessage, byte[] indexData, long fileLength) { this( requestId, @@ -39,57 +40,78 @@ public class GetLocalShuffleIndexResponse extends RpcResponse { fileLength); } - public GetLocalShuffleIndexResponse( + public GetLocalShuffleIndexV2Response( long requestId, StatusCode statusCode, String retMessage, ByteBuf indexData, long fileLength) { - this(requestId, statusCode, retMessage, new NettyManagedBuffer(indexData), fileLength); + this( + requestId, + statusCode, + retMessage, + new NettyManagedBuffer(indexData), + fileLength, + Constants.EMPTY_INT_ARRAY); } - public GetLocalShuffleIndexResponse( + public GetLocalShuffleIndexV2Response( long 
requestId, StatusCode statusCode, String retMessage, ManagedBuffer managedBuffer, - long fileLength) { - super(requestId, statusCode, retMessage, managedBuffer); - this.fileLength = fileLength; + long fileLength, + int[] storageIds) { + super(requestId, statusCode, retMessage, managedBuffer, fileLength); + this.storageIds = storageIds; } @Override public int encodedLength() { - return super.encodedLength() + Long.BYTES; + // super encodedLength + 4(storageIds.length) + 4 * storageIds.length + return super.encodedLength() + Integer.BYTES + Integer.BYTES * storageIds.length; } @Override public void encode(ByteBuf buf) { super.encode(buf); - buf.writeLong(fileLength); + buf.writeInt(storageIds.length); + for (int storageId : storageIds) { + buf.writeInt(storageId); + } } - public static GetLocalShuffleIndexResponse decode(ByteBuf byteBuf, boolean decodeBody) { + public static GetLocalShuffleIndexV2Response decode(ByteBuf byteBuf, boolean decodeBody) { long requestId = byteBuf.readLong(); StatusCode statusCode = StatusCode.fromCode(byteBuf.readInt()); String retMessage = ByteBufUtils.readLengthAndString(byteBuf); long fileLength = byteBuf.readLong(); + int[] storageIds = new int[byteBuf.readInt()]; + for (int i = 0; i < storageIds.length; i++) { + storageIds[i] = byteBuf.readInt(); + } if (decodeBody) { NettyManagedBuffer nettyManagedBuffer = new NettyManagedBuffer(byteBuf); - return new GetLocalShuffleIndexResponse( - requestId, statusCode, retMessage, nettyManagedBuffer, fileLength); + return new GetLocalShuffleIndexV2Response( + requestId, statusCode, retMessage, nettyManagedBuffer, fileLength, storageIds); } else { - return new GetLocalShuffleIndexResponse( - requestId, statusCode, retMessage, NettyManagedBuffer.EMPTY_BUFFER, fileLength); + return new GetLocalShuffleIndexV2Response( + requestId, + statusCode, + retMessage, + NettyManagedBuffer.EMPTY_BUFFER, + fileLength, + storageIds); } } @Override public Type type() { - return 
Type.GET_LOCAL_SHUFFLE_INDEX_RESPONSE; + return Type.GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE; } - public long getFileLength() { - return fileLength; + @Override + public int[] getStorageIds() { + return storageIds; } } diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java index e7f18ab2f..2ad0b0d77 100644 --- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java +++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java @@ -64,7 +64,10 @@ public abstract class Message implements Encodable { GET_SHUFFLE_RESULT_FOR_MULTI_PART_RESPONSE(19), REQUIRE_BUFFER_RESPONSE(20), GET_SORTED_SHUFFLE_DATA_REQUEST(21), - GET_SORTED_SHUFFLE_DATA_RESPONSE(22); + GET_SORTED_SHUFFLE_DATA_RESPONSE(22), + GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE(23), + GET_LOCAL_SHUFFLE_DATA_V2_REQUEST(24), + ; private final byte id; @@ -138,6 +141,10 @@ public abstract class Message implements Encodable { return GET_SORTED_SHUFFLE_DATA_REQUEST; case 22: return GET_SORTED_SHUFFLE_DATA_RESPONSE; + case 23: + return GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE; + case 24: + return GET_LOCAL_SHUFFLE_DATA_V2_REQUEST; case -1: throw new IllegalArgumentException("User type messages cannot be decoded."); default: @@ -154,12 +161,16 @@ public abstract class Message implements Encodable { return SendShuffleDataRequestV1.decode(in); case GET_LOCAL_SHUFFLE_DATA_REQUEST: return GetLocalShuffleDataRequest.decode(in); + case GET_LOCAL_SHUFFLE_DATA_V2_REQUEST: + return GetLocalShuffleDataV2Request.decode(in); case GET_LOCAL_SHUFFLE_DATA_RESPONSE: return GetLocalShuffleDataResponse.decode(in, true); case GET_LOCAL_SHUFFLE_INDEX_REQUEST: return GetLocalShuffleIndexRequest.decode(in); case GET_LOCAL_SHUFFLE_INDEX_RESPONSE: return GetLocalShuffleIndexResponse.decode(in, true); + case GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE: + return GetLocalShuffleIndexV2Response.decode(in, true); case 
GET_MEMORY_SHUFFLE_DATA_REQUEST: return GetMemoryShuffleDataRequest.decode(in); case GET_MEMORY_SHUFFLE_DATA_RESPONSE: diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java similarity index 57% copy from common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java copy to common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java index 04763d842..48f64fad4 100644 --- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java +++ b/common/src/main/java/org/apache/uniffle/common/segment/AbstractSegmentSplitter.java @@ -21,6 +21,7 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.List; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,34 +31,36 @@ import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.exception.RssException; -public class FixedSizeSegmentSplitter implements SegmentSplitter { - private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class); +public abstract class AbstractSegmentSplitter implements SegmentSplitter { + protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSegmentSplitter.class); - private int readBufferSize; + protected int readBufferSize; - public FixedSizeSegmentSplitter(int readBufferSize) { + public AbstractSegmentSplitter(int readBufferSize) { this.readBufferSize = readBufferSize; } - @Override - public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) { + protected List<ShuffleDataSegment> splitCommon( + ShuffleIndexResult shuffleIndexResult, Predicate<Long> taskFilter) { if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { return 
Lists.newArrayList(); } ByteBuffer indexData = shuffleIndexResult.getIndexData(); long dataFileLen = shuffleIndexResult.getDataFileLen(); - return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); - } + int[] storageIds = shuffleIndexResult.getStorageIds(); - private static List<ShuffleDataSegment> transIndexDataToSegments( - ByteBuffer indexData, int readBufferSize, long dataFileLen) { List<BufferSegment> bufferSegments = Lists.newArrayList(); List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList(); int bufferOffset = 0; long fileOffset = -1; long totalLength = 0; + int storageIndex = 0; + long preOffset = -1; + int preStorageId = -1; + int currentStorageId = 0; + while (indexData.hasRemaining()) { try { final long offset = indexData.getLong(); @@ -67,19 +70,21 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter { final long blockId = indexData.getLong(); final long taskAttemptId = indexData.getLong(); - // The index file is written, read and parsed sequentially, so these parsed index segments - // index a continuous shuffle data in the corresponding data file and the first segment's - // offset field is the offset of these shuffle data in the data file. 
- if (fileOffset == -1) { - fileOffset = offset; + if (storageIds.length == 0) { + currentStorageId = -1; + } else if (preOffset > offset) { + storageIndex++; + if (storageIndex >= storageIds.length) { + LOGGER.warn("storageIds length {} is not enough.", storageIds.length); + } + currentStorageId = storageIds[storageIndex]; + } else { + currentStorageId = storageIds[storageIndex]; } + preOffset = offset; totalLength += length; - // If ShuffleServer is flushing the file at this time, the length in the index file record - // may be greater - // than the length in the actual data file, and it needs to be returned at this time to - // avoid EOFException if (dataFileLen != -1 && totalLength > dataFileLen) { LOGGER.info( "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than " @@ -91,16 +96,30 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter { break; } - bufferSegments.add( - new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); - bufferOffset += length; + boolean storageChanged = preStorageId != -1 && currentStorageId != preStorageId; + + if (bufferOffset >= readBufferSize + || storageChanged + || (taskFilter != null && !taskFilter.test(taskAttemptId))) { + if (bufferOffset > 0) { + ShuffleDataSegment sds = + new ShuffleDataSegment(fileOffset, bufferOffset, preStorageId, bufferSegments); + dataFileSegments.add(sds); + bufferSegments = Lists.newArrayList(); + bufferOffset = 0; + fileOffset = -1; + } + } - if (bufferOffset >= readBufferSize) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - bufferSegments = Lists.newArrayList(); - bufferOffset = 0; - fileOffset = -1; + if (taskFilter == null || taskFilter.test(taskAttemptId)) { + if (fileOffset == -1) { + fileOffset = offset; + } + bufferSegments.add( + new BufferSegment( + blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); + preStorageId 
= currentStorageId; + bufferOffset += length; } } catch (BufferUnderflowException ue) { throw new RssException("Read index data under flow", ue); @@ -108,7 +127,8 @@ public class FixedSizeSegmentSplitter implements SegmentSplitter { } if (bufferOffset > 0) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); + ShuffleDataSegment sds = + new ShuffleDataSegment(fileOffset, bufferOffset, currentStorageId, bufferSegments); dataFileSegments.add(sds); } diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java index 04763d842..dd548ebba 100644 --- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java +++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java @@ -17,101 +17,19 @@ package org.apache.uniffle.common.segment; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; import java.util.List; -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.exception.RssException; - -public class FixedSizeSegmentSplitter implements SegmentSplitter { - private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class); - - private int readBufferSize; +public class FixedSizeSegmentSplitter extends AbstractSegmentSplitter { public FixedSizeSegmentSplitter(int readBufferSize) { - this.readBufferSize = readBufferSize; + super(readBufferSize); } @Override public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) { - if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { - return Lists.newArrayList(); - } - - ByteBuffer indexData = 
shuffleIndexResult.getIndexData(); - long dataFileLen = shuffleIndexResult.getDataFileLen(); - return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); - } - - private static List<ShuffleDataSegment> transIndexDataToSegments( - ByteBuffer indexData, int readBufferSize, long dataFileLen) { - List<BufferSegment> bufferSegments = Lists.newArrayList(); - List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList(); - int bufferOffset = 0; - long fileOffset = -1; - long totalLength = 0; - - while (indexData.hasRemaining()) { - try { - final long offset = indexData.getLong(); - final int length = indexData.getInt(); - final int uncompressLength = indexData.getInt(); - final long crc = indexData.getLong(); - final long blockId = indexData.getLong(); - final long taskAttemptId = indexData.getLong(); - - // The index file is written, read and parsed sequentially, so these parsed index segments - // index a continuous shuffle data in the corresponding data file and the first segment's - // offset field is the offset of these shuffle data in the data file. - if (fileOffset == -1) { - fileOffset = offset; - } - - totalLength += length; - - // If ShuffleServer is flushing the file at this time, the length in the index file record - // may be greater - // than the length in the actual data file, and it needs to be returned at this time to - // avoid EOFException - if (dataFileLen != -1 && totalLength > dataFileLen) { - LOGGER.info( - "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than " - + "the real data file length: {}(bytes). 
Block id: {}" - + "This may happen when the data is flushing, please ignore.", - totalLength, - dataFileLen, - blockId); - break; - } - - bufferSegments.add( - new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); - bufferOffset += length; - - if (bufferOffset >= readBufferSize) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - bufferSegments = Lists.newArrayList(); - bufferOffset = 0; - fileOffset = -1; - } - } catch (BufferUnderflowException ue) { - throw new RssException("Read index data under flow", ue); - } - } - - if (bufferOffset > 0) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - } - - return dataFileSegments; + // For FixedSizeSegmentSplitter, we do not filter by taskAttemptId, so pass null for the filter. + return splitCommon(shuffleIndexResult, null); } } diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java index 366968c34..c371b0ad2 100644 --- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java +++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java @@ -17,20 +17,12 @@ package org.apache.uniffle.common.segment; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -import com.google.common.collect.Lists; import org.roaringbitmap.longlong.Roaring64NavigableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.exception.RssException; /** * {@class LocalOrderSegmentSplitter} will be initialized only when 
the {@class @@ -46,122 +38,16 @@ import org.apache.uniffle.common.exception.RssException; * <p>Last but not least, this split strategy depends on LOCAL_ORDER of index file, which must be * guaranteed by the shuffle server. */ -public class LocalOrderSegmentSplitter implements SegmentSplitter { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalOrderSegmentSplitter.class); - +public class LocalOrderSegmentSplitter extends AbstractSegmentSplitter { private Roaring64NavigableMap expectTaskIds; - private int readBufferSize; public LocalOrderSegmentSplitter(Roaring64NavigableMap expectTaskIds, int readBufferSize) { + super(readBufferSize); this.expectTaskIds = expectTaskIds; - this.readBufferSize = readBufferSize; } @Override public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) { - if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { - return Lists.newArrayList(); - } - - ByteBuffer indexData = shuffleIndexResult.getIndexData(); - long dataFileLen = shuffleIndexResult.getDataFileLen(); - - List<BufferSegment> bufferSegments = Lists.newArrayList(); - - List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList(); - int bufferOffset = 0; - long fileOffset = -1; - long totalLen = 0; - - long lastExpectedBlockIndex = -1; - - List<Long> indexTaskIds = new ArrayList<>(); - - /** - * One ShuffleDataSegment should meet following requirements: - * - * <p>1. taskId in [startMapIndex, endMapIndex) taskIds bitmap. Attention: the index in the - * range is not the map task id, which means the required task ids are not continuous. 2. - * ShuffleDataSegment size should < readBufferSize 3. 
Single shuffleDataSegment's blocks should - * be continuous - */ - int index = 0; - while (indexData.hasRemaining()) { - try { - long offset = indexData.getLong(); - int length = indexData.getInt(); - int uncompressLength = indexData.getInt(); - long crc = indexData.getLong(); - long blockId = indexData.getLong(); - long taskAttemptId = indexData.getLong(); - - totalLen += length; - indexTaskIds.add(taskAttemptId); - - // If ShuffleServer is flushing the file at this time, the length in the index file record - // may be greater - // than the length in the actual data file, and it needs to be returned at this time to - // avoid EOFException - if (dataFileLen != -1 && totalLen > dataFileLen) { - LOGGER.info( - "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than " - + "the real data file length: {}(bytes). Block id: {}. This should not happen. " - + "This may happen when the data is flushing, please ignore.", - totalLen, - dataFileLen, - blockId); - break; - } - - boolean conditionOfDiscontinuousBlocks = - lastExpectedBlockIndex != -1 - && bufferSegments.size() > 0 - && expectTaskIds.contains(taskAttemptId) - && index - lastExpectedBlockIndex != 1; - - boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize; - - if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - bufferSegments = Lists.newArrayList(); - bufferOffset = 0; - fileOffset = -1; - } - - if (expectTaskIds.contains(taskAttemptId)) { - if (fileOffset == -1) { - fileOffset = offset; - } - bufferSegments.add( - new BufferSegment( - blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); - bufferOffset += length; - lastExpectedBlockIndex = index; - } - index++; - } catch (BufferUnderflowException ue) { - throw new RssException("Read index data under flow", ue); - } - } - - if (bufferOffset > 0) { - 
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Index file task-ids sequence: {}, expected task-ids: {}", - indexTaskIds, - getExpectedTaskIds(expectTaskIds)); - } - return dataFileSegments; - } - - private List<Long> getExpectedTaskIds(Roaring64NavigableMap expectTaskIds) { - List<Long> taskIds = new ArrayList<>(); - expectTaskIds.forEach(value -> taskIds.add(value)); - return taskIds; + return splitCommon(shuffleIndexResult, taskAttemptId -> expectTaskIds.contains(taskAttemptId)); } } diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java index 79ceb2f10..c96eedc1d 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java +++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java @@ -93,4 +93,5 @@ public final class Constants { public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss"; public static final String SPARK_RSS_CONFIG_PREFIX = "spark."; + public static final int[] EMPTY_INT_ARRAY = new int[0]; } diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java index b681f63ca..6834f8b15 100644 --- a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java +++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java @@ -20,13 +20,16 @@ package org.apache.uniffle.common.segment; import java.nio.ByteBuffer; import java.util.List; +import io.netty.buffer.Unpooled; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import 
org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; import static org.apache.uniffle.common.segment.LocalOrderSegmentSplitterTest.generateData; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -90,4 +93,52 @@ public class FixedSizeSegmentSplitterTest { assertTrue(e.getMessage().contains("Read index data under flow")); } } + + @Test + @DisplayName("Test splitting with storage ID changes") + void testSplitContainsStorageId() { + SegmentSplitter splitter = new FixedSizeSegmentSplitter(50); + int[] storageIds = new int[] {1, 2, 3}; + byte[] data0 = + generateData(Pair.of(32, 0), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0)); + byte[] data1 = + generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0)); + byte[] data2 = + generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0)); + + ByteBuffer dataCombined = + ByteBuffer.allocate(data0.length + data1.length + data2.length) + .put(data0) + .put(data1) + .put(data2); + dataCombined.flip(); + List<ShuffleDataSegment> shuffleDataSegments = + splitter.split( + new ShuffleIndexResult( + new NettyManagedBuffer(Unpooled.wrappedBuffer(dataCombined)), -1L, "", storageIds)); + assertEquals(6, shuffleDataSegments.size(), "Expected 6 segments"); + assertSegment(shuffleDataSegments.get(0), 0, 58, 3, storageIds[0]); + // split while previous data segments over read buffer size + assertSegment(shuffleDataSegments.get(1), 58, 38, 2, storageIds[0]); + // split while storage id changed, which offset less than previous offset + assertSegment(shuffleDataSegments.get(2), 0, 58, 3, storageIds[1]); + // split while previous data segments over read buffer size + assertSegment(shuffleDataSegments.get(3), 58, 38, 2, storageIds[1]); + // split while storage id changed, which offset less than previous offset + 
assertSegment(shuffleDataSegments.get(4), 0, 58, 3, storageIds[2]); + // split while previous data segments over read buffer size + assertSegment(shuffleDataSegments.get(5), 58, 38, 2, storageIds[2]); + } + + private void assertSegment( + ShuffleDataSegment segment, + int expectedOffset, + int expectedLength, + int expectedSize, + int expectedStorageId) { + assertEquals(expectedOffset, segment.getOffset(), "Incorrect offset"); + assertEquals(expectedLength, segment.getLength(), "Incorrect length"); + assertEquals(expectedSize, segment.getBufferSegments().size(), "Incorrect buffer segment size"); + assertEquals(expectedStorageId, segment.getStorageId(), "Incorrect storage ID"); + } } diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java index 9e0ff1e5c..3ee5815f4 100644 --- a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java +++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java @@ -20,7 +20,9 @@ package org.apache.uniffle.common.segment; import java.nio.ByteBuffer; import java.util.List; +import io.netty.buffer.Unpooled; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -29,6 +31,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -375,4 +378,58 @@ public class LocalOrderSegmentSplitterTest { } return byteBuffer.array(); } 
+ + @Test + @DisplayName("Test splitting with storage ID changes") + void testSplitContainsStorageId() { + Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1); + SegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 50); + // generate 3 batch segments with storage ID 1, 2, 3 to simulate 3 flush event write 3 index + // file + int[] storageIds = new int[] {1, 2, 3}; + byte[] data0 = + generateData(Pair.of(32, 0), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0)); + byte[] data1 = + generateData(Pair.of(32, 1), Pair.of(26, 1), Pair.of(10, 1), Pair.of(32, 0), Pair.of(6, 1)); + byte[] data2 = + generateData(Pair.of(32, 1), Pair.of(16, 0), Pair.of(10, 0), Pair.of(32, 6), Pair.of(6, 0)); + ByteBuffer dataCombined = + ByteBuffer.allocate(data0.length + data1.length + data2.length) + .put(data0) + .put(data1) + .put(data2); + dataCombined.flip(); + List<ShuffleDataSegment> shuffleDataSegments = + splitter.split( + new ShuffleIndexResult( + new NettyManagedBuffer(Unpooled.wrappedBuffer(dataCombined)), -1L, "", storageIds)); + assertEquals(4, shuffleDataSegments.size(), "Expected 3 segments"); + assertEquals( + 5, + shuffleDataSegments.stream().mapToInt(s -> s.getBufferSegments().size()).sum(), + "Incorrect total size of buffer segments"); + // First data segment come from the 0,1 part of data1 since first data array contains none with + // taskId 1. + assertSegment(shuffleDataSegments.get(0), 0, 58, 2, storageIds[1]); + // This data segment come from the No.3 part of data1 since No.4 part is not belong to taskId 1, + // close this data segment cause discontinuous block. + assertSegment(shuffleDataSegments.get(1), 58, 10, 1, storageIds[1]); + // This data segment come from the No.5 part of data1 since storage id changed. + assertSegment(shuffleDataSegments.get(2), 100, 6, 1, storageIds[1]); + // This data segment come from the No.1 part of data2 since other parts are not belong to taskId + // 1. 
+ assertSegment(shuffleDataSegments.get(3), 0, 32, 1, storageIds[2]); + } + + private void assertSegment( + ShuffleDataSegment segment, + int expectedOffset, + int expectedLength, + int expectedSize, + int expectedStorageId) { + assertEquals(expectedOffset, segment.getOffset(), "Incorrect offset"); + assertEquals(expectedLength, segment.getLength(), "Incorrect length"); + assertEquals(expectedSize, segment.getBufferSegments().size(), "Incorrect buffer segment size"); + assertEquals(expectedStorageId, segment.getStorageId(), "Incorrect storage ID"); + } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java index 85499f3b0..2ab9f7b8f 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java @@ -37,6 +37,7 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import org.apache.hadoop.io.IntWritable; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; @@ -68,9 +69,11 @@ import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.buffer.ShuffleBufferType; +import org.apache.uniffle.server.storage.MultiPartLocalStorageManager; import org.apache.uniffle.storage.util.StorageType; import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS; import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE; import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -88,8 +91,13 @@ public class RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff public static void setupServers(@TempDir File tmpDir) throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); coordinatorConf.setBoolean(COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, false); - createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY); + Assumptions.assumeTrue( + !shuffleServerConf + .get(SERVER_LOCAL_STORAGE_MANAGER_CLASS) + .equals(MultiPartLocalStorageManager.class.getName()), + MultiPartLocalStorageManager.class.getName() + " is not working with remote merge feature"); + createCoordinatorServer(coordinatorConf); shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true); shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE, "1k"); shuffleServerConf.set( diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java new file mode 100644 index 000000000..bebee47ea --- /dev/null +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.client.factory.ShuffleClientFactory; +import org.apache.uniffle.client.impl.ShuffleReadClientImpl; +import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; +import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.common.ClientType; +import org.apache.uniffle.common.PartitionRange; +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.rpc.ServerType; +import 
org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.server.storage.MultiPartLocalStorageManager; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.util.StorageType; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class SparkClientWithLocalForMultiPartLocalStorageManagerTest extends ShuffleReadWriteBase { + + private static File GRPC_DATA_DIR1; + private static File GRPC_DATA_DIR2; + private static File NETTY_DATA_DIR1; + private static File NETTY_DATA_DIR2; + private ShuffleServerGrpcClient grpcShuffleServerClient; + private ShuffleServerGrpcNettyClient nettyShuffleServerClient; + private static ShuffleServerConf grpcShuffleServerConfig; + private static ShuffleServerConf nettyShuffleServerConfig; + + @BeforeAll + public static void setupServers(@TempDir File tmpDir) throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + createCoordinatorServer(coordinatorConf); + + GRPC_DATA_DIR1 = new File(tmpDir, "data1"); + GRPC_DATA_DIR2 = new File(tmpDir, "data2"); + String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," + GRPC_DATA_DIR2.getAbsolutePath(); + ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(grpcBasePath, ServerType.GRPC); + grpcShuffleServerConf.set( + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS, + MultiPartLocalStorageManager.class.getName()); + createShuffleServer(grpcShuffleServerConf); + + NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1"); + NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2"); + String nettyBasePath = + NETTY_DATA_DIR1.getAbsolutePath() + "," + NETTY_DATA_DIR2.getAbsolutePath(); + ShuffleServerConf nettyShuffleServerConf = + buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY); + nettyShuffleServerConf.set( + 
ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS, + MultiPartLocalStorageManager.class.getName()); + createShuffleServer(nettyShuffleServerConf); + + startServers(); + + grpcShuffleServerConfig = grpcShuffleServerConf; + nettyShuffleServerConfig = nettyShuffleServerConf; + } + + private static ShuffleServerConf buildShuffleServerConf(String basePath, ServerType serverType) + throws Exception { + ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType); + shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + shuffleServerConf.setString("rss.storage.basePath", basePath); + return shuffleServerConf; + } + + @BeforeEach + public void createClient() throws Exception { + grpcShuffleServerClient = + new ShuffleServerGrpcClient( + LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); + nettyShuffleServerClient = + new ShuffleServerGrpcNettyClient( + LOCALHOST, + nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), + nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); + } + + @AfterEach + public void closeClient() { + grpcShuffleServerClient.close(); + nettyShuffleServerClient.close(); + } + + private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) { + List<ShuffleServerInfo> shuffleServerInfo = + isNettyMode + ? Lists.newArrayList( + new ShuffleServerInfo( + LOCALHOST, + nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), + nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))) + : Lists.newArrayList( + new ShuffleServerInfo( + LOCALHOST, + grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT))); + return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? 
ClientType.GRPC_NETTY : ClientType.GRPC) + .storageType(StorageType.LOCALFILE.name()) + .shuffleId(0) + .partitionId(0) + .indexReadLimit(100) + .partitionNumPerRange(1) + .partitionNum(10) + .readBufferSize(1000) + .shuffleServerInfoList(shuffleServerInfo); + } + + private static Stream<Arguments> isNettyModeProvider() { + return Stream.of(Arguments.of(true), Arguments.of(false)); + } + + @ParameterizedTest + @MethodSource("isNettyModeProvider") + public void testClientRemoteReadFromMultipleDisk(boolean isNettyMode) { + String testAppId = "testClientRemoteReadFromMultipleDisk_appId"; + registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode); + + // Send shuffle data + Map<Long, byte[]> expectedData = Maps.newHashMap(); + Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); + + List<ShuffleBlockInfo> blocks = + createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI); + sendTestData(testAppId, blocks, isNettyMode); + + List<ShuffleServer> shuffleServers = isNettyMode ? 
nettyShuffleServers : grpcShuffleServers; + // Mark one storage reaching high watermark, it should switch another storage for next writing + ShuffleServer shuffleServer = shuffleServers.get(0); + ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, 0); + LocalStorage storage1 = + (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + storage1.getMetaData().setSize(20 * 1024 * 1024); + + blocks = createShuffleBlockList(0, 0, 0, 3, 25, blockIdBitmap, expectedData, mockSSI); + sendTestData(testAppId, blocks, isNettyMode); + + readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, 1); + LocalStorage storage2 = + (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + assertNotEquals(storage1, storage2); + + Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); + // unexpected taskAttemptId should be filtered + ShuffleReadClientImpl readClient = + baseReadBuilder(isNettyMode) + .appId(testAppId) + .blockIdBitmap(blockIdBitmap) + .taskIdBitmap(taskIdBitmap) + .build(); + + validateResult(readClient, expectedData); + readClient.checkProcessedBlockIds(); + readClient.close(); + } + + protected void registerApp( + String testAppId, List<PartitionRange> partitionRanges, boolean isNettyMode) { + ShuffleServerGrpcClient shuffleServerClient = + isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient; + RssRegisterShuffleRequest rrsr = + new RssRegisterShuffleRequest(testAppId, 0, partitionRanges, ""); + shuffleServerClient.registerShuffle(rrsr); + } + + protected void sendTestData( + String testAppId, List<ShuffleBlockInfo> blocks, boolean isNettyMode) { + ShuffleServerGrpcClient shuffleServerClient = + isNettyMode ? 
nettyShuffleServerClient : grpcShuffleServerClient; + Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap(); + partitionToBlocks.put(0, blocks); + + Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap(); + shuffleToBlocks.put(0, partitionToBlocks); + + RssSendShuffleDataRequest rssdr = + new RssSendShuffleDataRequest(testAppId, 3, 1000, shuffleToBlocks); + shuffleServerClient.sendShuffleData(rssdr); + RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0); + shuffleServerClient.sendCommit(rscr); + RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0); + shuffleServerClient.finishShuffle(rfsr); + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index c7471991a..56f02721d 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -923,6 +923,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer .setOffset(request.getOffset()) .setLength(request.getLength()) .setTimestamp(start) + .setStorageId(request.getStorageId()) .build(); String requestInfo = "appId[" @@ -931,6 +932,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + + "], storageId[" + + request.getStorageId() + "]"; int retry = 0; GetLocalShuffleDataResponse rpcResponse; @@ -1016,7 +1019,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer StatusCode.SUCCESS, new NettyManagedBuffer( Unpooled.wrappedBuffer(rpcResponse.getIndexData().toByteArray())), - rpcResponse.getDataFileLen()); + rpcResponse.getDataFileLen(), + 
rpcResponse.getStorageIdsList().stream().mapToInt(Integer::intValue).toArray()); break; default: diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index a89ea9c4c..fbc4e363b 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -52,6 +52,7 @@ import org.apache.uniffle.common.netty.client.TransportConf; import org.apache.uniffle.common.netty.client.TransportContext; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataResponse; +import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataV2Request; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse; import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest; @@ -350,7 +351,8 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { return new RssGetShuffleIndexResponse( StatusCode.SUCCESS, getLocalShuffleIndexResponse.body(), - getLocalShuffleIndexResponse.getFileLength()); + getLocalShuffleIndexResponse.getFileLength(), + getLocalShuffleIndexResponse.getStorageIds()); default: String msg = "Can't get shuffle index from " @@ -369,17 +371,30 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { @Override public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request) { TransportClient transportClient = getTransportClient(); + // Construct old version or v2 get shuffle data request to compatible with old server GetLocalShuffleDataRequest getLocalShuffleIndexRequest = - new GetLocalShuffleDataRequest( - requestId(), 
- request.getAppId(), - request.getShuffleId(), - request.getPartitionId(), - request.getPartitionNumPerRange(), - request.getPartitionNum(), - request.getOffset(), - request.getLength(), - System.currentTimeMillis()); + request.storageIdSpecified() + ? new GetLocalShuffleDataV2Request( + requestId(), + request.getAppId(), + request.getShuffleId(), + request.getPartitionId(), + request.getPartitionNumPerRange(), + request.getPartitionNum(), + request.getOffset(), + request.getLength(), + request.getStorageId(), + System.currentTimeMillis()) + : new GetLocalShuffleDataRequest( + requestId(), + request.getAppId(), + request.getShuffleId(), + request.getPartitionId(), + request.getPartitionNumPerRange(), + request.getPartitionNum(), + request.getOffset(), + request.getLength(), + System.currentTimeMillis()); String requestInfo = "appId[" + request.getAppId() diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java index 580192217..c245e48b7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java @@ -28,6 +28,7 @@ public class RssGetShuffleDataRequest extends RetryableRequest { private final int partitionNum; private final long offset; private final int length; + private final int storageId; public RssGetShuffleDataRequest( String appId, @@ -37,6 +38,7 @@ public class RssGetShuffleDataRequest extends RetryableRequest { int partitionNum, long offset, int length, + int storageId, int retryMax, long retryIntervalMax) { this.appId = appId; @@ -46,6 +48,7 @@ public class RssGetShuffleDataRequest extends RetryableRequest { this.partitionNum = partitionNum; this.offset = offset; this.length = length; + this.storageId = storageId; this.retryMax = retryMax; this.retryIntervalMax = 
retryIntervalMax; } @@ -59,7 +62,17 @@ public class RssGetShuffleDataRequest extends RetryableRequest { int partitionNum, long offset, int length) { - this(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, offset, length, 1, 0); + this( + appId, + shuffleId, + partitionId, + partitionNumPerRange, + partitionNum, + offset, + length, + -1, + 1, + 0); } public String getAppId() { @@ -90,6 +103,14 @@ public class RssGetShuffleDataRequest extends RetryableRequest { return length; } + public int getStorageId() { + return storageId; + } + + public boolean storageIdSpecified() { + return storageId != -1; + } + @Override public String operationType() { return "GetShuffleData"; diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java index 4d3667ab1..7c42ac813 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java @@ -20,13 +20,19 @@ package org.apache.uniffle.client.response; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.netty.buffer.ManagedBuffer; import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.util.Constants; public class RssGetShuffleIndexResponse extends ClientResponse { private final ShuffleIndexResult shuffleIndexResult; - public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data, long dataFileLen) { + public RssGetShuffleIndexResponse( + StatusCode statusCode, ManagedBuffer data, long dataFileLen, int[] storageIds) { super(statusCode); - this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen, null); + this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen, null, storageIds); + } + + public RssGetShuffleIndexResponse(StatusCode 
statusCode, ManagedBuffer data, long dataFileLen) { + this(statusCode, data, dataFileLen, Constants.EMPTY_INT_ARRAY); } public ShuffleIndexResult getShuffleIndexResult() { diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 1312480de..c68a1a87e 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -86,6 +86,7 @@ message GetLocalShuffleDataRequest { int64 offset = 6; int32 length = 7; int64 timestamp = 8; + int32 storageId = 9; } message GetLocalShuffleDataResponse { @@ -124,6 +125,7 @@ message GetLocalShuffleIndexResponse { StatusCode status = 2; string retMsg = 3; int64 dataFileLen = 4; + repeated int32 storageIds = 5; } message ReportShuffleResultRequest { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java index 11a8aeff7..cc121c47d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java @@ -23,6 +23,7 @@ public class ShuffleDataReadEvent { private int shuffleId; private int partitionId; private int startPartition; + private int storageId; public ShuffleDataReadEvent( String appId, int shuffleId, int partitionId, int startPartitionOfRange) { @@ -32,6 +33,12 @@ public class ShuffleDataReadEvent { this.startPartition = startPartitionOfRange; } + public ShuffleDataReadEvent( + String appId, int shuffleId, int partitionId, int startPartitionOfRange, int storageId) { + this(appId, shuffleId, partitionId, startPartitionOfRange); + this.storageId = storageId; + } + public String getAppId() { return appId; } @@ -47,4 +54,8 @@ public class ShuffleDataReadEvent { public int getStartPartition() { return startPartition; } + + public int getStorageId() { + return storageId; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index ca0a3d6c7..b5951c73d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -27,6 +27,7 @@ import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager; import org.apache.uniffle.server.buffer.ShuffleBufferType; +import org.apache.uniffle.server.storage.LocalStorageManager; public class ShuffleServerConf extends RssBaseConf { @@ -759,6 +760,12 @@ public class ShuffleServerConf extends RssBaseConf { .withDescription( "A list of metrics will report to coordinator and dashboard, format in \"displayName:metricsName\", separated by ','"); + public static final ConfigOption<String> SERVER_LOCAL_STORAGE_MANAGER_CLASS = + ConfigOptions.key("rss.server.localStorageManagerClass") + .stringType() + .defaultValue(LocalStorageManager.class.getName()) + .withDescription("The class of local storage manager implementation"); + public ShuffleServerConf() {} public ShuffleServerConf(String fileName) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 0c38960d0..79206492a 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -19,6 +19,7 @@ package org.apache.uniffle.server; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -1083,6 +1084,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { int partitionNum = request.getPartitionNum(); long offset = request.getOffset(); int length = request.getLength(); + int storageId = 
request.getStorageId(); auditContext.withAppId(appId).withShuffleId(shuffleId); auditContext.withArgs( @@ -1095,7 +1097,9 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { + ", offset=" + offset + ", length=" - + length); + + length + + ", storageId=" + + storageId); StatusCode status = verifyRequest(appId); if (status != StatusCode.SUCCESS) { @@ -1144,7 +1148,8 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { Storage storage = shuffleServer .getStorageManager() - .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + .selectStorage( + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId)); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } @@ -1163,7 +1168,8 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { partitionNum, storageType, offset, - length); + length, + storageId); long readTime = System.currentTimeMillis() - start; ShuffleServerMetrics.counterTotalReadTime.inc(readTime); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength()); @@ -1300,6 +1306,10 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { builder.setIndexData(UnsafeByteOperations.unsafeWrap(data)); builder.setDataFileLen(shuffleIndexResult.getDataFileLen()); + builder.addAllStorageIds( + Arrays.stream(shuffleIndexResult.getStorageIds()) + .boxed() + .collect(Collectors.toList())); auditContext.withReturnValue("len=" + shuffleIndexResult.getDataFileLen()); reply = builder.build(); } catch (FileNotFoundException indexFileNotFoundException) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index d6a9a3fe9..3f753eda8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -694,7 +694,8 
@@ public class ShuffleTaskManager { int partitionNum, String storageType, long offset, - int length) { + int length, + int storageId) { refreshAppId(appId); CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); @@ -709,12 +710,24 @@ public class ShuffleTaskManager { ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = storageManager.selectStorage( - new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId)); if (storage == null) { throw new FileNotFoundException("No such data stored in current storage manager."); } - return storage.getOrCreateReadHandler(request).getShuffleData(offset, length); + // only one partition part in one storage + try { + return storage.getOrCreateReadHandler(request).getShuffleData(offset, length); + } catch (FileNotFoundException e) { + LOG.warn( + "shuffle file not found {}-{}-{} in {}", + appId, + shuffleId, + partitionId, + storage.getStoragePath(), + e); + throw e; + } } public ShuffleIndexResult getShuffleIndex( @@ -736,12 +749,16 @@ public class ShuffleTaskManager { int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = - storageManager.selectStorage( + storageManager.selectStorageById( new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); if (storage == null) { throw new FileNotFoundException("No such data in current storage manager."); } - return storage.getOrCreateReadHandler(request).getShuffleIndex(); + ShuffleIndexResult result = storage.getOrCreateReadHandler(request).getShuffleIndex(); + if (result == null) { + throw new FileNotFoundException("No such data in current storage manager."); + } + return result; } public void checkResourceStatus() { diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index b5137d818..ab6ce2834 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -53,6 +53,7 @@ import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataResponse; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest; import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse; +import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexV2Response; import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest; import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataResponse; import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataRequest; @@ -552,7 +553,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { } String msg = "OK"; - GetLocalShuffleIndexResponse response; + GetLocalShuffleIndexV2Response response; int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = @@ -587,8 +588,13 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { auditContext.withStatusCode(status); auditContext.withReturnValue("len=" + data.size()); response = - new GetLocalShuffleIndexResponse( - req.getRequestId(), status, msg, data, shuffleIndexResult.getDataFileLen()); + new GetLocalShuffleIndexV2Response( + req.getRequestId(), + status, + msg, + data, + shuffleIndexResult.getDataFileLen(), + shuffleIndexResult.getStorageIds()); ReleaseMemoryAndRecordReadTimeListener listener = new ReleaseMemoryAndRecordReadTimeListener( start, assumedFileSize, data.size(), requestInfo, req, response, client); @@ -605,7 +611,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { requestInfo, 
indexFileNotFoundException); response = - new GetLocalShuffleIndexResponse( + new GetLocalShuffleIndexV2Response( req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L); } catch (Exception e) { shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize); @@ -616,7 +622,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage(); LOG.error(msg, e); response = - new GetLocalShuffleIndexResponse( + new GetLocalShuffleIndexV2Response( req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L); } } else { @@ -624,7 +630,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { msg = "Can't require memory to get shuffle index"; LOG.warn("{} for {}", msg, requestInfo); response = - new GetLocalShuffleIndexResponse( + new GetLocalShuffleIndexV2Response( req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L); } auditContext.withStatusCode(response.getStatusCode()); @@ -642,6 +648,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { int partitionNum = req.getPartitionNum(); long offset = req.getOffset(); int length = req.getLength(); + int storageId = req.getStorageId(); auditContext.withAppId(appId); auditContext.withShuffleId(shuffleId); auditContext.withArgs( @@ -656,7 +663,9 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { + ", offset=" + offset + ", length=" - + length); + + length + + ", storageId=" + + storageId); StatusCode status = verifyRequest(appId); if (status != StatusCode.SUCCESS) { auditContext.withStatusCode(status); @@ -699,7 +708,9 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { Storage storage = shuffleServer .getStorageManager() - .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + .selectStorage( + new ShuffleDataReadEvent( + appId, shuffleId, partitionId, range[0], req.getStorageId())); if (storage != 
null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } @@ -719,7 +730,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { partitionNum, storageType, offset, - length); + length, + storageId); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength()); ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength()); ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc(); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java index 05b83e558..29693ad18 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java @@ -46,7 +46,7 @@ public class HybridStorageManager implements StorageManager { private final StorageManagerSelector storageManagerSelector; HybridStorageManager(ShuffleServerConf conf) { - warmStorageManager = new LocalStorageManager(conf); + warmStorageManager = LocalStorageManagerFactory.get(conf); coldStorageManager = new HadoopStorageManager(conf); try { @@ -115,6 +115,11 @@ public class HybridStorageManager implements StorageManager { return warmStorageManager.selectStorage(event); } + @Override + public Storage selectStorageById(ShuffleDataReadEvent event) { + return warmStorageManager.selectStorageById(event); + } + @Override public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 622b5205a..598a2a9c6 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -94,7 +94,7 @@ 
public class LocalStorageManager extends SingleStorageManager { private boolean isStorageAuditLogEnabled; @VisibleForTesting - LocalStorageManager(ShuffleServerConf conf) { + public LocalStorageManager(ShuffleServerConf conf) { super(conf); storageBasePaths = RssUtils.getConfiguredLocalDirs(conf); if (CollectionUtils.isEmpty(storageBasePaths)) { @@ -136,6 +136,7 @@ public class LocalStorageManager extends SingleStorageManager { .ratio(ratio) .lowWaterMarkOfWrite(lowWaterMarkOfWrite) .highWaterMarkOfWrite(highWaterMarkOfWrite) + .setId(idx) .localStorageMedia(storageType); if (isDiskCapacityWatermarkCheckEnabled) { builder.enableDiskCapacityWatermarkCheck(); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java new file mode 100644 index 000000000..acdcd4377 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.server.storage; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.server.ShuffleServerConf; + +public class LocalStorageManagerFactory { + public static LocalStorageManager get(ShuffleServerConf conf) { + String className = conf.get(ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS); + if (StringUtils.isEmpty(className)) { + throw new IllegalStateException( + "Configuration error: " + + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS.toString() + + " should not set to empty"); + } + + try { + return (LocalStorageManager) + RssUtils.getConstructor(className, ShuffleServerConf.class).newInstance(conf); + } catch (Exception e) { + throw new IllegalStateException( + "Configuration error: " + + ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS.toString() + + " is failed to create instance of " + + className, + e); + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java new file mode 100644 index 000000000..5bca9a9e1 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.storage; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.storage.common.CompositeReadingViewStorage; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.common.Storage; +import org.apache.uniffle.storage.util.ShuffleStorageUtils; + +public class MultiPartLocalStorageManager extends LocalStorageManager { + private static final Logger LOG = LoggerFactory.getLogger(MultiPartLocalStorageManager.class); + // id -> storage + private final Map<Integer, LocalStorage> idToStorages; + + private final CompositeReadingViewStorage compositeStorage; + + public MultiPartLocalStorageManager(ShuffleServerConf conf) { + super(conf); + idToStorages = new ConcurrentSkipListMap<>(); + for (LocalStorage storage : getStorages()) { + idToStorages.put(storage.getId(), storage); + } + + compositeStorage = new CompositeReadingViewStorage(getStorages()); + } + + @Override + public Storage selectStorage(ShuffleDataFlushEvent event) { + if (getStorages().size() == 1) { + if (event.getUnderStorage() == null) { + event.setUnderStorage(getStorages().get(0)); + } + return getStorages().get(0); + } + String appId = event.getAppId(); + int shuffleId = 
event.getShuffleId(); + int partitionId = event.getStartPartition(); + + // TODO(baoloongmao): extend to support select storage by free space + // eventId is a non-negative long. + LocalStorage storage = getStorages().get((int) (event.getEventId() % getStorages().size())); + if (storage != null) { + if (storage.isCorrupted()) { + if (storage.containsWriteHandler(appId, shuffleId, partitionId)) { + LOG.error( + "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost", + storage.getBasePath(), + event); + } + } else { + if (event.getUnderStorage() == null) { + event.setUnderStorage(storage); + } + return storage; + } + } + + // TODO(baoloongmao): update health storages and store it as member of this class. + List<LocalStorage> candidates = + getStorages().stream() + .filter(x -> x.canWrite() && !x.isCorrupted()) + .collect(Collectors.toList()); + + if (candidates.size() == 0) { + return null; + } + final LocalStorage selectedStorage = + candidates.get( + ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId)); + if (storage == null || storage.isCorrupted() || event.getUnderStorage() == null) { + event.setUnderStorage(selectedStorage); + return selectedStorage; + } + return storage; + } + + @Override + public Storage selectStorage(ShuffleDataReadEvent event) { + if (getStorages().size() == 1) { + return getStorages().get(0); + } + + int storageId = event.getStorageId(); + // TODO(baoloongmao): check AOOB exception + return idToStorages.get(storageId); + } + + @Override + public Storage selectStorageById(ShuffleDataReadEvent event) { + return compositeStorage; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java index 70425a22d..4d8ca6822 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java +++ 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java @@ -36,6 +36,10 @@ public interface StorageManager { Storage selectStorage(ShuffleDataReadEvent event); + default Storage selectStorageById(ShuffleDataReadEvent event) { + return selectStorage(event); + } + boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event); void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java index f21d6f6fe..e99434b0d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManagerFactory.java @@ -33,7 +33,7 @@ public class StorageManagerFactory { public StorageManager createStorageManager(ShuffleServerConf conf) { StorageType type = StorageType.valueOf(conf.get(ShuffleServerConf.RSS_STORAGE_TYPE).name()); if (StorageType.LOCALFILE.equals(type) || StorageType.MEMORY_LOCALFILE.equals(type)) { - return new LocalStorageManager(conf); + return LocalStorageManagerFactory.get(conf); } else if (StorageType.HDFS.equals(type) || StorageType.MEMORY_HDFS.equals(type)) { return new HadoopStorageManager(conf); } else if (StorageType.LOCALFILE_HDFS.equals(type) diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java new file mode 100644 index 000000000..3d32e7922 --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/common/CompositeReadingViewStorage.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.common; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.exception.FileNotFoundException; +import org.apache.uniffle.storage.handler.api.ServerReadHandler; +import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; +import org.apache.uniffle.storage.handler.impl.CompositeLocalFileServerReadHandler; +import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; +import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest; + +public class CompositeReadingViewStorage extends AbstractStorage { + + private static final Logger LOG = LoggerFactory.getLogger(CompositeReadingViewStorage.class); + private final List<LocalStorage> localStorages; + + public CompositeReadingViewStorage(List<LocalStorage> localStorages) { + super(); + this.localStorages = localStorages; + } + + @Override + ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) { + return null; + } + + public ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest request) { + // Do not cache it since this class is just a wrapper + return newReadHandler(request); + } + + @Override + protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) { + List<ServerReadHandler> handlers = new 
ArrayList<>(); + for (LocalStorage storage : localStorages) { + try { + handlers.add(storage.getOrCreateReadHandler(request)); + } catch (FileNotFoundException e) { + // ignore it + } catch (Exception e) { + LOG.error("Failed to create read handler for storage: " + storage, e); + } + } + return new CompositeLocalFileServerReadHandler(handlers); + } + + @Override + public boolean canWrite() { + return false; + } + + @Override + public void updateWriteMetrics(StorageWriteMetrics metrics) {} + + @Override + public void updateReadMetrics(StorageReadMetrics metrics) {} + + @Override + public void createMetadataIfNotExist(String shuffleKey) {} + + @Override + public String getStoragePath() { + return null; + } + + @Override + public String getStorageHost() { + return null; + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java index e748608de..a87fd1d87 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java +++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java @@ -46,6 +46,7 @@ public class LocalStorage extends AbstractStorage { public static final String STORAGE_HOST = "local"; private final long diskCapacity; + private final int id; private volatile long diskAvailableBytes; private volatile long serviceUsedBytes; // for test cases @@ -68,6 +69,7 @@ public class LocalStorage extends AbstractStorage { this.capacity = builder.capacity; this.media = builder.media; this.enableDiskCapacityCheck = builder.enableDiskCapacityWatermarkCheck; + this.id = builder.id; File baseFolder = new File(basePath); try { @@ -149,7 +151,8 @@ public class LocalStorage extends AbstractStorage { request.getPartitionId(), request.getPartitionNumPerRange(), request.getPartitionNum(), - basePath); + basePath, + id); } // only for tests. 
@@ -282,6 +285,10 @@ public class LocalStorage extends AbstractStorage { isSpaceEnough = false; } + public int getId() { + return id; + } + public static class Builder { private long capacity; private double ratio; @@ -290,6 +297,7 @@ public class LocalStorage extends AbstractStorage { private String basePath; private StorageMedia media; private boolean enableDiskCapacityWatermarkCheck; + private int id; private Builder() {} @@ -328,6 +336,11 @@ public class LocalStorage extends AbstractStorage { return this; } + public Builder setId(int id) { + this.id = id; + return this; + } + public LocalStorage build() { return new LocalStorage(this); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java index b16a4d327..267cd3030 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ServerReadHandler.java @@ -25,4 +25,6 @@ public interface ServerReadHandler { ShuffleDataResult getShuffleData(long offset, int length); ShuffleIndexResult getShuffleIndex(); + + int getStorageId(); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java new file mode 100644 index 000000000..deb9953b1 --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/CompositeLocalFileServerReadHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.handler.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.netty.buffer.ManagedBuffer; +import org.apache.uniffle.common.netty.buffer.MultiFileSegmentManagedBuffer; +import org.apache.uniffle.storage.handler.api.ServerReadHandler; + +public class CompositeLocalFileServerReadHandler implements ServerReadHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(CompositeLocalFileServerReadHandler.class); + private final List<ServerReadHandler> handlers; + + public CompositeLocalFileServerReadHandler(List<ServerReadHandler> handlers) { + this.handlers = handlers; + } + + @Override + public ShuffleDataResult getShuffleData(long offset, int length) { + return null; + } + + @Override + public ShuffleIndexResult getShuffleIndex() { + if (handlers.size() == 0) { + // caller should handle the null return + return null; + } + int[] storageIds = new int[handlers.size()]; + List<ManagedBuffer> managedBuffers = new ArrayList<>(handlers.size()); + String dataFileName = ""; + long length = 0; + for (int i = 0; i < handlers.size(); i++) { + ServerReadHandler handler = handlers.get(i); + storageIds[i] = handler.getStorageId(); + 
ShuffleIndexResult result = handler.getShuffleIndex(); + length += result.getDataFileLen(); + managedBuffers.add(result.getManagedBuffer()); + if (i == 0) { + // Use the first data file name as the data file name of the combined result. + // TODO: This cannot work for remote merge feature. + dataFileName = result.getDataFileName(); + } + } + MultiFileSegmentManagedBuffer mergedManagedBuffer = + new MultiFileSegmentManagedBuffer(managedBuffers); + return new ShuffleIndexResult(mergedManagedBuffer, length, dataFileName, storageIds); + } + + @Override + public int getStorageId() { + return 0; + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java index 56c68d062..4772dcf21 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java @@ -157,6 +157,7 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler { partitionNum, shuffleDataSegment.getOffset(), expectedLength, + shuffleDataSegment.getStorageId(), retryMax, retryIntervalMax); try { diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java index 4eb7e2d52..f9da2a6e8 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java @@ -36,6 +36,7 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils; public class LocalFileServerReadHandler implements ServerReadHandler { private static final Logger LOG = LoggerFactory.getLogger(LocalFileServerReadHandler.class); + private final int storageId; private String 
indexFileName = ""; private String dataFileName = ""; private String appId; @@ -48,13 +49,25 @@ public class LocalFileServerReadHandler implements ServerReadHandler { int partitionId, int partitionNumPerRange, int partitionNum, - String path) { + String path, + int storageId) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; + this.storageId = storageId; init(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, path); } + public LocalFileServerReadHandler( + String appId, + int shuffleId, + int partitionId, + int partitionNumPerRange, + int partitionNum, + String path) { + this(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, path, 0); + } + private void init( String appId, int shuffleId, @@ -150,7 +163,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler { // get dataFileSize for read segment generation in DataSkippableReadHandler#readShuffleData long dataFileSize = new File(dataFileName).length(); return new ShuffleIndexResult( - new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize, dataFileName); + new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize, dataFileName, storageId); } public String getDataFileName() { @@ -160,4 +173,9 @@ public class LocalFileServerReadHandler implements ServerReadHandler { public String getIndexFileName() { return indexFileName; } + + @Override + public int getStorageId() { + return storageId; + } }
