This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new d32542bae [#1847] refactor: Remove capacity and related codes within AbstractShuffleBuffer (#1853) d32542bae is described below commit d32542bae04b6be99cb82e13ffa90c5d1d50c837 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Tue Jul 2 19:35:05 2024 +0800 [#1847] refactor: Remove capacity and related codes within AbstractShuffleBuffer (#1853) ### What changes were proposed in this pull request? Remove `capacity` and related codes within `AbstractShuffleBuffer`. ### Why are the changes needed? Fix: #1847. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. --- .../server/buffer/AbstractShuffleBuffer.java | 9 +------ .../uniffle/server/buffer/ShuffleBuffer.java | 2 -- .../server/buffer/ShuffleBufferManager.java | 6 ++--- .../server/buffer/ShuffleBufferWithLinkedList.java | 3 +-- .../server/buffer/ShuffleBufferWithSkipList.java | 3 +-- .../buffer/ShuffleBufferWithLinkedListTest.java | 28 ++++++++++------------ .../buffer/ShuffleBufferWithSkipListTest.java | 12 ++++------ 7 files changed, 21 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java index fd5a62280..ddbeb21cf 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java @@ -39,11 +39,9 @@ public abstract class AbstractShuffleBuffer implements ShuffleBuffer { private static final Logger LOG = LoggerFactory.getLogger(AbstractShuffleBuffer.class); - private final long capacity; protected long size; - public AbstractShuffleBuffer(long capacity) { - this.capacity = capacity; + public AbstractShuffleBuffer() { this.size = 0; } @@ -69,11 +67,6 @@ public abstract class AbstractShuffleBuffer implements ShuffleBuffer { return size; } - @Override - public boolean isFull() { - return size > capacity; - } - @Override public synchronized ShuffleDataResult getShuffleData(long lastBlockId, int readBufferSize) { return getShuffleData(lastBlockId, readBufferSize, null); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index 1884a50c0..f0d4dadb4 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -52,8 +52,6 @@ public interface ShuffleBuffer { long getSize(); - boolean isFull(); - /** Only for test */ List<ShufflePartitionedBlock> getBlocks(); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index c62ebd8c5..ed0c2136d 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -74,8 +74,6 @@ public class ShuffleBufferManager { // Huge partition vars private ReconfigurableConfManager.Reconfigurable<Long> hugePartitionSizeThresholdRef; private long hugePartitionMemoryLimitSize; - - protected long bufferSize = 0; protected AtomicLong preAllocatedSize = new AtomicLong(0L); protected AtomicLong inFlushSize = new AtomicLong(0L); protected AtomicLong usedMemory = new AtomicLong(0L); @@ -156,9 +154,9 @@ public class ShuffleBufferManager { ShuffleServerMetrics.gaugeTotalPartitionNum.inc(); ShuffleBuffer shuffleBuffer; if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) { - shuffleBuffer = new ShuffleBufferWithSkipList(bufferSize); + shuffleBuffer = new ShuffleBufferWithSkipList(); } else { - shuffleBuffer = new ShuffleBufferWithLinkedList(bufferSize); + shuffleBuffer = new ShuffleBufferWithLinkedList(); } bufferRangeMap.put(Range.closed(startPartition, endPartition), shuffleBuffer); } else { diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index d37dc446f..3f5ff900c 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -42,8 +42,7 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer { private List<ShufflePartitionedBlock> blocks; private Map<Long, List<ShufflePartitionedBlock>> inFlushBlockMap; - public ShuffleBufferWithLinkedList(long capacity) { - super(capacity); + public ShuffleBufferWithLinkedList() { this.blocks = new LinkedList<>(); this.inFlushBlockMap = JavaUtils.newConcurrentMap(); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index db0ce2543..e7630b434 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -44,8 +44,7 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { private final Map<Long, ConcurrentSkipListMap<Long, ShufflePartitionedBlock>> inFlushBlockMap; private int blockCount; - public ShuffleBufferWithSkipList(long capacity) { - super(capacity); + public ShuffleBufferWithSkipList() { this.blocksMap = newConcurrentSkipListMap(); this.inFlushBlockMap = JavaUtils.newConcurrentMap(); } diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java index 90ac86c9f..b262b7c32 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java @@ -35,7 +35,6 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -45,24 +44,21 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { @Test public void appendTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); shuffleBuffer.append(createData(10)); // ShufflePartitionedBlock has constant 32 bytes overhead assertEquals(42, shuffleBuffer.getSize()); - assertFalse(shuffleBuffer.isFull()); shuffleBuffer.append(createData(26)); assertEquals(100, shuffleBuffer.getSize()); - assertFalse(shuffleBuffer.isFull()); shuffleBuffer.append(createData(1)); assertEquals(133, shuffleBuffer.getSize()); - assertTrue(shuffleBuffer.isFull()); } @Test public void appendMultiBlocksTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); ShufflePartitionedData data1 = createData(10); ShufflePartitionedData data2 = createData(10); ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2]; @@ -74,7 +70,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { @Test public void toFlushEventTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); assertNull(event); shuffleBuffer.append(createData(10)); @@ -88,7 +84,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { @Test public void getShuffleDataWithExpectedTaskIdsFilterTest() { /** case1: all blocks in cached(or in flushed map) and size < readBufferSize */ - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 55); @@ -200,7 +196,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { @Test public void getShuffleDataWithLocalOrderTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 15); @@ -238,7 +234,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { @Test public void getShuffleDataTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(200); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList(); // case1: cached data only, blockId = -1, readBufferSize > buffer size ShufflePartitionedData spd1 = createData(10); ShufflePartitionedData spd2 = createData(20); @@ -250,7 +246,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { assertArrayEquals(expectedData, sdr.getData()); // case2: cached data only, blockId = -1, readBufferSize = buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); spd1 = createData(20); spd2 = createData(20); shuffleBuffer.append(spd1); @@ -261,7 +257,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { assertArrayEquals(expectedData, sdr.getData()); // case3-1: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); spd1 = createData(20); spd2 = createData(21); shuffleBuffer.append(spd1); @@ -272,7 +268,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { assertArrayEquals(expectedData, sdr.getData()); // case3-2: cached data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); spd1 = createData(15); spd2 = createData(15); ShufflePartitionedData spd3 = createData(15); @@ -292,7 +288,7 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { assertArrayEquals(expectedData, sdr.getData()); // case5: flush data only, blockId = -1, readBufferSize < buffer size - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); spd1 = createData(15); spd2 = createData(15); shuffleBuffer.append(spd1); @@ -310,13 +306,13 @@ public class ShuffleBufferWithLinkedListTest extends BufferTestBase { assertEquals(0, sdr.getBufferSegments().size()); // case6: no data in buffer & flush buffer - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10); assertEquals(0, sdr.getBufferSegments().size()); assertEquals(0, sdr.getDataLength()); // case7: get data with multiple flush buffer and cached buffer - shuffleBuffer = new ShuffleBufferWithLinkedList(200); + shuffleBuffer = new ShuffleBufferWithLinkedList(); spd1 = createData(15); spd2 = createData(15); spd3 = createData(15); diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java index bf5040304..5bb5e2aa1 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java @@ -31,7 +31,6 @@ import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.server.ShuffleDataFlushEvent; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -40,24 +39,21 @@ public class ShuffleBufferWithSkipListTest extends BufferTestBase { @Test public void appendTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(); shuffleBuffer.append(createData(10)); // ShufflePartitionedBlock has constant 32 bytes overhead assertEquals(42, shuffleBuffer.getSize()); - assertFalse(shuffleBuffer.isFull()); shuffleBuffer.append(createData(26)); assertEquals(100, shuffleBuffer.getSize()); - assertFalse(shuffleBuffer.isFull()); shuffleBuffer.append(createData(1)); assertEquals(133, shuffleBuffer.getSize()); - assertTrue(shuffleBuffer.isFull()); } @Test public void appendMultiBlocksTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(); ShufflePartitionedData data1 = createData(10); ShufflePartitionedData data2 = createData(10); ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2]; @@ -69,7 +65,7 @@ public class ShuffleBufferWithSkipListTest extends BufferTestBase { @Test public void toFlushEventTest() { - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(); ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null); assertNull(event); shuffleBuffer.append(createData(10)); @@ -83,7 +79,7 @@ public class ShuffleBufferWithSkipListTest extends BufferTestBase { @Test public void getShuffleDataWithExpectedTaskIdsFilterTest() { /** case1: all blocks in cached(or in flushed map) and size < readBufferSize */ - ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(100); + ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList(); ShufflePartitionedData spd1 = createData(1, 1, 15); ShufflePartitionedData spd2 = createData(1, 0, 15); ShufflePartitionedData spd3 = createData(1, 2, 55);