This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 21bdae957 [#2154] fix: Correct the data length and buffer size metrics
and logs (#2155)
21bdae957 is described below
commit 21bdae957588fc21758fbba0e72d0e2bb8527075
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 17 10:02:09 2024 +0800
[#2154] fix: Correct the data length and buffer size metrics and logs
(#2155)
### What changes were proposed in this pull request?
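Correct the data length and buffer size metrics and logs. The single `size` value on shuffle buffers and flush events is split into `encodedLength` (memory cost, including encoding overhead) and `dataLength` (raw block data size). Each metric and log now uses the value that matches its meaning.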
### Why are the changes needed?
Fix: #2154
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (updated existing buffer and flush-manager tests).
---
.../uniffle/server/DefaultFlushEventHandler.java | 4 +-
.../apache/uniffle/server/HugePartitionUtils.java | 2 +-
.../uniffle/server/ShuffleDataFlushEvent.java | 45 +++++++++++++++++++---
.../apache/uniffle/server/ShuffleFlushManager.java | 6 +--
.../server/buffer/AbstractShuffleBuffer.java | 16 ++++++--
.../uniffle/server/buffer/ShuffleBuffer.java | 6 ++-
.../server/buffer/ShuffleBufferManager.java | 16 ++++----
.../server/buffer/ShuffleBufferWithLinkedList.java | 28 ++++++++++----
.../server/buffer/ShuffleBufferWithSkipList.java | 29 ++++++++++----
.../server/storage/HadoopStorageManager.java | 2 +-
.../server/storage/LocalStorageManager.java | 4 +-
.../server/storage/SingleStorageManager.java | 18 +++------
.../hybrid/DefaultStorageManagerSelector.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 3 ++
.../server/buffer/ShuffleBufferManagerTest.java | 18 ++++-----
.../buffer/ShuffleBufferWithLinkedListTest.java | 18 ++++-----
.../buffer/ShuffleBufferWithSkipListTest.java | 18 ++++-----
17 files changed, 152 insertions(+), 83 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 5b06bce76..9fa586628 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -110,7 +110,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
"Flush event:{} successfully in {} ms and release {} bytes",
event,
System.currentTimeMillis() - start,
- event.getSize());
+ event.getEncodedLength());
}
} catch (Exception e) {
if (e instanceof EventRetryException) {
@@ -134,7 +134,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
"Flush event: {} failed in {} ms and release {} bytes. This will
make data lost.",
event,
System.currentTimeMillis() - start,
- event.getSize());
+ event.getEncodedLength());
return;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 3d7099c8b..f2501cd42 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -133,7 +133,7 @@ public class HugePartitionUtils {
if (usedPartitionDataSize >
shuffleBufferManager.getHugePartitionSizeThreshold()) {
ShuffleBuffer buffer =
shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId,
partitionId).getValue();
- long memoryUsed = buffer.getInFlushSize() + buffer.getSize();
+ long memoryUsed = buffer.getInFlushSize() + buffer.getEncodedLength();
if (memoryUsed > shuffleBufferManager.getHugePartitionMemoryLimitSize())
{
LOG.warn(
"AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index b1c889adb..7823cad04 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,11 @@ public class ShuffleDataFlushEvent {
private final int shuffleId;
private final int startPartition;
private final int endPartition;
- private final long size;
+ /** The memory cost size include encoded length */
+ private final long encodedLength;
+ /** The data size of this shuffle block */
+ private final long dataLength;
+
private final Collection<ShufflePartitionedBlock> shuffleBlocks;
private final Supplier<Boolean> valid;
private final ShuffleBuffer shuffleBuffer;
@@ -51,13 +56,38 @@ public class ShuffleDataFlushEvent {
private boolean ownedByHugePartition = false;
private long startPendingTime;
+ @VisibleForTesting
+ public ShuffleDataFlushEvent(
+ long eventId,
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ long encodedLength,
+ Collection<ShufflePartitionedBlock> shuffleBlocks,
+ Supplier<Boolean> valid,
+ ShuffleBuffer shuffleBuffer) {
+ this(
+ eventId,
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ encodedLength,
+ encodedLength,
+ shuffleBlocks,
+ valid,
+ shuffleBuffer);
+ }
+
public ShuffleDataFlushEvent(
long eventId,
String appId,
int shuffleId,
int startPartition,
int endPartition,
- long size,
+ long encodedLength,
+ long dataLength,
Collection<ShufflePartitionedBlock> shuffleBlocks,
Supplier<Boolean> valid,
ShuffleBuffer shuffleBuffer) {
@@ -66,11 +96,12 @@ public class ShuffleDataFlushEvent {
this.shuffleId = shuffleId;
this.startPartition = startPartition;
this.endPartition = endPartition;
- this.size = size;
+ this.encodedLength = encodedLength;
this.shuffleBlocks = shuffleBlocks;
this.valid = valid;
this.shuffleBuffer = shuffleBuffer;
this.cleanupCallbackChains = new ArrayList<>();
+ this.dataLength = dataLength;
}
public Collection<ShufflePartitionedBlock> getShuffleBlocks() {
@@ -81,8 +112,12 @@ public class ShuffleDataFlushEvent {
return eventId;
}
- public long getSize() {
- return size;
+ public long getEncodedLength() {
+ return encodedLength;
+ }
+
+ public long getDataLength() {
+ return dataLength;
}
public String getAppId() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index c1b759b66..0d3d5ca0d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -198,7 +198,7 @@ public class ShuffleFlushManager {
event.getStartPartition() + "_" + event.getEndPartition(),
event.getUnderStorage().getStorageHost(),
event.getUnderStorage().getStoragePath(),
- event.getSize(),
+ event.getDataLength(),
DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN),
DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN),
endTime - startTime));
@@ -206,9 +206,9 @@ public class ShuffleFlushManager {
if (null != shuffleTaskInfo) {
String storageHost = event.getUnderStorage().getStorageHost();
if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
- shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
+ shuffleTaskInfo.addOnLocalFileDataSize(event.getEncodedLength());
} else {
- shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
+ shuffleTaskInfo.addOnHadoopDataSize(event.getEncodedLength());
}
}
} finally {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index c0f880b2d..41911e00d 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -40,7 +40,10 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractShuffleBuffer.class);
- protected long size;
+ /** The memory cost size include encoded length */
+ protected long encodedLength;
+ /** The data size of this shuffle block */
+ protected long dataLength;
protected AtomicLong inFlushSize = new AtomicLong();
@@ -48,7 +51,7 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
public static final long BUFFER_EVICTED = -1L;
public AbstractShuffleBuffer() {
- this.size = 0;
+ this.encodedLength = 0;
this.evicted = false;
}
@@ -70,8 +73,13 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
}
@Override
- public long getSize() {
- return size;
+ public long getEncodedLength() {
+ return encodedLength;
+ }
+
+ @Override
+ public long getDataLength() {
+ return dataLength;
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 522f9a057..c181aba42 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -50,7 +50,11 @@ public interface ShuffleBuffer {
ShuffleDataResult getShuffleData(
long lastBlockId, int readBufferSize, Roaring64NavigableMap
expectedTaskIds);
- long getSize();
+ /** @return the buffer memory size include encoded length */
+ long getEncodedLength();
+
+ /** @return the buffer block data size */
+ long getDataLength();
long getInFlushSize();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 58ce7665b..b7b66be37 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -325,7 +325,7 @@ public class ShuffleBufferManager {
// When we use multi storage and trigger single buffer flush, the buffer
size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
if ((isHugePartition || this.bufferFlushEnabled)
- && (buffer.getSize() > this.bufferFlushThreshold
+ && (buffer.getEncodedLength() > this.bufferFlushThreshold
|| buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -334,7 +334,7 @@ public class ShuffleBufferManager {
startPartition,
endPartition,
isHugePartition,
- buffer.getSize(),
+ buffer.getEncodedLength(),
buffer.getBlockCount());
}
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition,
isHugePartition);
@@ -397,9 +397,9 @@ public class ShuffleBufferManager {
() -> bufferPool.getOrDefault(appId, new
HashMap<>()).containsKey(shuffleId),
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
- event.addCleanupCallback(() -> releaseMemory(event.getSize(), true,
false));
- updateShuffleSize(appId, shuffleId, -event.getSize());
- inFlushSize.addAndGet(event.getSize());
+ event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(),
true, false));
+ updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
+ inFlushSize.addAndGet(event.getEncodedLength());
if (isHugePartition) {
event.markOwnedByHugePartition();
}
@@ -575,7 +575,7 @@ public class ShuffleBufferManager {
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
Range<Integer> range = rangeEntry.getKey();
ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
- pickedFlushSize += shuffleBuffer.getSize();
+ pickedFlushSize += shuffleBuffer.getEncodedLength();
flushBuffer(
shuffleBuffer,
appId,
@@ -770,11 +770,11 @@ public class ShuffleBufferManager {
// the actual released size by this thread
long releasedSize = buffer.release();
ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
- if (releasedSize != buffer.getSize()) {
+ if (releasedSize != buffer.getEncodedLength()) {
LOG.warn(
"Release shuffle buffer size {} is not equal to buffer size
{}, appId: {}, shuffleId: {}",
releasedSize,
- buffer.getSize(),
+ buffer.getEncodedLength(),
appId,
shuffleId);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 215ef2f7f..553b2870b 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -55,20 +55,23 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
if (evicted) {
return BUFFER_EVICTED;
}
- long currentSize = 0;
+ long currentEncodedLength = 0;
+ long currentDataLength = 0;
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
// block would gc without release. Here we must release the duplicated
block.
if (blocks.add(block)) {
- currentSize += block.getEncodedLength();
+ currentEncodedLength += block.getEncodedLength();
+ currentDataLength += block.getDataLength();
} else {
block.getData().release();
}
}
- this.size += currentSize;
+ this.encodedLength += currentEncodedLength;
+ this.dataLength += currentDataLength;
- return currentSize;
+ return currentEncodedLength;
}
@Override
@@ -97,17 +100,26 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
final ShuffleDataFlushEvent event =
new ShuffleDataFlushEvent(
- eventId, appId, shuffleId, startPartition, endPartition, size,
spBlocks, isValid, this);
+ eventId,
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ encodedLength,
+ dataLength,
+ spBlocks,
+ isValid,
+ this);
event.addCleanupCallback(
() -> {
this.clearInFlushBuffer(event.getEventId());
inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
- inFlushSize.addAndGet(-event.getSize());
+ inFlushSize.addAndGet(-event.getEncodedLength());
});
inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks = new LinkedHashSet<>();
- inFlushSize.addAndGet(size);
- size = 0;
+ inFlushSize.addAndGet(encodedLength);
+ encodedLength = 0;
return event;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 414d9a66b..4a92c919f 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -62,7 +62,8 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
if (evicted) {
return BUFFER_EVICTED;
}
- long currentSize = 0;
+ long currentEncodedLength = 0;
+ long currentDataLength = 0;
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
@@ -70,14 +71,16 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
if (!blocksMap.containsKey(block.getBlockId())) {
blocksMap.put(block.getBlockId(), block);
blockCount++;
- currentSize += block.getEncodedLength();
+ currentEncodedLength += block.getEncodedLength();
+ currentDataLength += block.getDataLength();
} else {
block.getData().release();
}
}
- this.size += currentSize;
+ this.encodedLength += currentEncodedLength;
+ this.dataLength += currentDataLength;
- return currentSize;
+ return currentEncodedLength;
}
@Override
@@ -95,18 +98,28 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
final ShuffleDataFlushEvent event =
new ShuffleDataFlushEvent(
- eventId, appId, shuffleId, startPartition, endPartition, size,
spBlocks, isValid, this);
+ eventId,
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ encodedLength,
+ dataLength,
+ spBlocks,
+ isValid,
+ this);
event.addCleanupCallback(
() -> {
this.clearInFlushBuffer(event.getEventId());
spBlocks.forEach(spb -> spb.getData().release());
- inFlushSize.addAndGet(-event.getSize());
+ inFlushSize.addAndGet(-event.getEncodedLength());
});
inFlushBlockMap.put(eventId, blocksMap);
blocksMap = newConcurrentSkipListMap();
blockCount = 0;
- inFlushSize.addAndGet(size);
- size = 0;
+ inFlushSize.addAndGet(encodedLength);
+ encodedLength = 0;
+ dataLength = 0;
return event;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 99c055e5f..adbbc594c 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -82,7 +82,7 @@ public class HadoopStorageManager extends
SingleStorageManager {
return;
}
ShuffleServerMetrics.incHadoopStorageWriteDataSize(
- storage.getStorageHost(), event.getSize(),
event.isOwnedByHugePartition());
+ storage.getStorageHost(), event.getDataLength(),
event.isOwnedByHugePartition());
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 67ace22d8..c33c17f3b 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -275,11 +275,11 @@ public class LocalStorageManager extends
SingleStorageManager {
super.updateWriteMetrics(event, writeTime);
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
.labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL)
- .inc(event.getSize());
+ .inc(event.getDataLength());
if (event.getUnderStorage() != null) {
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
.labels(event.getUnderStorage().getStoragePath())
- .inc(event.getSize());
+ .inc(event.getDataLength());
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 936a7550f..655375dd8 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -26,7 +26,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.storage.ApplicationStorageInfo;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -87,7 +86,7 @@ public abstract class SingleStorageManager implements
StorageManager {
if (metrics.getWriteTime() > writeSlowThreshold) {
ShuffleServerMetrics.counterWriteSlow.inc();
}
-
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
+
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getDataSize());
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
if (metrics.getEventSize() < eventSizeThresholdL1) {
ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
@@ -101,8 +100,8 @@ public abstract class SingleStorageManager implements
StorageManager {
String appId = event.getAppId();
ApplicationStorageInfo appStorage =
appStorageInfoMap.computeIfAbsent(appId, id -> new
ApplicationStorageInfo(appId));
- appStorage.incUsedBytes(event.getSize());
- ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getSize());
+ appStorage.incUsedBytes(event.getDataLength());
+ ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getDataLength());
if (event.getUnderStorage().containsWriteHandler(appId)) {
appStorage.incFileNum(1);
ShuffleServerMetrics.gaugeFlushFileNum.inc();
@@ -134,12 +133,7 @@ public abstract class SingleStorageManager implements
StorageManager {
public StorageWriteMetrics createStorageWriteMetrics(
ShuffleDataFlushEvent event, long writeTime) {
- long length = 0;
- long blockNum = 0;
- for (ShufflePartitionedBlock block : event.getShuffleBlocks()) {
- length += block.getDataLength();
- blockNum++;
- }
+ long blockNum = event.getShuffleBlocks().size();
List<Integer> partitions = Lists.newArrayList();
for (int partition = event.getStartPartition();
partition <= event.getEndPartition();
@@ -147,10 +141,10 @@ public abstract class SingleStorageManager implements
StorageManager {
partitions.add(partition);
}
return new StorageWriteMetrics(
- event.getSize(),
+ event.getEncodedLength(),
blockNum,
writeTime,
- length,
+ event.getDataLength(),
partitions,
event.getAppId(),
event.getShuffleId());
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
b/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
index e9b73d878..c308f194f 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
@@ -39,7 +39,7 @@ public class DefaultStorageManagerSelector extends
FallbackBasedStorageManagerSe
@Override
StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
StorageManager storageManager = warmStorageManager;
- if (flushEvent.getSize() > flushColdStorageThresholdSize) {
+ if (flushEvent.getEncodedLength() > flushColdStorageThresholdSize) {
storageManager = coldStorageManager;
}
return storageManager;
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 32531b876..ad3faf210 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -668,6 +669,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
} while (size < expectedBlockNum);
}
+ @VisibleForTesting
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId,
int shuffleId,
@@ -692,6 +694,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
startPartition,
endPartition,
size,
+ spbs.stream().mapToInt(ShufflePartitionedBlock::getDataLength).sum(),
spbs,
isValid,
null);
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index ab3b0cb97..13dbe94e1 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -177,9 +177,9 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// validate buffer, no flush happened
Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
shuffleBufferManager.getBufferPool();
- assertEquals(100, bufferPool.get(appId).get(1).get(0).getSize());
- assertEquals(200, bufferPool.get(appId).get(2).get(0).getSize());
- assertEquals(100, bufferPool.get(appId).get(3).get(0).getSize());
+ assertEquals(100, bufferPool.get(appId).get(1).get(0).getEncodedLength());
+ assertEquals(200, bufferPool.get(appId).get(2).get(0).getEncodedLength());
+ assertEquals(100, bufferPool.get(appId).get(3).get(0).getEncodedLength());
// validate get shuffle data
ShuffleDataResult sdr =
shuffleBufferManager.getShuffleData(appId, 2, 0,
Constants.INVALID_BLOCK_ID, 60);
@@ -300,16 +300,16 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
shuffleBufferManager.getBufferPool();
ShuffleBuffer buffer = bufferPool.get(appId).get(shuffleId).get(0);
- assertEquals(48, buffer.getSize());
+ assertEquals(48, buffer.getEncodedLength());
assertEquals(48, shuffleBufferManager.getUsedMemory());
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(0, 16));
- assertEquals(96, buffer.getSize());
+ assertEquals(96, buffer.getEncodedLength());
assertEquals(96, shuffleBufferManager.getUsedMemory());
// reach high water lever, flush
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(0, 273));
- assertEquals(0, buffer.getSize());
+ assertEquals(0, buffer.getEncodedLength());
assertEquals(401, shuffleBufferManager.getUsedMemory());
assertEquals(401, shuffleBufferManager.getInFlushSize());
verify(mockShuffleFlushManager, times(1)).addToFlushQueue(any());
@@ -334,9 +334,9 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
ShuffleBuffer buffer0 = bufferPool.get(appId).get(shuffleId).get(0);
ShuffleBuffer buffer1 = bufferPool.get(appId).get(shuffleId).get(1);
ShuffleBuffer buffer2 = bufferPool.get(appId).get(2).get(0);
- assertEquals(0, buffer0.getSize());
- assertEquals(0, buffer1.getSize());
- assertEquals(64, buffer2.getSize());
+ assertEquals(0, buffer0.getEncodedLength());
+ assertEquals(0, buffer1.getEncodedLength());
+ assertEquals(64, buffer2.getEncodedLength());
assertEquals(528, shuffleBufferManager.getUsedMemory());
assertEquals(464, shuffleBufferManager.getInFlushSize());
verify(mockShuffleFlushManager, times(3)).addToFlushQueue(any());
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 31c4492d2..acc86a6db 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -49,13 +49,13 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(createData(26));
- assertEquals(100, shuffleBuffer.getSize());
+ assertEquals(100, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(createData(1));
- assertEquals(133, shuffleBuffer.getSize());
+ assertEquals(133, shuffleBuffer.getEncodedLength());
}
@Test
@@ -67,7 +67,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
dataCombine[0] = data1.getBlockList()[0];
dataCombine[1] = data2.getBlockList()[0];
shuffleBuffer.append(new ShufflePartitionedData(1, dataCombine));
- assertEquals(84, shuffleBuffer.getSize());
+ assertEquals(84, shuffleBuffer.getEncodedLength());
}
@Test
@@ -76,10 +76,10 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
- assertEquals(42, event.getSize());
- assertEquals(0, shuffleBuffer.getSize());
+ assertEquals(42, event.getEncodedLength());
+ assertEquals(0, shuffleBuffer.getEncodedLength());
assertEquals(0, shuffleBuffer.getBlocks().size());
}
@@ -590,11 +590,11 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
ShufflePartitionedData block = createData(10);
shuffleBuffer.append(block);
// ShufflePartitionedBlock has constant 32 bytes overhead
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(block);
// The repeat block should not append to shuffleBuffer
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
}
private byte[] getExpectedData(ShufflePartitionedData... spds) {
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
index 7e293be34..87ce14569 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
@@ -42,13 +42,13 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(createData(26));
- assertEquals(100, shuffleBuffer.getSize());
+ assertEquals(100, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(createData(1));
- assertEquals(133, shuffleBuffer.getSize());
+ assertEquals(133, shuffleBuffer.getEncodedLength());
}
@Test
@@ -60,7 +60,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
dataCombine[0] = data1.getBlockList()[0];
dataCombine[1] = data2.getBlockList()[0];
shuffleBuffer.append(new ShufflePartitionedData(1, dataCombine));
- assertEquals(84, shuffleBuffer.getSize());
+ assertEquals(84, shuffleBuffer.getEncodedLength());
}
@Test
@@ -69,10 +69,10 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
- assertEquals(42, event.getSize());
- assertEquals(0, shuffleBuffer.getSize());
+ assertEquals(42, event.getEncodedLength());
+ assertEquals(0, shuffleBuffer.getEncodedLength());
assertEquals(0, shuffleBuffer.getBlocks().size());
}
@@ -203,11 +203,11 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
ShufflePartitionedData block = createData(10);
shuffleBuffer.append(block);
// ShufflePartitionedBlock has constant 32 bytes overhead
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
shuffleBuffer.append(block);
// The repeat block should not append to shuffleBuffer
- assertEquals(42, shuffleBuffer.getSize());
+ assertEquals(42, shuffleBuffer.getEncodedLength());
}
@Override