This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 649118487 [#1748] feat(remote merge): Introduce MergeManager to merge records on the server side. (#1946) 649118487 is described below commit 649118487a8e25e8008c1b90d1d750f72f730c1a Author: zhengchenyu <zhengcheny...@163.com> AuthorDate: Mon Aug 12 10:23:56 2024 +0800 [#1748] feat(remote merge): Introduce MergeManager to merge records on the server side. (#1946) ### What changes were proposed in this pull request? Used to merge records on the server side. By this, client can get sorted record. ### Why are the changes needed? Fix: #1748 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? unit test and test in cluster. --- .../apache/uniffle/common/ShuffleIndexResult.java | 8 +- .../org/apache/uniffle/common/config/RssConf.java | 12 + .../response/RssGetShuffleIndexResponse.java | 2 +- .../org/apache/uniffle/server/ShuffleServer.java | 21 +- .../apache/uniffle/server/ShuffleServerConf.java | 56 +++ .../uniffle/server/ShuffleServerMetrics.java | 3 + .../apache/uniffle/server/ShuffleTaskManager.java | 20 +- .../uniffle/server/merge/BlockFlushFileReader.java | 387 +++++++++++++++++ .../server/merge/DefaultMergeEventHandler.java | 111 +++++ .../apache/uniffle/server/merge/MergeEvent.java | 88 ++++ .../uniffle/server/merge/MergeEventHandler.java | 18 +- .../apache/uniffle/server/merge/MergeStatus.java | 26 +- .../apache/uniffle/server/merge/MergedResult.java | 109 +++++ .../org/apache/uniffle/server/merge/Partition.java | 478 +++++++++++++++++++++ .../org/apache/uniffle/server/merge/Shuffle.java | 100 +++++ .../uniffle/server/merge/ShuffleMergeManager.java | 293 +++++++++++++ .../server/merge/BlockFlushFileReaderTest.java | 257 +++++++++++ .../uniffle/server/merge/MergedResultTest.java | 176 ++++++++ .../server/merge/ShuffleMergeManagerTest.java | 223 ++++++++++ server/src/test/resources/log4j2.xml | 29 ++ .../handler/impl/LocalFileServerReadHandler.java | 11 +- 21 files changed, 2398 insertions(+), 30 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java index 71bb3df39..c90f8997e 100644 --- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java @@ -28,6 +28,7 @@ import org.apache.uniffle.common.util.ByteBufUtils; public class ShuffleIndexResult { private final ManagedBuffer buffer; private long dataFileLen; + private String dataFileName; public ShuffleIndexResult() { this(ByteBuffer.wrap(new byte[0]), -1); @@ -43,9 +44,10 @@ public class ShuffleIndexResult { this.dataFileLen = dataFileLen; } - public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen) { + public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String dataFileName) { this.buffer = buffer; this.dataFileLen = dataFileLen; + this.dataFileName = dataFileName; } public byte[] getData() { @@ -79,4 +81,8 @@ public class ShuffleIndexResult { public ManagedBuffer getManagedBuffer() { return buffer; } + + public String getDataFileName() { + return dataFileName; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java index b77a50b23..74d1c2bdb 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java @@ -18,6 +18,7 @@ package org.apache.uniffle.common.config; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -682,4 +683,15 @@ public class RssConf implements Cloneable { public void remove(String key) { this.settings.remove(key); } + + public Map<String, Object> getPropsWithPrefix(String confPrefix) { + Map<String, Object> configMap = new HashMap<>(); + for (Map.Entry<String, Object> entry : settings.entrySet()) { + if (entry.getKey().startsWith(confPrefix)) { + String keyName = entry.getKey().substring(confPrefix.length()); + configMap.put(keyName, entry.getValue()); + } + } + return configMap; + } } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java index 37a31652e..4d3667ab1 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java @@ -26,7 +26,7 @@ public class RssGetShuffleIndexResponse extends ClientResponse { public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data, long dataFileLen) { super(statusCode); - this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen); + this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen, null); } public ShuffleIndexResult getShuffleIndexResult() { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index ee790bad0..60be04b65 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -57,6 +57,7 @@ import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.web.CoalescedCollectorRegistry; import org.apache.uniffle.common.web.JettyServer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; +import org.apache.uniffle.server.merge.ShuffleMergeManager; import org.apache.uniffle.server.netty.StreamServer; import org.apache.uniffle.server.storage.StorageManager; import org.apache.uniffle.server.storage.StorageManagerFactory; @@ -90,6 +91,8 @@ public class ShuffleServer { private ShuffleFlushManager shuffleFlushManager; private ShuffleBufferManager shuffleBufferManager; private StorageManager storageManager; + private boolean remoteMergeEnable; + private ShuffleMergeManager shuffleMergeManager; private HealthCheck healthCheck; private Set<String> tags = Sets.newHashSet(); private GRPCMetrics grpcMetrics; @@ -305,9 +308,17 @@ public class ShuffleServer { shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager); shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled); + remoteMergeEnable = shuffleServerConf.get(ShuffleServerConf.SERVER_MERGE_ENABLE); + if (remoteMergeEnable) { + shuffleMergeManager = new ShuffleMergeManager(shuffleServerConf, this); + } shuffleTaskManager = new ShuffleTaskManager( - shuffleServerConf, shuffleFlushManager, shuffleBufferManager, storageManager); + shuffleServerConf, + shuffleFlushManager, + shuffleBufferManager, + storageManager, + shuffleMergeManager); shuffleTaskManager.start(); setServer(); @@ -569,4 +580,12 @@ public class ShuffleServer { shuffleServer.getJettyPort(), shuffleServer.getStartTimeMs()); } + + public ShuffleMergeManager getShuffleMergeManager() { + return shuffleMergeManager; + } + + public boolean isRemoteMergeEnable() { + return remoteMergeEnable; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 5b7aad8ed..eef889690 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -659,6 +659,62 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(10 * 60L) .withDescription("The storage remove resource operation timeout."); + public static final ConfigOption<Boolean> SERVER_MERGE_ENABLE = + ConfigOptions.key("rss.server.merge.enable") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable remote merge"); + + public static final ConfigOption<Integer> SERVER_MERGE_THREAD_POOL_SIZE = + ConfigOptions.key("rss.server.merge.threadPoolSize") + .intType() + .defaultValue(10) + .withDescription("thread pool for merge"); + + public static final ConfigOption<Integer> SERVER_MERGE_THREAD_POOL_QUEUE_SIZE = + ConfigOptions.key("rss.server.merge.threadPoolQueueSize") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("size of waiting queue for merge thread pool"); + + public static final ConfigOption<Integer> SERVER_MERGE_THREAD_ALIVE_TIME = + ConfigOptions.key("rss.server.merge.threadAliveTime") + .intType() + .defaultValue(120) + .withDescription("thread idle time in merge thread pool (s)"); + + public static final ConfigOption<String> SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE = + ConfigOptions.key("rss.server.merge.defaultMergedBlockSize") + .stringType() + .defaultValue("14m") + .withDescription("The default merged block size."); + + public static final ConfigOption<Long> SERVER_MERGE_CACHE_MERGED_BLOCK_INIT_SLEEP_MS = + ConfigOptions.key("rss.server.merge.cacheMergedBlockInitSleepMs") + .longType() + .defaultValue(100L) + .withDescription( + "When caching merged block, the minimum waiting event after failure to require memory"); + + public static final ConfigOption<Long> SERVER_MERGE_CACHE_MERGED_BLOCK_MAX_SLEEP_MS = + ConfigOptions.key("rss.server.merge.cacheMergedBlockMaxSleepMs") + .longType() + .defaultValue(2000L) + .withDescription( + "When caching merged block, the maximum waiting event after failure to require memory"); + + public static final ConfigOption<Integer> SERVER_MERGE_BLOCK_RING_BUFFER_SIZE = + ConfigOptions.key("rss.server.merge.blockRingBufferSize") + .intType() + .defaultValue(2) + .withDescription("The ring buffer size for read block when merge"); + + public static final ConfigOption<String> SERVER_MERGE_CLASS_LOADER_JARS_PATH = + ConfigOptions.key("rss.server.merge.classLoaderJarsPath") + .stringType() + .defaultValue(null) + .withDescription("The jars path for class loader when merge"); + public ShuffleServerConf() {} public ShuffleServerConf(String fileName) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index bac820520..3e886407c 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -58,6 +58,7 @@ public class ShuffleServerMetrics { private static final String EVENT_SIZE_THRESHOLD_LEVEL3 = "event_size_threshold_level3"; private static final String EVENT_SIZE_THRESHOLD_LEVEL4 = "event_size_threshold_level4"; private static final String EVENT_QUEUE_SIZE = "event_queue_size"; + private static final String MERGE_EVENT_QUEUE_SIZE = "merge_event_queue_size"; private static final String HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE = "hadoop_flush_thread_pool_queue_size"; private static final String LOCALFILE_FLUSH_THREAD_POOL_QUEUE_SIZE = @@ -222,6 +223,7 @@ public class ShuffleServerMetrics { public static Gauge.Child gaugeUsedDirectMemorySizeByGrpcNetty; public static Gauge.Child gaugeWriteHandler; public static Gauge.Child gaugeEventQueueSize; + public static Gauge.Child gaugeMergeEventQueueSize; public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize; public static Gauge.Child gaugeLocalfileFlushThreadPoolQueueSize; public static Gauge.Child gaugeFallbackFlushThreadPoolQueueSize; @@ -454,6 +456,7 @@ public class ShuffleServerMetrics { metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY); gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER); gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE); + gaugeMergeEventQueueSize = metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE); gaugeHadoopFlushThreadPoolQueueSize = metricsManager.addLabeledGauge(HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE); gaugeLocalfileFlushThreadPoolQueueSize = diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 8dc1653ed..226682e63 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -79,6 +79,7 @@ import org.apache.uniffle.server.event.AppPurgeEvent; import org.apache.uniffle.server.event.AppUnregisterPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; +import org.apache.uniffle.server.merge.ShuffleMergeManager; import org.apache.uniffle.server.storage.StorageManager; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.common.StorageReadMetrics; @@ -119,17 +120,28 @@ public class ShuffleTaskManager { private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue(); private final Cache<String, ReentrantReadWriteLock> appLocks; private final long storageRemoveOperationTimeoutSec; + private ShuffleMergeManager shuffleMergeManager; public ShuffleTaskManager( ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, ShuffleBufferManager shuffleBufferManager, StorageManager storageManager) { + this(conf, shuffleFlushManager, shuffleBufferManager, storageManager, null); + } + + public ShuffleTaskManager( + ShuffleServerConf conf, + ShuffleFlushManager shuffleFlushManager, + ShuffleBufferManager shuffleBufferManager, + StorageManager storageManager, + ShuffleMergeManager shuffleMergeManager) { this.conf = conf; this.shuffleFlushManager = shuffleFlushManager; this.partitionsToBlockIds = JavaUtils.newConcurrentMap(); this.shuffleBufferManager = shuffleBufferManager; this.storageManager = storageManager; + this.shuffleMergeManager = shuffleMergeManager; this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT); this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX); this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED); @@ -804,7 +816,9 @@ public class ShuffleTaskManager { }, storageRemoveOperationTimeoutSec, operationMsg); - + if (shuffleMergeManager != null) { + shuffleMergeManager.removeBuffer(appId, shuffleIds); + } LOG.info( "Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]", appId, @@ -862,7 +876,9 @@ public class ShuffleTaskManager { }, storageRemoveOperationTimeoutSec, operationMsg); - + if (shuffleMergeManager != null) { + shuffleMergeManager.removeBuffer(appId); + } if (shuffleTaskInfo.hasHugePartition()) { ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); ShuffleServerMetrics.gaugeHugePartitionNum.dec(); diff --git a/server/src/main/java/org/apache/uniffle/server/merge/BlockFlushFileReader.java b/server/src/main/java/org/apache/uniffle/server/merge/BlockFlushFileReader.java new file mode 100644 index 000000000..76fa2cc1d --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/BlockFlushFileReader.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer; +import org.apache.uniffle.common.serializer.PartialInputStream; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.storage.common.FileBasedShuffleSegment; + +/** + * Remote Merge merges the original blocks into a new block set starting with id 1. For now, all + * blocks under the partition are written into a file. Remote Merge needs to read the contents of + * each block separately. If we use a file handle to manage each block, a large number of open files + * will be wasted. Therefore, BlockFlushFileReader was introduced. + * + * <p>BlockFlushFileReader uses one file handle to manage all blocks under this partition. The + * FlushFileReader thread is used to read this file corresponding to the partition. FlushFileReader + * reads the partial buffer of each block in sequence each time. In this way, FlushFileReader always + * reads data in the order of increasing offset, which reduces random reads compared to opening a + * file per block. BlockInputStream reads the buffer corresponding to the block. We use RingBuffer + * to balance the buffer generated by FlushFileReader and the buffer consumed by BlockInputStream. + */ +public class BlockFlushFileReader { + + private static final Logger LOG = LoggerFactory.getLogger(BlockFlushFileReader.class); + private static final int BUFFER_SIZE = 4096; + + private String dataFile; + private FileInputStream dataInput; + private FileChannel dataFileChannel; + boolean stop = false; + + // blockid -> BlockInputStream + private final Map<Long, BlockInputStream> inputStreamMap = JavaUtils.newConcurrentMap(); + private final LinkedHashMap<Long, FileBasedShuffleSegment> indexSegments = new LinkedHashMap<>(); + + private FlushFileReader flushFileReader; + private volatile Throwable readThrowable = null; + // Even though there are many BlockInputStream, these BlockInputStream must + // be executed in the same thread, we called the Merge Thread. When the buffer + // of BlockInputStream have been read out, we can notify flushFileReader by + // unlock. Then flushFileReader will load the buffer, and Merge will read the + // buffer of BlockInputStream until flushFileReader load done and unlock. + private final ReentrantLock lock = new ReentrantLock(true); + + private final int ringBufferSize; + private final int mask; + + public BlockFlushFileReader(String dataFile, String indexFile, int ringBufferSize) + throws IOException { + // Make sure flush file will not be updated + this.ringBufferSize = ringBufferSize; + this.mask = ringBufferSize - 1; + loadShuffleIndex(indexFile); + this.dataFile = dataFile; + this.dataInput = new FileInputStream(dataFile); + this.dataFileChannel = dataInput.getChannel(); + // Avoid flushFileReader noop loop + this.lock.lock(); + this.flushFileReader = new FlushFileReader(); + this.flushFileReader.start(); + } + + public void loadShuffleIndex(String indexFileName) { + File indexFile = new File(indexFileName); + long indexFileSize = indexFile.length(); + int indexNum = (int) (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE); + int len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE; + ByteBuffer indexData = new FileSegmentManagedBuffer(indexFile, 0, len).nioByteBuffer(); + while (indexData.hasRemaining()) { + long offset = indexData.getLong(); + int length = indexData.getInt(); + int uncompressLength = indexData.getInt(); + long crc = indexData.getLong(); + long blockId = indexData.getLong(); + long taskAttemptId = indexData.getLong(); + FileBasedShuffleSegment fileBasedShuffleSegment = + new FileBasedShuffleSegment( + blockId, offset, length, uncompressLength, crc, taskAttemptId); + indexSegments.put(fileBasedShuffleSegment.getBlockId(), fileBasedShuffleSegment); + } + } + + public void close() throws IOException, InterruptedException { + if (!this.stop) { + stop = true; + flushFileReader.interrupt(); + flushFileReader = null; + } + if (dataInput != null) { + this.dataInput.close(); + this.dataInput = null; + this.dataFile = null; + } + } + + public BlockInputStream registerBlockInputStream(long blockId) { + if (!indexSegments.containsKey(blockId)) { + return null; + } + if (!inputStreamMap.containsKey(blockId)) { + inputStreamMap.put( + blockId, new BlockInputStream(blockId, this.indexSegments.get(blockId).getLength())); + } + return inputStreamMap.get(blockId); + } + + class FlushFileReader extends Thread { + @Override + public void run() { + while (!stop) { + int available = 0; + int process = 0; + try { + lock.lockInterruptibly(); + try { + Iterator<Map.Entry<Long, FileBasedShuffleSegment>> iterator = + indexSegments.entrySet().iterator(); + while (iterator.hasNext()) { + FileBasedShuffleSegment segment = iterator.next().getValue(); + long blockId = segment.getBlockId(); + BlockInputStream inputStream = inputStreamMap.get(blockId); + if (inputStream == null || inputStream.eof) { + continue; + } + available++; + if (inputStream.isBufferFull()) { + continue; + } + process++; + long off = segment.getOffset() + inputStream.getOffsetInThisBlock(); + if (dataFileChannel.position() != off) { + dataFileChannel.position(off); + } + inputStream.writeBuffer(); + } + } catch (Throwable throwable) { + readThrowable = throwable; + LOG.info("FlushFileReader read failed, caused by ", throwable); + stop = true; + } finally { + lock.unlock(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "statistics: load buffer available is {}, process is {}", available, process); + } + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("FlushFileReader for {} have been interrupted.", dataFile); + } + } + } + } + } + + class Buffer { + + private byte[] bytes = new byte[BUFFER_SIZE]; + private int cap = BUFFER_SIZE; + private int pos = cap; + + public int get() { + return this.bytes[pos++] & 0xFF; + } + + public int get(byte[] bs, int off, int len) { + int r = Math.min(cap - pos, len); + System.arraycopy(bytes, pos, bs, off, r); + pos += r; + return r; + } + + public boolean readable() { + return pos < cap; + } + + public void writeBuffer(int length) throws IOException { + dataFileChannel.read(ByteBuffer.wrap(this.bytes, 0, length)); + this.pos = 0; + this.cap = length; + } + } + + class RingBuffer { + + Buffer[] buffers; + // The max of int is 2147483647, the maximum bocksize supported by RingBuffer is 7.999 TB, + // the block can't be that big. so readIndex and writeIndex cannot overflow, there's no + // modulo operator for readIndex and writeIndex. + int readIndex = 0; + int writeIndex = 0; + + RingBuffer() { + this.buffers = new Buffer[ringBufferSize]; + for (int i = 0; i < ringBufferSize; i++) { + this.buffers[i] = new Buffer(); + } + } + + boolean full() { + return (writeIndex - readIndex) == ringBufferSize; + } + + boolean empty() { + return writeIndex == readIndex; + } + + int write(int available) throws IOException { + int left = available; + while (!full() && left > 0) { + int size = Math.min(available, BUFFER_SIZE); + this.buffers[writeIndex & mask].writeBuffer(size); + left -= size; + writeIndex++; + } + return available - left; + } + + int read() { + int ret = this.buffers[readIndex & mask].get(); + if (!this.buffers[readIndex & mask].readable()) { + readIndex++; + } + return ret; + } + + int read(byte[] bs, int off, int len) { + int total = 0; + int end = off + len; + while (off < end && !this.empty()) { + Buffer buffer = this.buffers[readIndex & mask]; + int r = buffer.get(bs, off, len); + if (!this.buffers[readIndex & mask].readable()) { + readIndex++; + } + off += r; + len -= r; + total += r; + } + return total; + } + } + + public class BlockInputStream extends PartialInputStream { + + private long blockId; + private RingBuffer ringBuffer; + private boolean eof = false; + private final int length; + private int pos = 0; + private int offsetInThisBlock = 0; + + public BlockInputStream(long blockId, int length) { + this.blockId = blockId; + this.length = length; + this.ringBuffer = new RingBuffer(); + } + + @Override + public int available() throws IOException { + return length - pos; + } + + @Override + public long getStart() { + return 0; + } + + @Override + public long getEnd() { + return length; + } + + public long getOffsetInThisBlock() { + return this.offsetInThisBlock; + } + + @Override + public void close() throws IOException { + try { + inputStreamMap.remove(blockId); + indexSegments.remove(blockId); + if (inputStreamMap.size() == 0) { + BlockFlushFileReader.this.close(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + public boolean isBufferFull() { + return ringBuffer.full(); + } + + public void writeBuffer() throws IOException { + int size = this.ringBuffer.write(length - offsetInThisBlock); + this.offsetInThisBlock += size; + } + + public int read(byte[] bs, int off, int len) throws IOException { + if (stop) { + throw new IOException("Block flush file reader is closed, caused by " + readThrowable); + } + if (bs == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > bs.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + if (eof) { + return -1; + } + while (ringBuffer.empty() && !stop) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + int c = this.ringBuffer.read(bs, off, len); + pos += c; + if (pos >= length) { + eof = true; + } + return c; + } + + @Override + public int read() throws IOException { + if (stop) { + throw new IOException("Block flush file reader is closed, caused by " + readThrowable); + } + if (eof) { + return -1; + } + while (ringBuffer.empty() && !stop) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + int c = this.ringBuffer.read(); + pos++; + if (pos >= length) { + eof = true; + } + return c; + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/merge/DefaultMergeEventHandler.java b/server/src/main/java/org/apache/uniffle/server/merge/DefaultMergeEventHandler.java new file mode 100644 index 000000000..05c9a3723 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/DefaultMergeEventHandler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import com.google.common.collect.Queues; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.util.ThreadUtils; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.server.ShuffleServerMetrics; + +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_THREAD_ALIVE_TIME; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_THREAD_POOL_QUEUE_SIZE; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_THREAD_POOL_SIZE; + +public class DefaultMergeEventHandler implements MergeEventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultMergeEventHandler.class); + + private Executor threadPoolExecutor; + protected final BlockingQueue<MergeEvent> queue = Queues.newLinkedBlockingQueue(); + private Consumer<MergeEvent> eventConsumer; + private volatile boolean stopped = false; + + public DefaultMergeEventHandler( + ShuffleServerConf serverConf, Consumer<MergeEvent> eventConsumer) { + this.eventConsumer = eventConsumer; + int poolSize = serverConf.get(SERVER_MERGE_THREAD_POOL_SIZE); + int queueSize = serverConf.get(SERVER_MERGE_THREAD_POOL_QUEUE_SIZE); + int keepAliveTime = serverConf.get(SERVER_MERGE_THREAD_ALIVE_TIME); + BlockingQueue<Runnable> waitQueue = Queues.newLinkedBlockingQueue(queueSize); + threadPoolExecutor = + new ThreadPoolExecutor( + poolSize, + poolSize, + keepAliveTime, + TimeUnit.SECONDS, + waitQueue, + ThreadUtils.getThreadFactory("DefaultMergeEventHandler")); + startEventProcessor(); + } + + private void startEventProcessor() { + Thread processEventThread = new Thread(this::eventLoop); + processEventThread.setName("ProcessEventThread"); + processEventThread.setDaemon(true); + processEventThread.start(); + } + + protected void eventLoop() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + processNextEvent(); + } + } + + protected void processNextEvent() { + try { + MergeEvent event = queue.take(); + threadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event)); + } catch (Exception e) { + LOG.error("Exception happened when process event.", e); + } + } + + private void handleEventAndUpdateMetrics(MergeEvent event) { + try { + eventConsumer.accept(event); + } finally { + ShuffleServerMetrics.gaugeMergeEventQueueSize.dec(); + } + } + + @Override + public void handle(MergeEvent event) { + if (queue.offer(event)) { + ShuffleServerMetrics.gaugeMergeEventQueueSize.inc(); + } + } + + @Override + public int getEventNumInMerge() { + return queue.size(); + } + + @Override + public void stop() { + stopped = true; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/merge/MergeEvent.java b/server/src/main/java/org/apache/uniffle/server/merge/MergeEvent.java new file mode 100644 index 000000000..250e9a1ee --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/MergeEvent.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +public class MergeEvent { + + private final String appId; + private final int shuffleId; + private final int partitionId; + private final Class kClass; + private final Class vClass; + private Roaring64NavigableMap expectedBlockIdMap; + + public MergeEvent( + String appId, + int shuffleId, + int partitionId, + Class kClass, + Class vClass, + Roaring64NavigableMap expectedBlockIdMap) { + this.appId = appId; + this.shuffleId = shuffleId; + this.partitionId = partitionId; + this.kClass = kClass; + this.vClass = vClass; + this.expectedBlockIdMap = expectedBlockIdMap; + } + + public String getAppId() { + return appId; + } + + public int getShuffleId() { + return shuffleId; + } + + public int getPartitionId() { + return partitionId; + } + + public Roaring64NavigableMap getExpectedBlockIdMap() { + return expectedBlockIdMap; + } + + public Class getKeyClass() { + return kClass; + } + + public Class getValueClass() { + return vClass; + } + + @Override + public String toString() { + return "MergeEvent{" + + "appId='" + + appId + + '\'' + + ", shuffleId=" + + shuffleId + + ", partitionId=" + + partitionId + + ", kClass=" + + kClass + + ", vClass=" + + vClass + + ", expectedBlockIdMap=" + + expectedBlockIdMap + + '}'; + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/server/src/main/java/org/apache/uniffle/server/merge/MergeEventHandler.java similarity index 56% copy from internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java copy to server/src/main/java/org/apache/uniffle/server/merge/MergeEventHandler.java index 37a31652e..c4a248e3a 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java +++ b/server/src/main/java/org/apache/uniffle/server/merge/MergeEventHandler.java @@ -15,21 +15,13 @@ * limitations under the License. */ -package org.apache.uniffle.client.response; +package org.apache.uniffle.server.merge; -import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.netty.buffer.ManagedBuffer; -import org.apache.uniffle.common.rpc.StatusCode; +public interface MergeEventHandler { -public class RssGetShuffleIndexResponse extends ClientResponse { - private final ShuffleIndexResult shuffleIndexResult; + void handle(MergeEvent event); - public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data, long dataFileLen) { - super(statusCode); - this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen); - } + int getEventNumInMerge(); - public ShuffleIndexResult getShuffleIndexResult() { - return shuffleIndexResult; - } + void stop(); } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java b/server/src/main/java/org/apache/uniffle/server/merge/MergeStatus.java similarity index 57% copy from internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java copy to server/src/main/java/org/apache/uniffle/server/merge/MergeStatus.java index 37a31652e..b2263aa34 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleIndexResponse.java +++ b/server/src/main/java/org/apache/uniffle/server/merge/MergeStatus.java @@ -15,21 +15,25 @@ * limitations under the License. */ -package org.apache.uniffle.client.response; +package org.apache.uniffle.server.merge; -import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.netty.buffer.ManagedBuffer; -import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.merger.MergeState; -public class RssGetShuffleIndexResponse extends ClientResponse { - private final ShuffleIndexResult shuffleIndexResult; +public class MergeStatus { - public RssGetShuffleIndexResponse(StatusCode statusCode, ManagedBuffer data, long dataFileLen) { - super(statusCode); - this.shuffleIndexResult = new ShuffleIndexResult(data, dataFileLen); + private MergeState state; + private long size; + + public MergeStatus(MergeState state, long size) { + this.state = state; + this.size = size; + } + + public MergeState getState() { + return state; } - public ShuffleIndexResult getShuffleIndexResult() { - return shuffleIndexResult; + public long getSize() { + return size; } } diff --git a/server/src/main/java/org/apache/uniffle/server/merge/MergedResult.java b/server/src/main/java/org/apache/uniffle/server/merge/MergedResult.java new file mode 100644 index 000000000..6c7ce056f --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/MergedResult.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.merger.Recordable; + +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE; + +public class MergedResult { + + private final RssConf rssConf; + private final long mergedBlockSize; + // raw offset by blockId + private final List<Long> offsets = new ArrayList<>(); + private final CacheMergedBlockFuntion cachedMergedBlock; + + public MergedResult( + RssConf rssConf, CacheMergedBlockFuntion cachedMergedBlock, int mergedBlockSize) { + this.rssConf = rssConf; + this.cachedMergedBlock = cachedMergedBlock; + this.mergedBlockSize = + mergedBlockSize > 0 + ? mergedBlockSize + : this.rssConf.getSizeAsBytes( + SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE.key(), + SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE.defaultValue()); + offsets.add(0L); + } + + public OutputStream getOutputStream() { + return new MergedSegmentOutputStream(); + } + + public boolean isOutOfBound(long blockId) { + return blockId >= offsets.size(); + } + + public long getBlockSize(long blockId) { + return offsets.get((int) blockId) - offsets.get((int) (blockId - 1)); + } + + @FunctionalInterface + public interface CacheMergedBlockFuntion { + void cache(byte[] buffer, long blockId, int length); + } + + class MergedSegmentOutputStream extends OutputStream implements Recordable { + + ByteArrayOutputStream current; + + MergedSegmentOutputStream() { + current = new ByteArrayOutputStream((int) mergedBlockSize); + } + + @Override + public void write(int b) throws IOException { + current.write(b); + } + + @Override + public void close() throws IOException { + if (current != null) { + current.close(); + current = null; + } + } + + @Override + public boolean record(long written, Flushable flushable, boolean force) throws IOException { + assert written >= 0; + long currentOffsetInThisBlock = written - offsets.get(offsets.size() - 1); + if (currentOffsetInThisBlock >= mergedBlockSize || (currentOffsetInThisBlock > 0 && force)) { + if (flushable != null) { + flushable.flush(); + } + cachedMergedBlock.cache( + current.toByteArray(), offsets.size(), (int) (currentOffsetInThisBlock)); + offsets.add(written); + if (!force) { + current = new ByteArrayOutputStream((int) mergedBlockSize); + } + return true; + } + return false; + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java new file mode 100644 index 000000000..05e28ddaa --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import org.apache.hadoop.io.RawComparator; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.exception.FileNotFoundException; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.merger.MergeState; +import org.apache.uniffle.common.merger.Merger; +import org.apache.uniffle.common.merger.Segment; +import org.apache.uniffle.common.merger.StreamedSegment; +import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer; +import org.apache.uniffle.common.netty.buffer.ManagedBuffer; +import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; +import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.storage.common.Storage; +import org.apache.uniffle.storage.handler.impl.LocalFileServerReadHandler; +import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; +import org.apache.uniffle.storage.util.StorageType; + +import static org.apache.uniffle.common.merger.MergeState.DONE; +import static org.apache.uniffle.common.merger.MergeState.INITED; +import static org.apache.uniffle.common.merger.MergeState.INTERNAL_ERROR; +import static org.apache.uniffle.common.merger.MergeState.MERGING; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_BLOCK_RING_BUFFER_SIZE; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_CACHE_MERGED_BLOCK_INIT_SLEEP_MS; +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_CACHE_MERGED_BLOCK_MAX_SLEEP_MS; +import static org.apache.uniffle.server.merge.ShuffleMergeManager.MERGE_APP_SUFFIX; + +public class Partition<K, V> { + + private static final Logger LOG = LoggerFactory.getLogger(Partition.class); + + private final Shuffle shuffle; + private final int partitionId; + // Inserting or deleting ShuffleBuffer::blocks while traversing blocks may cause an + // ConcurrentModificationException. + // So cache the block here. When we use the cached block, we should check refCnt so that we can + // make sure the ByteBuf + // is not released. + Map<Long, ShufflePartitionedBlock> cachedblockMap = JavaUtils.newConcurrentMap(); + Map<Long, ShufflePartitionedBlock> mergedBlockMap = JavaUtils.newConcurrentMap(); + + private MergeState state = MergeState.INITED; + private MergedResult result; + private ShuffleMeta shuffleMeta = new ShuffleMeta(); + + // These variable should be moved to ShuffleMergeManager, it is + // not necessary to use partition granularity + private final long initSleepTime; + private final long maxSleepTime; + private long sleepTime; + private int ringBufferSize; + private BlockFlushFileReader reader = null; + + public Partition(Shuffle shuffle, int partitionId) throws IOException { + this.shuffle = shuffle; + this.partitionId = partitionId; + this.result = + new MergedResult(shuffle.serverConf, this::cachedMergedBlock, shuffle.mergedBlockSize); + this.initSleepTime = shuffle.serverConf.get(SERVER_MERGE_CACHE_MERGED_BLOCK_INIT_SLEEP_MS); + this.maxSleepTime = shuffle.serverConf.get(SERVER_MERGE_CACHE_MERGED_BLOCK_MAX_SLEEP_MS); + int tmpRingBufferSize = shuffle.serverConf.get(SERVER_MERGE_BLOCK_RING_BUFFER_SIZE); + this.ringBufferSize = + Integer.highestOneBit((Math.min(32, Math.max(2, tmpRingBufferSize)) - 1) << 1); + if (tmpRingBufferSize != this.ringBufferSize) { + LOG.info( + "The ring buffer size will transient from {} to {}", + tmpRingBufferSize, + this.ringBufferSize); + } + } + + // startSortMerge is used to trigger to merger + synchronized void startSortMerge(Roaring64NavigableMap expectedBlockIdMap) throws IOException { + if (getState() != INITED) { + LOG.warn("Partition is already merging, so ignore duplicate reports, partition is {}", this); + } else { + if (!expectedBlockIdMap.isEmpty()) { + setState(MERGING); + MergeEvent event = + new MergeEvent( + shuffle.appId, + shuffle.shuffleId, + partitionId, + shuffle.kClass, + shuffle.vClass, + expectedBlockIdMap); + shuffle.eventHandler.handle(event); + } else { + setState(DONE); + } + } + } + + // getSegments is used to get segments from original shuffle blocks + public List<Segment> getSegments( + RssConf rssConf, Iterator<Long> blockIds, Class keyClass, Class valueClass) + throws IOException { + List<Segment> segments = new ArrayList<>(); + Set<Long> blocksFlushed = new HashSet<>(); + while (blockIds.hasNext()) { + long blockId = blockIds.next(); + ByteBuf buf = null; + if (cachedblockMap.containsKey(blockId)) { + buf = cachedblockMap.get(blockId).getData(); + } + if (buf != null && buf.refCnt() > 0) { + try { + StreamedSegment segment = + new StreamedSegment( + rssConf, + buf, + blockId, + keyClass, + valueClass, + (shuffle.comparator instanceof RawComparator)); + segments.add(segment); + } catch (Exception e) { + // If ByteBuf is released by flush cleanup before we retain in Segment, + // will throw ConcurrentModificationException. So we need get block buffer + // from file + LOG.warn("construct segment failed, caused by ", e); + blocksFlushed.add(blockId); + } + } else { + blocksFlushed.add(blockId); + } + } + if (blocksFlushed.isEmpty()) { + return segments; + } + try { + LocalFileServerReadHandler handler = getLocalFileServerReadHandler(rssConf, shuffle.appId); + this.reader = + new BlockFlushFileReader( + handler.getDataFileName(), handler.getIndexFileName(), ringBufferSize); + for (Long blockId : blocksFlushed) { + BlockFlushFileReader.BlockInputStream inputStream = + reader.registerBlockInputStream(blockId); + if (inputStream == null) { + throw new IOException("Can not find any buffer or file for block " + blockId); + } + segments.add( + new StreamedSegment( + rssConf, + inputStream, + blockId, + keyClass, + valueClass, + (shuffle.comparator instanceof RawComparator))); + } + return segments; + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + void merge(List<Segment> segments) throws IOException { + try { + OutputStream outputStream = result.getOutputStream(); + Merger.merge( + shuffle.serverConf, + outputStream, + segments, + shuffle.kClass, + shuffle.vClass, + shuffle.comparator, + (shuffle.comparator instanceof RawComparator)); + setState(DONE); + } catch (Exception e) { + // TODO: should retry!!! + LOG.error("Partition {} remote merge failed, caused by {}", this, e); + setState(INTERNAL_ERROR); + throw new IOException(e); + } + } + + public void setState(MergeState state) { + if (LOG.isDebugEnabled()) { + LOG.debug("Partition is {}, transient from {} to {}.", this, this.state.name(), state.name()); + } + this.state = state; + } + + public MergeState getState() { + return state; + } + + // Input: The first value is state, the second value is fetch block size + // Output: left is the state, right is the blocks size that you can fetch + public MergeStatus tryGetBlock(long blockId) { + long size = -1L; + MergeState currentState = state; + if ((currentState == MERGING || currentState == DONE) && !result.isOutOfBound(blockId)) { + size = result.getBlockSize(blockId); + } + return new MergeStatus(currentState, size); + } + + public void cacheBlock(ShufflePartitionedBlock spb) { + cachedblockMap.put(spb.getBlockId(), spb); + } + + // When we merge data, we will divide the merge results into blocks according to the specified + // block size. + // The merged block in a new appId field (${appd} + MERGE_APP_SUFFIX). We will process the merged + // blocks in the + // original way, cache them first, and flush them to disk when necessary. + private void cachedMergedBlock(byte[] buffer, long blockId, int length) { + String appId = shuffle.appId + MERGE_APP_SUFFIX; + ShufflePartitionedBlock spb = + new ShufflePartitionedBlock(length, length, -1, blockId, -1, buffer); + ShufflePartitionedData spd = + new ShufflePartitionedData(partitionId, new ShufflePartitionedBlock[] {spb}); + while (true) { + StatusCode ret = + shuffle + .shuffleServer + .getShuffleTaskManager() + .cacheShuffleData(appId, shuffle.shuffleId, false, spd); + if (ret == StatusCode.SUCCESS) { + mergedBlockMap.put(blockId, spb); + shuffle + .shuffleServer + .getShuffleTaskManager() + .updateCachedBlockIds( + appId, shuffle.shuffleId, spd.getPartitionId(), spd.getBlockList()); + sleepTime = initSleepTime; + break; + } else if (ret == StatusCode.NO_BUFFER) { + try { + LOG.info( + "Can not allocate enough memory for " + + this + + ", then will sleep " + + sleepTime + + "ms"); + Thread.sleep(sleepTime); + sleepTime = Math.min(maxSleepTime, sleepTime * 2); + } catch (InterruptedException ex) { + throw new RssException(ex); + } + } else { + String shuffleDataInfo = + "appId[" + + appId + + "], shuffleId[" + + shuffle.shuffleId + + "], partitionId[" + + spd.getPartitionId() + + "]"; + throw new RssException( + "Error happened when shuffleEngine.write for " + + shuffleDataInfo + + ", statusCode=" + + ret); + } + } + } + + // get merged block + public ShuffleDataResult getShuffleData(long blockId) throws IOException { + // 1 Get result in memory + // For merged block, we read and merge at the same time. Blocks may be added during the + // traversal of blocks, + // then may throw ConcurrentModificationException. So use cache block in Partition. + ManagedBuffer managedBuffer = this.getMergedBlockBufferInMemory(blockId); + if (managedBuffer != null) { + return new ShuffleDataResult(managedBuffer); + } + + // 2 Get result in flush file if we can't find block in memory. + managedBuffer = this.getMergedBlockBufferInFile(shuffle.serverConf, blockId); + return new ShuffleDataResult(managedBuffer); + } + + private NettyManagedBuffer getMergedBlockBufferInMemory(long blockId) { + try { + ShufflePartitionedBlock block = this.mergedBlockMap.get(blockId); + // We must make sure refCnt > 0, it means the ByteBuf is not released by flush cleanup + if (block != null && block.getData().refCnt() > 0) { + return new NettyManagedBuffer(block.getData().retain()); + } + return null; + } catch (Exception e) { + // If release that is triggered by flush cleanup before we retain, may throw + // IllegalReferenceCountException. + // It means ByteBuf is not available, we must get the block buffer from file. + LOG.warn("Get ByteBuf from memory failed, cased by", e); + return null; + } + } + + private synchronized ManagedBuffer getMergedBlockBufferInFile(RssConf rssConf, long blockId) { + String appId = shuffle.appId + MERGE_APP_SUFFIX; + if (!shuffleMeta.getSegments().containsKey(blockId)) { + reloadShuffleMeta(rssConf, appId); + } + ShuffleMeta.Segment segment = shuffleMeta.getSegments().get(blockId); + if (segment != null) { + return new FileSegmentManagedBuffer( + new File(shuffleMeta.getDataFileName()), segment.getOffset(), segment.getLength()); + } + throw new RssException("Can not find block for blockId " + blockId); + } + + // The index file is constantly growing and needs to be reloaded when necessary. + private synchronized void reloadShuffleMeta(RssConf rssConf, String appId) { + ShuffleIndexResult indexResult = loadShuffleIndexResult(rssConf, appId); + shuffleMeta.setDataFileName(indexResult.getDataFileName()); + ByteBuffer indexData = indexResult.getIndexData(); + Map<Long, ShuffleMeta.Segment> segments = new HashMap<>(); + while (indexData.hasRemaining()) { + long offset = indexData.getLong(); + int length = indexData.getInt(); + int uncompressLength = indexData.getInt(); + long crc = indexData.getLong(); + long blockId = indexData.getLong(); + long taskAttemptId = indexData.getLong(); + segments.put(blockId, new ShuffleMeta.Segment(offset, length)); + } + shuffleMeta.getSegments().clear(); + shuffleMeta.getSegments().putAll(segments); + } + + private ShuffleIndexResult loadShuffleIndexResult(RssConf rssConf, String appId) { + CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); + request.setAppId(appId); + request.setShuffleId(shuffle.shuffleId); + request.setPartitionId(partitionId); + request.setPartitionNumPerRange(1); + request.setPartitionNum(Integer.MAX_VALUE); // ignore check partition number + request.setStorageType(StorageType.LOCALFILE.name()); + request.setRssBaseConf((RssBaseConf) rssConf); + Storage storage = + shuffle + .shuffleServer + .getStorageManager() + .selectStorage( + new ShuffleDataReadEvent(appId, shuffle.shuffleId, partitionId, partitionId)); + if (storage == null) { + throw new FileNotFoundException("No such data in current storage manager."); + } + ShuffleIndexResult index = storage.getOrCreateReadHandler(request).getShuffleIndex(); + return index; + } + + private LocalFileServerReadHandler getLocalFileServerReadHandler(RssConf rssConf, String appId) { + CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); + request.setAppId(appId); + request.setShuffleId(shuffle.shuffleId); + request.setPartitionId(partitionId); + request.setPartitionNumPerRange(1); + request.setPartitionNum(Integer.MAX_VALUE); // ignore check partition number + request.setStorageType(StorageType.LOCALFILE.name()); + request.setRssBaseConf((RssBaseConf) rssConf); + Storage storage = + shuffle + .shuffleServer + .getStorageManager() + .selectStorage( + new ShuffleDataReadEvent(appId, shuffle.shuffleId, partitionId, partitionId)); + if (storage == null) { + throw new FileNotFoundException("No such data in current storage manager."); + } + return (LocalFileServerReadHandler) storage.getOrCreateReadHandler(request); + } + + void cleanup() { + try { + if (reader != null) { + reader.close(); + } + cachedblockMap.clear(); + mergedBlockMap.clear(); + shuffleMeta.clear(); + } catch (Exception e) { + LOG.warn("Partition {} clean up failed, caused by {}", this, e); + } + } + + @Override + public String toString() { + return "Partition{" + + "appId=" + + shuffle.appId + + ", shuffle=" + + shuffle.shuffleId + + ", partitionId=" + + partitionId + + ", state=" + + state + + '}'; + } + + public static class ShuffleMeta { + + public static class Segment { + private long offset; + private int length; + + public Segment(long offset, int length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public int getLength() { + return length; + } + } + + private String dataFileName; + private Map<Long, Segment> segments = new HashMap(); + + public ShuffleMeta() {} + + public void setDataFileName(String dataFileName) { + this.dataFileName = dataFileName; + } + + public String getDataFileName() { + return dataFileName; + } + + public Map<Long, Segment> getSegments() { + return segments; + } + + public void clear() { + this.segments.clear(); + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java new file mode 100644 index 000000000..92097eff6 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.server.ShuffleServer; + +public class Shuffle<K, V> { + + final RssConf serverConf; + final String appId; + final int shuffleId; + final Class<K> kClass; + final Class<V> vClass; + final Comparator<K> comparator; + final MergeEventHandler eventHandler; + final ShuffleServer shuffleServer; + // partition id --> Partition + private final Map<Integer, Partition<K, V>> partitions = JavaUtils.newConcurrentMap(); + final int mergedBlockSize; + final ClassLoader classLoader; + + public Shuffle( + RssConf rssConf, + MergeEventHandler eventHandler, + ShuffleServer shuffleServer, + String appId, + int shuffleId, + Class<K> kClass, + Class<V> vClass, + Comparator<K> comparator, + int mergedBlockSize, + ClassLoader classLoader) { + this.serverConf = rssConf; + this.eventHandler = eventHandler; + this.shuffleServer = shuffleServer; + this.appId = appId; + this.shuffleId = shuffleId; + this.kClass = kClass; + this.vClass = vClass; + this.comparator = comparator; + this.mergedBlockSize = mergedBlockSize; + this.classLoader = classLoader; + } + + public void startSortMerge(int partitionId, Roaring64NavigableMap expectedBlockIdMap) + throws IOException { + this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this, partitionId)); + this.partitions.get(partitionId).startSortMerge(expectedBlockIdMap); + } + + void cleanup() { + for (Partition partition : this.partitions.values()) { + partition.cleanup(); + } + this.partitions.clear(); + } + + public void cacheBlock(ShufflePartitionedData spd) throws IOException { + int partitionId = spd.getPartitionId(); + this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this, partitionId)); + for (ShufflePartitionedBlock block : spd.getBlockList()) { + this.partitions.get(partitionId).cacheBlock(block); + } + } + + public ClassLoader getClassLoader() { + return classLoader; + } + + @VisibleForTesting + Partition getPartition(int partition) { + return this.partitions.get(partition); + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java new file mode 100644 index 000000000..2024459d5 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.merger.Segment; +import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.ShuffleServerConf; + +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_CLASS_LOADER_JARS_PATH; + +public class ShuffleMergeManager { + + private static final Logger LOG = LoggerFactory.getLogger(ShuffleMergeManager.class); + public static final String MERGE_APP_SUFFIX = "@RemoteMerge"; + + private ShuffleServerConf serverConf; + private final ShuffleServer shuffleServer; + // appId -> shuffleid -> Shuffle + private final Map<String, Map<Integer, Shuffle>> shuffles = JavaUtils.newConcurrentMap(); + private final MergeEventHandler eventHandler; + private final Map<String, ClassLoader> cachedClassLoader = new HashMap<>(); + + // If comparator is not set, will use hashCode to compare. It is used for shuffle that does not + // require + // sort but require combine. + private Comparator defaultComparator = + new Comparator() { + @Override + public int compare(Object o1, Object o2) { + int h1 = (o1 == null) ? 0 : o1.hashCode(); + int h2 = (o2 == null) ? 0 : o2.hashCode(); + return h1 < h2 ? -1 : h1 == h2 ? 0 : 1; + } + }; + + public ShuffleMergeManager(ShuffleServerConf serverConf, ShuffleServer shuffleServer) + throws Exception { + this.serverConf = serverConf; + this.shuffleServer = shuffleServer; + this.eventHandler = new DefaultMergeEventHandler(this.serverConf, this::processEvent); + initCacheClassLoader(); + } + + public void initCacheClassLoader() throws Exception { + addCacheClassLoader("", serverConf.getString(SERVER_MERGE_CLASS_LOADER_JARS_PATH)); + Map<String, Object> props = + serverConf.getPropsWithPrefix(SERVER_MERGE_CLASS_LOADER_JARS_PATH.key() + "."); + for (Map.Entry<String, Object> prop : props.entrySet()) { + addCacheClassLoader(prop.getKey(), (String) prop.getValue()); + } + } + + public void addCacheClassLoader(String label, String jarsPath) throws Exception { + if (StringUtils.isNotBlank(jarsPath)) { + File jarsPathFile = new File(jarsPath); + if (jarsPathFile.exists()) { + if (jarsPathFile.isFile()) { + URLClassLoader urlClassLoader = + AccessController.doPrivileged( + new PrivilegedExceptionAction<URLClassLoader>() { + @Override + public URLClassLoader run() throws Exception { + return new URLClassLoader( + new URL[] {new URL("file://" + jarsPath)}, + Thread.currentThread().getContextClassLoader()); + } + }); + cachedClassLoader.put(label, urlClassLoader); + } else if (jarsPathFile.isDirectory()) { + File[] files = jarsPathFile.listFiles(); + List<URL> urlList = new ArrayList<>(); + if (files != null) { + for (File file : files) { + if (file.getName().endsWith(".jar")) { + urlList.add(new URL("file://" + file.getAbsolutePath())); + } + } + } + URLClassLoader urlClassLoader = + AccessController.doPrivileged( + new PrivilegedExceptionAction<URLClassLoader>() { + @Override + public URLClassLoader run() throws Exception { + return new URLClassLoader( + urlList.toArray(new URL[urlList.size()]), + Thread.currentThread().getContextClassLoader()); + } + }); + cachedClassLoader.put(label, urlClassLoader); + } else { + // If not set, will use current thread classloader + cachedClassLoader.put(label, Thread.currentThread().getContextClassLoader()); + } + } + } else { + // If not set, will use current thread classloader + cachedClassLoader.put(label, Thread.currentThread().getContextClassLoader()); + } + } + + public ClassLoader getClassLoader(String label) { + if (StringUtils.isBlank(label)) { + return cachedClassLoader.get(""); + } + return cachedClassLoader.getOrDefault(label, cachedClassLoader.get("")); + } + + public StatusCode registerShuffle( + String appId, + int shuffleId, + String keyClassName, + String valueClassName, + String comparatorClassName, + int mergedBlockSize, + String classLoaderLabel) { + try { + ClassLoader classLoader = getClassLoader(classLoaderLabel); + Class kClass = ClassUtils.getClass(classLoader, keyClassName); + Class vClass = ClassUtils.getClass(classLoader, valueClassName); + Comparator comparator; + if (StringUtils.isNotBlank(comparatorClassName)) { + Constructor constructor = + ClassUtils.getClass(classLoader, comparatorClassName).getDeclaredConstructor(); + constructor.setAccessible(true); + comparator = (Comparator) constructor.newInstance(); + } else { + comparator = defaultComparator; + } + this.shuffles.putIfAbsent(appId, JavaUtils.newConcurrentMap()); + this.shuffles + .get(appId) + .putIfAbsent( + shuffleId, + new Shuffle( + serverConf, + eventHandler, + shuffleServer, + appId, + shuffleId, + kClass, + vClass, + comparator, + mergedBlockSize, + classLoader)); + } catch (ClassNotFoundException + | InstantiationException + | IllegalAccessException + | NoSuchMethodException + | InvocationTargetException e) { + LOG.info("Cant register shuffle, caused by ", e); + removeBuffer(appId, shuffleId); + return StatusCode.INTERNAL_ERROR; + } + return StatusCode.SUCCESS; + } + + public void removeBuffer(String appId) { + if (this.shuffles.containsKey(appId)) { + for (Integer shuffleId : this.shuffles.get(appId).keySet()) { + removeBuffer(appId, shuffleId); + } + } + } + + public void removeBuffer(String appId, List<Integer> shuffleIds) { + if (this.shuffles.containsKey(appId)) { + for (Integer shuffleId : shuffleIds) { + removeBuffer(appId, shuffleId); + } + } + } + + public void removeBuffer(String appId, int shuffleId) { + if (this.shuffles.containsKey(appId)) { + if (this.shuffles.get(appId).containsKey(shuffleId)) { + this.shuffles.get(appId).get(shuffleId).cleanup(); + this.shuffles.get(appId).remove(shuffleId); + } + if (this.shuffles.get(appId).size() == 0) { + this.shuffles.remove(appId); + } + } + } + + public void startSortMerge( + String appId, int shuffleId, int partitionId, Roaring64NavigableMap expectedBlockIdMap) + throws IOException { + Map<Integer, Shuffle> shuffleMap = this.shuffles.get(appId); + if (shuffleMap != null) { + Shuffle shuffle = shuffleMap.get(shuffleId); + if (shuffle != null) { + shuffle.startSortMerge(partitionId, expectedBlockIdMap); + } + } + } + + public void processEvent(MergeEvent event) { + try { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread() + .setContextClassLoader( + this.getShuffle(event.getAppId(), event.getShuffleId()).getClassLoader()); + List<Segment> segments = + this.getPartition(event.getAppId(), event.getShuffleId(), event.getPartitionId()) + .getSegments( + serverConf, + event.getExpectedBlockIdMap().iterator(), + event.getKeyClass(), + event.getValueClass()); + this.getPartition(event.getAppId(), event.getShuffleId(), event.getPartitionId()) + .merge(segments); + Thread.currentThread().setContextClassLoader(original); + } catch (Exception e) { + LOG.info("Found exception when merge, caused by ", e); + throw new RssException(e); + } + } + + public ShuffleDataResult getShuffleData( + String appId, int shuffleId, int partitionId, long blockId) throws IOException { + return this.getPartition(appId, shuffleId, partitionId).getShuffleData(blockId); + } + + public void cacheBlock(String appId, int shuffleId, ShufflePartitionedData spd) + throws IOException { + if (this.shuffles.containsKey(appId) && this.shuffles.get(appId).containsKey(shuffleId)) { + this.getShuffle(appId, shuffleId).cacheBlock(spd); + } + } + + public MergeStatus tryGetBlock(String appId, int shuffleId, int partitionId, long blockId) { + return this.getPartition(appId, shuffleId, partitionId).tryGetBlock(blockId); + } + + @VisibleForTesting + MergeEventHandler getEventHandler() { + return eventHandler; + } + + Shuffle getShuffle(String appId, int shuffleId) { + return this.shuffles.get(appId).get(shuffleId); + } + + @VisibleForTesting + Partition getPartition(String appId, int shuffleId, int partitionId) { + return this.shuffles.get(appId).get(shuffleId).getPartition(partitionId); + } + + public void refreshAppId(String appId) { + shuffleServer.getShuffleTaskManager().refreshAppId(appId + MERGE_APP_SUFFIX); + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java new file mode 100644 index 000000000..534c74b7d --- /dev/null +++ b/server/src/test/java/org/apache/uniffle/server/merge/BlockFlushFileReaderTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.RawComparator; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.merger.Merger; +import org.apache.uniffle.common.merger.Segment; +import org.apache.uniffle.common.merger.StreamedSegment; +import org.apache.uniffle.common.records.RecordsReader; +import org.apache.uniffle.common.serializer.PartialInputStream; +import org.apache.uniffle.common.serializer.PartialInputStreamImpl; +import org.apache.uniffle.common.serializer.SerializerUtils; +import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; +import org.apache.uniffle.storage.handler.impl.LocalFileServerReadHandler; +import org.apache.uniffle.storage.handler.impl.LocalFileWriteHandler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class BlockFlushFileReaderTest { + + private static AtomicInteger ATOMIC_INT = new AtomicInteger(0); + + @ParameterizedTest + @ValueSource( + strings = { + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2", + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,4", + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,32", + }) + public void writeTestWithMerge(String classes, @TempDir File tmpDir) throws Exception { + final String[] classArray = classes.split(","); + final Class keyClass = SerializerUtils.getClassByName(classArray[0]); + final Class valueClass = SerializerUtils.getClassByName(classArray[1]); + final Comparator comparator = SerializerUtils.getComparator(keyClass); + final int ringBufferSize = Integer.parseInt(classArray[2]); + + final File dataOutput = new File(tmpDir, "dataOutput"); + final File dataDir = new File(tmpDir, "data"); + final String[] basePaths = new String[] {dataDir.getAbsolutePath()}; + final LocalFileWriteHandler writeHandler1 = + new LocalFileWriteHandler("appId", 0, 1, 1, basePaths[0], "pre"); + + RssBaseConf conf = new RssBaseConf(); + conf.setString("rss.storage.basePath", dataDir.getAbsolutePath()); + final Set<Long> expectedBlockIds = new HashSet<>(); + for (int i = 0; i < 10; i++) { + writeTestData( + generateBlocks(conf, keyClass, valueClass, i, 10, 10090), + writeHandler1, + expectedBlockIds); + } + + LocalFileServerReadHandler readHandler = + new LocalFileServerReadHandler("appId", 0, 1, 1, 10, dataDir.getAbsolutePath()); + String dataFileName = readHandler.getDataFileName(); + String indexFileName = readHandler.getIndexFileName(); + + BlockFlushFileReader blockFlushFileReader = + new BlockFlushFileReader(dataFileName, indexFileName, ringBufferSize); + + List<Segment> segments = new ArrayList<>(); + for (Long blockId : expectedBlockIds) { + PartialInputStream partialInputStream = + blockFlushFileReader.registerBlockInputStream(blockId); + segments.add( + new StreamedSegment( + conf, + partialInputStream, + blockId, + keyClass, + valueClass, + comparator instanceof RawComparator)); + } + FileOutputStream outputStream = new FileOutputStream(dataOutput); + Merger.merge( + conf, + outputStream, + segments, + keyClass, + valueClass, + comparator, + comparator instanceof RawComparator); + outputStream.close(); + + int index = 0; + RecordsReader reader = + new RecordsReader( + conf, + PartialInputStreamImpl.newInputStream(dataOutput, 0, dataOutput.length()), + keyClass, + valueClass, + false); + while (reader.next()) { + assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey()); + assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue()); + index++; + } + assertEquals(100900, index); + } + + public static void writeTestData( + List<ShufflePartitionedBlock> blocks, ShuffleWriteHandler handler, Set<Long> expectedBlockIds) + throws Exception { + blocks.forEach(block -> block.getData().retain()); + handler.write(blocks); + blocks.forEach(block -> expectedBlockIds.add(block.getBlockId())); + blocks.forEach(block -> block.getData().release()); + } + + public static List<ShufflePartitionedBlock> generateBlocks( + RssConf rssConf, Class keyClass, Class valueClass, int start, int interval, int length) + throws IOException { + BlockIdLayout layout = BlockIdLayout.DEFAULT; + List<ShufflePartitionedBlock> blocks = Lists.newArrayList(); + byte[] bytes = + SerializerUtils.genSortedRecordBytes( + rssConf, keyClass, valueClass, start, interval, length, 1); + long blockId = layout.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100); + blocks.add(new ShufflePartitionedBlock(bytes.length, bytes.length, 0, blockId, 100, bytes)); + return blocks; + } + + @Timeout(20) + @ParameterizedTest + @ValueSource( + strings = { + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,2", + }) + public void writeTestWithMergeWhenInterrupted(String classes, @TempDir File tmpDir) + throws Exception { + String[] classArray = classes.split(","); + Class keyClass = SerializerUtils.getClassByName(classArray[0]); + Class valueClass = SerializerUtils.getClassByName(classArray[1]); + Comparator comparator = SerializerUtils.getComparator(keyClass); + int ringBufferSize = Integer.parseInt(classArray[2]); + + File dataDir = new File(tmpDir, "data"); + String[] basePaths = new String[] {dataDir.getAbsolutePath()}; + final LocalFileWriteHandler writeHandler1 = + new LocalFileWriteHandler("appId", 0, 1, 1, basePaths[0], "pre"); + + RssBaseConf conf = new RssBaseConf(); + conf.setString("rss.storage.basePath", dataDir.getAbsolutePath()); + final Set<Long> expectedBlockIds = new HashSet<>(); + for (int i = 0; i < 10; i++) { + writeTestData( + generateBlocks(conf, keyClass, valueClass, i, 10, 10090), + writeHandler1, + expectedBlockIds); + } + + File dataOutput = new File(tmpDir, "dataOutput"); + LocalFileServerReadHandler readHandler = + new LocalFileServerReadHandler("appId", 0, 1, 1, 10, dataDir.getAbsolutePath()); + String dataFileName = readHandler.getDataFileName(); + String indexFileName = readHandler.getIndexFileName(); + + BlockFlushFileReader blockFlushFileReader = + new BlockFlushFileReader(dataFileName, indexFileName, ringBufferSize); + + List<Segment> segments = new ArrayList<>(); + for (Long blockId : expectedBlockIds) { + PartialInputStream partialInputStream = + blockFlushFileReader.registerBlockInputStream(blockId); + segments.add( + new MockedStreamedSegment( + conf, + partialInputStream, + blockId, + keyClass, + valueClass, + comparator instanceof RawComparator, + blockFlushFileReader)); + } + + FileOutputStream outputStream = new FileOutputStream(dataOutput); + assertThrows( + Exception.class, + () -> { + Merger.merge( + conf, + outputStream, + segments, + keyClass, + valueClass, + comparator, + comparator instanceof RawComparator); + }); + outputStream.close(); + } + + class MockedStreamedSegment extends StreamedSegment { + + BlockFlushFileReader reader; + int count; + + MockedStreamedSegment( + RssConf rssConf, + PartialInputStream inputStream, + long blockId, + Class keyClass, + Class valueClass, + boolean raw, + BlockFlushFileReader reader) { + super(rssConf, inputStream, blockId, keyClass, valueClass, raw); + this.reader = reader; + } + + public boolean next() throws IOException { + boolean ret = super.next(); + if (this.count++ > 200) { + try { + this.reader.close(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + return ret; + } + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java new file mode 100644 index 000000000..e8b09a5fd --- /dev/null +++ b/server/src/test/java/org/apache/uniffle/server/merge/MergedResultTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.io.RawComparator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.merger.Merger; +import org.apache.uniffle.common.merger.Recordable; +import org.apache.uniffle.common.merger.Segment; +import org.apache.uniffle.common.records.RecordsReader; +import org.apache.uniffle.common.serializer.PartialInputStreamImpl; +import org.apache.uniffle.common.serializer.SerializerUtils; + +import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MergedResultTest { + + private static final int BYTES_LEN = 10240; + private static final int RECORDS = 1009; + private static final int SEGMENTS = 4; + + @Test + public void testMergedResult() throws IOException { + // 1 Construct cache + List<Pair<Integer, byte[]>> blocks = new ArrayList<>(); + MergedResult.CacheMergedBlockFuntion cache = + (byte[] buffer, long blockId, int length) -> { + assertEquals(blockId - 1, blocks.size()); + blocks.add(Pair.of(length, buffer)); + }; + + // 2 Write to merged result + RssConf rssConf = new RssConf(); + rssConf.set(SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE, String.valueOf(BYTES_LEN / 10)); + MergedResult result = new MergedResult(rssConf, cache, -1); + OutputStream output = result.getOutputStream(); + for (int i = 0; i < BYTES_LEN; i++) { + output.write((byte) (i & 0x7F)); + if (output instanceof Recordable) { + ((Recordable) output).record(i + 1, null, false); + } + } + output.close(); + + // 3 check blocks number + // Max merged block is 1024, every record have 2 bytes, so will result to 10 block + assertEquals(10, blocks.size()); + + // 4 check the blocks + int index = 0; + for (int i = 0; i < blocks.size(); i++) { + int length = blocks.get(i).getLeft(); + byte[] buffer = blocks.get(i).getRight(); + assertTrue(buffer.length >= length); + for (int j = 0; j < length; j++) { + assertEquals(index & 0x7F, buffer[j]); + index++; + } + } + assertEquals(BYTES_LEN, index); + } + + @ParameterizedTest + @ValueSource( + strings = { + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable", + }) + public void testMergeSegmentToMergeResult(String classes, @TempDir File tmpDir) throws Exception { + // 1 Parse arguments + String[] classArray = classes.split(","); + Class keyClass = SerializerUtils.getClassByName(classArray[0]); + Class valueClass = SerializerUtils.getClassByName(classArray[1]); + + // 2 Construct cache + List<Pair<Integer, byte[]>> blocks = new ArrayList<>(); + MergedResult.CacheMergedBlockFuntion cache = + (byte[] buffer, long blockId, int length) -> { + assertEquals(blockId - 1, blocks.size()); + blocks.add(Pair.of(length, buffer)); + }; + + // 3 Construct segments, then merge + RssConf rssConf = new RssConf(); + List<Segment> segments = new ArrayList<>(); + Comparator comparator = SerializerUtils.getComparator(keyClass); + for (int i = 0; i < SEGMENTS; i++) { + if (i % 2 == 0) { + segments.add( + SerializerUtils.genMemorySegment( + rssConf, + keyClass, + valueClass, + i, + i, + SEGMENTS, + RECORDS, + comparator instanceof RawComparator)); + } else { + segments.add( + SerializerUtils.genFileSegment( + rssConf, + keyClass, + valueClass, + i, + i, + SEGMENTS, + RECORDS, + tmpDir, + comparator instanceof RawComparator)); + } + } + MergedResult result = new MergedResult(rssConf, cache, -1); + OutputStream mergedOutputStream = result.getOutputStream(); + Merger.merge( + rssConf, + mergedOutputStream, + segments, + keyClass, + valueClass, + comparator, + comparator instanceof RawComparator); + mergedOutputStream.flush(); + mergedOutputStream.close(); + + // 4 check merged blocks + int index = 0; + for (int i = 0; i < blocks.size(); i++) { + int length = blocks.get(i).getLeft(); + byte[] buffer = blocks.get(i).getRight(); + RecordsReader reader = + new RecordsReader( + rssConf, + PartialInputStreamImpl.newInputStream(buffer, 0, length), + keyClass, + valueClass, + false); + while (reader.next()) { + assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey()); + assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue()); + index++; + } + reader.close(); + } + assertEquals(RECORDS * SEGMENTS, index); + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java new file mode 100644 index 000000000..4b545d3a7 --- /dev/null +++ b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.merge; + +import java.io.File; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.PartitionRange; +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.merger.MergeState; +import org.apache.uniffle.common.records.RecordsReader; +import org.apache.uniffle.common.serializer.PartialInputStreamImpl; +import org.apache.uniffle.common.serializer.SerializerUtils; +import org.apache.uniffle.common.serializer.writable.WritableSerializer; +import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.ShuffleTaskManager; +import org.apache.uniffle.storage.util.StorageType; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ShuffleMergeManagerTest { + + private static final String APP_ID = "app1"; + private static final int SHUFFLE_ID = 1; + private static final int PARTITION_ID = 2; + private static final int RECORDS_NUMBER = 1009; + private static final String USER = "testUser"; + + private ShuffleServer shuffleServer; + ShuffleServerConf serverConf; + + @TempDir File tempDir1; + @TempDir File tempDir2; + + @BeforeEach + public void beforeEach() { + String confFile = ClassLoader.getSystemResource("server.conf").getFile(); + serverConf = new ShuffleServerConf(confFile); + serverConf.setString( + ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name()); + serverConf.setString( + ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(), + tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath()); + serverConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L); + serverConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true); + ShuffleServerMetrics.clear(); + ShuffleServerMetrics.register(); + assertTrue(this.tempDir1.isDirectory()); + assertTrue(this.tempDir2.isDirectory()); + } + + @AfterEach + public void afterEach() throws Exception { + serverConf = null; + if (shuffleServer != null) { + shuffleServer.stopServer(); + shuffleServer = null; + } + } + + @Timeout(10) + @ParameterizedTest + @ValueSource( + strings = { + "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable", + }) + public void testMergerManager(String classes, @TempDir File tmpDir) throws Exception { + // 1 Construct serializer and comparator + final String[] classArray = classes.split(","); + final String keyClassName = classArray[0]; + final String valueClassName = classArray[1]; + final Class keyClass = SerializerUtils.getClassByName(keyClassName); + final Class valueClass = SerializerUtils.getClassByName(valueClassName); + final Comparator comparator = SerializerUtils.getComparator(keyClass); + final String comparatorClassName = comparator.getClass().getName(); + final WritableSerializer serializer = new WritableSerializer(new RssConf()); + + // 2 Construct shuffle task manager and merge manager + shuffleServer = new ShuffleServer(serverConf); + final ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + final ShuffleMergeManager mergeManager = shuffleServer.getShuffleMergeManager(); + + // 3 register shuffle + List<PartitionRange> partitionRanges = new ArrayList<>(); + partitionRanges.add(new PartitionRange(PARTITION_ID, PARTITION_ID)); + shuffleTaskManager.registerShuffle( + APP_ID, SHUFFLE_ID, partitionRanges, new RemoteStorageInfo(""), USER); + shuffleTaskManager.registerShuffle( + APP_ID + ShuffleMergeManager.MERGE_APP_SUFFIX, + SHUFFLE_ID, + partitionRanges, + new RemoteStorageInfo(""), + USER); + mergeManager.registerShuffle( + APP_ID, SHUFFLE_ID, keyClassName, valueClassName, comparatorClassName, -1, ""); + + // 4 report blocks + // 4.1 send shuffle data + // Upstream have 2 task, each task generate 2 blocks + BlockIdLayout blockIdLayout = BlockIdLayout.from(serverConf); + long[] blocks = new long[4]; + blocks[0] = blockIdLayout.getBlockId(0, PARTITION_ID, 0); + blocks[1] = blockIdLayout.getBlockId(1, PARTITION_ID, 0); + blocks[2] = blockIdLayout.getBlockId(0, PARTITION_ID, 1); + blocks[3] = blockIdLayout.getBlockId(1, PARTITION_ID, 1); + ShufflePartitionedBlock[] shufflePartitionedBlocks = new ShufflePartitionedBlock[4]; + for (int i = 0; i < 4; i++) { + byte[] buffer = + SerializerUtils.genSortedRecordBytes( + serverConf, keyClass, valueClass, i, 4, RECORDS_NUMBER, 1); + shufflePartitionedBlocks[i] = + new ShufflePartitionedBlock( + buffer.length, + buffer.length, + 0, + blocks[i], + blockIdLayout.getTaskAttemptId(blocks[i]), + buffer); + } + ShufflePartitionedData spd = new ShufflePartitionedData(PARTITION_ID, shufflePartitionedBlocks); + shuffleTaskManager.cacheShuffleData(APP_ID, SHUFFLE_ID, false, spd); + mergeManager.cacheBlock(APP_ID, SHUFFLE_ID, spd); + // 4.2 report shuffle result + shuffleTaskManager.addFinishedBlockIds( + APP_ID, SHUFFLE_ID, ImmutableMap.of(PARTITION_ID, blocks), 1); + // 4.3 report unique blockIds + Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf(); + blockIdMap.add(blocks); + mergeManager.startSortMerge(APP_ID, SHUFFLE_ID, PARTITION_ID, blockIdMap); + + // 4 wait for drain event + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> mergeManager.getEventHandler().getEventNumInMerge() == 0); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until( + () -> + mergeManager.getPartition(APP_ID, SHUFFLE_ID, PARTITION_ID).getState() + == MergeState.DONE); + + // 5 read and check result + int blockId = 1; + int index = 0; + boolean finish = false; + while (!finish) { + MergeStatus mergeStatus = mergeManager.tryGetBlock(APP_ID, SHUFFLE_ID, PARTITION_ID, blockId); + MergeState mergeState = mergeStatus.getState(); + long blockSize = mergeStatus.getSize(); + switch (mergeState) { + case INITED: + case MERGING: + case INTERNAL_ERROR: + fail("Find wrong merge state!"); + break; + case DONE: + if (blockSize != -1) { + ShuffleDataResult shuffleDataResult = + mergeManager.getShuffleData(APP_ID, SHUFFLE_ID, PARTITION_ID, blockId); + PartialInputStreamImpl inputStream = + PartialInputStreamImpl.newInputStream( + shuffleDataResult.getData(), 0, shuffleDataResult.getDataLength()); + RecordsReader reader = + new RecordsReader(serverConf, inputStream, keyClass, valueClass, false); + while (reader.next()) { + assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey()); + assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue()); + index++; + } + shuffleDataResult.release(); + blockId++; + break; + } else { + finish = true; + break; + } + default: + fail("Find invalid merge state!"); + } + } + assertEquals(RECORDS_NUMBER * 4, index); + + // 8 cleanup + mergeManager.removeBuffer(APP_ID, SHUFFLE_ID); + } +} diff --git a/server/src/test/resources/log4j2.xml b/server/src/test/resources/log4j2.xml new file mode 100644 index 000000000..d26fb6958 --- /dev/null +++ b/server/src/test/resources/log4j2.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. +--> +<Configuration status="WARN" monitorInterval="30"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%t] [%p] %c{1}.%M - %m%n%ex"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java index f688a18bc..4eb7e2d52 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java @@ -149,6 +149,15 @@ public class LocalFileServerReadHandler implements ServerReadHandler { } // get dataFileSize for read segment generation in DataSkippableReadHandler#readShuffleData long dataFileSize = new File(dataFileName).length(); - return new ShuffleIndexResult(new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize); + return new ShuffleIndexResult( + new FileSegmentManagedBuffer(indexFile, 0, len), dataFileSize, dataFileName); + } + + public String getDataFileName() { + return dataFileName; + } + + public String getIndexFileName() { + return indexFileName; } }