This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b98b48819 [#2738] feat(server): add metrics to track shuffle data
block count and avg block size (#2741)
b98b48819 is described below
commit b98b488198da3c130fa86be6e8e9f09407f4f84b
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Mar 19 16:56:54 2026 +0800
[#2738] feat(server): add metrics to track shuffle data block count and avg
block size (#2741)
### What changes were proposed in this pull request?
Add metrics to track shuffle data block count and avg block size.
### Why are the changes needed?
Fix: #2738
We need to identify and stop applications that generate large amounts of
blocks when heap memory is insufficient.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
CI and manual testing
---
.../uniffle/common/ShufflePartitionedData.java | 18 ++++++++++
.../uniffle/server/ShuffleDataFlushEvent.java | 4 +++
.../uniffle/server/ShuffleServerGrpcService.java | 3 +-
.../uniffle/server/ShuffleServerMetrics.java | 26 ++++++++++++++
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 14 ++++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 15 +++++---
.../server/TopNShuffleDataSizeOfAppCalcTask.java | 41 ++++++++++++++++++++++
.../server/buffer/ShuffleBufferManager.java | 25 +++++++++++++
.../server/buffer/ShuffleBufferWithLinkedList.java | 6 ++++
.../server/buffer/ShuffleBufferWithSkipList.java | 6 ++++
.../org/apache/uniffle/server/merge/Partition.java | 2 +-
.../server/netty/ShuffleServerNettyHandler.java | 3 +-
.../server/KerberizedShuffleTaskManagerTest.java | 4 +--
.../uniffle/server/ShuffleTaskManagerTest.java | 41 +++++++++++-----------
.../server/buffer/ShuffleBufferManagerTest.java | 10 +++---
15 files changed, 181 insertions(+), 37 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index fbc3c2797..5a49bfd27 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -29,6 +29,8 @@ public class ShufflePartitionedData {
private final ShufflePartitionedBlock[] blockList;
private final long totalBlockEncodedLength;
private final long totalBlockDataLength;
+ private int duplicateBlockCount;
+ private long duplicateBlockSize;
public ShufflePartitionedData(
int partitionId, long encodedLength, long dataLength,
ShufflePartitionedBlock[] blockList) {
@@ -80,4 +82,20 @@ public class ShufflePartitionedData {
public long getTotalBlockDataLength() {
return totalBlockDataLength;
}
+
+ public int getDuplicateBlockCount() {
+ return duplicateBlockCount;
+ }
+
+ public void setDuplicateBlockCount(int duplicateBlockCount) {
+ this.duplicateBlockCount = duplicateBlockCount;
+ }
+
+ public long getDuplicateBlockSize() {
+ return duplicateBlockSize;
+ }
+
+ public void setDuplicateBlockSize(long duplicateBlockSize) {
+ this.duplicateBlockSize = duplicateBlockSize;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index 7823cad04..04a18df00 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -120,6 +120,10 @@ public class ShuffleDataFlushEvent {
return dataLength;
}
+ public long getBlockCount() {
+ return shuffleBlocks.size();
+ }
+
public String getAppId() {
return appId;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index bd35750d3..b62df9c32 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -458,8 +458,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
- manager.updateCachedBlockIds(
- appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
+ manager.updateCachedBlockIds(appId, shuffleId,
spd.getPartitionId(), spd);
}
} catch (ExceedHugePartitionHardLimitException e) {
String errorMsg =
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index ced8a17a0..1f4d451e5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -99,6 +99,8 @@ public class ShuffleServerMetrics {
private static final String ALLOCATED_BUFFER_SIZE = "allocated_buffer_size";
private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
private static final String USED_BUFFER_SIZE = "used_buffer_size";
+ private static final String TOTAL_IN_MEMORY_BLOCK_COUNT =
"total_in_memory_block_count";
+ private static final String TOTAL_IN_FLUSH_BLOCK_COUNT =
"total_in_flush_block_count";
private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
public static final String USED_DIRECT_MEMORY_SIZE =
"used_direct_memory_size";
public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY =
"used_direct_memory_size_by_netty";
@@ -158,6 +160,10 @@ public class ShuffleServerMetrics {
public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP =
"topN_of_total_data_size_for_app";
public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP =
"topN_of_in_memory_data_size_for_app";
+ public static final String TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP =
+ "topN_of_in_memory_block_count_for_app";
+ public static final String BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP =
+ "bottomN_of_in_memory_avg_block_size_for_app";
public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP =
"topN_of_on_localfile_data_size_for_app";
public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
@@ -239,6 +245,8 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeAllocatedBufferSize;
public static Gauge.Child gaugeInFlushBufferSize;
public static Gauge.Child gaugeUsedBufferSize;
+ public static Gauge.Child gaugeTotalInMemoryBlockCount;
+ public static Gauge.Child gaugeTotalInFlushBlockCount;
public static Gauge.Child gaugeReadBufferUsedSize;
public static Gauge.Child gaugeWriteHandler;
public static Gauge.Child gaugeMergeEventQueueSize;
@@ -257,6 +265,8 @@ public class ShuffleServerMetrics {
public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
+ public static Gauge gaugeInMemoryBlockCount;
+ public static Gauge gaugeInMemoryAvgBlockSize;
public static Gauge gaugeOnDiskDataSizeUsage;
public static Gauge gaugeOnHadoopDataSizeUsage;
@@ -477,6 +487,8 @@ public class ShuffleServerMetrics {
gaugeAllocatedBufferSize =
metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE);
gaugeInFlushBufferSize =
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
+ gaugeTotalInMemoryBlockCount =
metricsManager.addLabeledGauge(TOTAL_IN_MEMORY_BLOCK_COUNT);
+ gaugeTotalInFlushBlockCount =
metricsManager.addLabeledGauge(TOTAL_IN_FLUSH_BLOCK_COUNT);
gaugeReadBufferUsedSize =
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
gaugeMergeEventQueueSize =
metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE);
@@ -536,6 +548,20 @@ public class ShuffleServerMetrics {
.labelNames("app_id")
.register(metricsManager.getCollectorRegistry());
+ gaugeInMemoryBlockCount =
+ Gauge.build()
+ .name(TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP)
+ .help("top N of in memory shuffle block count for app level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
+
+ gaugeInMemoryAvgBlockSize =
+ Gauge.build()
+ .name(BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP)
+ .help("bottom N of in memory shuffle average block size for app
level")
+ .labelNames("app_id")
+ .register(metricsManager.getCollectorRegistry());
+
gaugeOnDiskDataSizeUsage =
Gauge.build()
.name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index e748a892f..820c76357 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -60,6 +60,7 @@ public class ShuffleTaskInfo {
private final AtomicLong totalDataSize = new AtomicLong(0);
private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+ private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
private final AtomicLong onLocalFileNum = new AtomicLong(0);
private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
private final AtomicLong onHadoopFileNum = new AtomicLong(0);
@@ -176,6 +177,19 @@ public class ShuffleTaskInfo {
return inMemoryDataSize.get();
}
+ public long getInMemoryBlockCount() {
+ return inMemoryBlockCount.get();
+ }
+
+ public void addInMemoryBlockCount(long delta) {
+ inMemoryBlockCount.addAndGet(delta);
+ }
+
+ public long getInMemoryAvgBlockSize() {
+ long blockCount = getInMemoryBlockCount();
+ return blockCount <= 0 ? Long.MAX_VALUE : getInMemoryDataSize() /
blockCount;
+ }
+
public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) {
if (isNewlyCreated) {
onLocalFileNum.incrementAndGet();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index f2c120464..33712244b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -481,12 +481,14 @@ public class ShuffleTaskManager {
}
// Only for tests
- public void updateCachedBlockIds(String appId, int shuffleId,
ShufflePartitionedBlock[] spbs) {
- updateCachedBlockIds(appId, shuffleId, 0, spbs);
+ public void updateCachedBlockIds(
+ String appId, int shuffleId, ShufflePartitionedData
shufflePartitionedData) {
+ updateCachedBlockIds(appId, shuffleId, 0, shufflePartitionedData);
}
public void updateCachedBlockIds(
- String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[]
spbs) {
+ String appId, int shuffleId, int partitionId, ShufflePartitionedData
shufflePartitionedData) {
+ ShufflePartitionedBlock[] spbs = shufflePartitionedData.getBlockList();
if (spbs == null || spbs.length == 0) {
return;
}
@@ -512,7 +514,12 @@ public class ShuffleTaskManager {
size += spb.getEncodedLength();
}
}
- long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId,
partitionId, size);
+ int blockCount = spbs.length -
shufflePartitionedData.getDuplicateBlockCount();
+ shuffleBufferManager.addInMemoryBlockCount(blockCount);
+ shuffleTaskInfo.addInMemoryBlockCount(blockCount);
+ long partitionSize =
+ shuffleTaskInfo.addPartitionDataSize(
+ shuffleId, partitionId, size -
shufflePartitionedData.getDuplicateBlockSize());
HugePartitionUtils.markHugePartition(
shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId,
partitionSize);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
index 97ffb3cf4..b835c904a 100644
---
a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
+++
b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java
@@ -37,6 +37,8 @@ public class TopNShuffleDataSizeOfAppCalcTask {
private final Gauge gaugeTotalDataSize;
private final Gauge gaugeInMemoryDataSize;
+ private final Gauge gaugeInMemoryBlockCount;
+ private final Gauge gaugeInMemoryAvgBlockSize;
private final Gauge gaugeOnLocalFileDataSize;
private final Gauge gaugeOnHadoopDataSize;
@@ -50,6 +52,8 @@ public class TopNShuffleDataSizeOfAppCalcTask {
shuffleTaskManager = taskManager;
this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage;
this.gaugeInMemoryDataSize =
ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
+ this.gaugeInMemoryBlockCount =
ShuffleServerMetrics.gaugeInMemoryBlockCount;
+ this.gaugeInMemoryAvgBlockSize =
ShuffleServerMetrics.gaugeInMemoryAvgBlockSize;
this.gaugeOnLocalFileDataSize =
ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
this.gaugeOnHadoopDataSize =
ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
this.scheduler =
@@ -72,6 +76,22 @@ public class TopNShuffleDataSizeOfAppCalcTask {
.set(taskInfo.getValue().getInMemoryDataSize());
}
+ topNTaskInfo = calcTopNInMemoryBlockCountTaskInfo();
+ gaugeInMemoryBlockCount.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeInMemoryBlockCount
+ .labels(taskInfo.getKey())
+ .set(taskInfo.getValue().getInMemoryBlockCount());
+ }
+
+ topNTaskInfo = calcBottomNInMemoryAvgBlockSizeTaskInfo();
+ gaugeInMemoryAvgBlockSize.clear();
+ for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
+ gaugeInMemoryAvgBlockSize
+ .labels(taskInfo.getKey())
+ .set(taskInfo.getValue().getInMemoryAvgBlockSize());
+ }
+
topNTaskInfo = calcTopNOnLocalFileDataSizeTaskInfo();
gaugeOnLocalFileDataSize.clear();
for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
@@ -108,6 +128,27 @@ public class TopNShuffleDataSizeOfAppCalcTask {
.collect(Collectors.toList());
}
+ public List<Map.Entry<String, ShuffleTaskInfo>>
calcTopNInMemoryBlockCountTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(
+ e2.getValue().getInMemoryBlockCount(),
e1.getValue().getInMemoryBlockCount()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
+ public List<Map.Entry<String, ShuffleTaskInfo>>
calcBottomNInMemoryAvgBlockSizeTaskInfo() {
+ return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
+ .sorted(
+ (e1, e2) ->
+ Long.compare(
+ e1.getValue().getInMemoryAvgBlockSize(),
+ e2.getValue().getInMemoryAvgBlockSize()))
+ .limit(topNShuffleDataNumber)
+ .collect(Collectors.toList());
+ }
+
public List<Map.Entry<String, ShuffleTaskInfo>>
calcTopNOnLocalFileDataSizeTaskInfo() {
return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
.sorted(
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ed9331291..dee841898 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.buffer.lab.ChunkCreator;
import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
@@ -96,6 +97,8 @@ public class ShuffleBufferManager {
protected AtomicLong inFlushSize = new AtomicLong(0L);
protected AtomicLong usedMemory = new AtomicLong(0L);
private AtomicLong readDataMemory = new AtomicLong(0L);
+ private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
+ private final AtomicLong inFlushBlockCount = new AtomicLong(0);
// appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
bufferPool;
// appId -> shuffleId -> shuffle size in buffer
@@ -496,11 +499,22 @@ public class ShuffleBufferManager {
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(),
true, false));
+ event.addCleanupCallback(
+ () -> {
+ long blockCount = event.getBlockCount();
+ ShuffleTaskInfo shuffleTaskInfo =
shuffleTaskManager.getShuffleTaskInfo(appId);
+ if (shuffleTaskInfo != null) {
+ shuffleTaskInfo.addInMemoryBlockCount(-blockCount);
+ }
+ addInMemoryBlockCount(-blockCount);
+ addInFlushBlockCount(-blockCount);
+ });
updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
inFlushSize.addAndGet(event.getEncodedLength());
if (isHugePartition) {
event.markOwnedByHugePartition();
}
+ addInFlushBlockCount(event.getBlockCount());
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
return true;
@@ -524,6 +538,16 @@ public class ShuffleBufferManager {
}
}
+ public void addInMemoryBlockCount(long delta) {
+ long blockCount = inMemoryBlockCount.addAndGet(delta);
+ ShuffleServerMetrics.gaugeTotalInMemoryBlockCount.set(blockCount);
+ }
+
+ public void addInFlushBlockCount(long delta) {
+ long blockCount = inFlushBlockCount.addAndGet(delta);
+ ShuffleServerMetrics.gaugeTotalInFlushBlockCount.set(blockCount);
+ }
+
public synchronized boolean requireMemory(long size, boolean isPreAllocated)
{
if (capacity - usedMemory.get() >= size) {
usedMemory.addAndGet(size);
@@ -891,6 +915,7 @@ public class ShuffleBufferManager {
Collection<ShuffleBuffer> buffers =
bufferRangeMap.asMapOfRanges().values();
if (buffers != null) {
for (ShuffleBuffer buffer : buffers) {
+ addInMemoryBlockCount(-buffer.getBlockCount());
// the actual released size by this thread
long releasedSize = buffer.release();
ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 6485eef55..156797bef 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -57,6 +57,8 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
}
long currentEncodedLength = 0;
long currentDataLength = 0;
+ int duplicateBlockCount = 0;
+ long duplicateBlockSize = 0;
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
@@ -65,11 +67,15 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
+ duplicateBlockCount++;
+ duplicateBlockSize += block.getEncodedLength();
releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
this.dataLength += currentDataLength;
+ data.setDuplicateBlockCount(duplicateBlockCount);
+ data.setDuplicateBlockSize(duplicateBlockSize);
return currentEncodedLength;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 9c65eeee0..c442c9af6 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -64,6 +64,8 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
}
long currentEncodedLength = 0;
long currentDataLength = 0;
+ int duplicateBlockCount = 0;
+ long duplicateBlockSize = 0;
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
@@ -74,11 +76,15 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
+ duplicateBlockCount++;
+ duplicateBlockSize += block.getEncodedLength();
releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
this.dataLength += currentDataLength;
+ data.setDuplicateBlockCount(duplicateBlockCount);
+ data.setDuplicateBlockSize(duplicateBlockSize);
return currentEncodedLength;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index e4000bc39..01ba26c8a 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -330,7 +330,7 @@ public class Partition<K, V> {
shuffle
.shuffleServer
.getShuffleTaskManager()
- .updateCachedBlockIds(appId, shuffle.shuffleId,
spd.getPartitionId(), spd.getBlockList());
+ .updateCachedBlockIds(appId, shuffle.shuffleId,
spd.getPartitionId(), spd);
sleepTime = initSleepTime;
return true;
} else {
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 0b9b5f83c..d6489deb6 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -317,8 +317,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
- shuffleTaskManager.updateCachedBlockIds(
- appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
spd.getPartitionId(), spd);
}
} catch (ExceedHugePartitionHardLimitException e) {
String errorMsg =
diff --git
a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
index c21fb0db4..0a5508d8f 100644
---
a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
@@ -134,9 +134,9 @@ public class KerberizedShuffleTaskManagerTest extends
KerberizedHadoopBase {
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index b9cdf1e24..3356cca5a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -156,7 +156,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1,
35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, i,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0);
}
assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -220,7 +220,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
// case3
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0);
try {
long requiredId =
shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
Arrays.asList(500), 500);
@@ -232,7 +232,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
// case4
partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0);
try {
shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
Arrays.asList(500), 500);
fail("Should throw NoBufferForHugePartitionException");
@@ -255,7 +255,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
shuffleServer.getShuffleBufferManager().setBufferFlushThreshold(1024);
partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0);
try {
shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
Arrays.asList(500), 500);
fail("Should throw NoBufferForHugePartitionException");
@@ -298,14 +298,14 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
// case1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
long size1 = partitionedData0.getTotalBlockEncodedLength();
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0);
assertEquals(size1,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
// case2
partitionedData0 = createPartitionedData(1, 1, 35);
long size2 = partitionedData0.getTotalBlockEncodedLength();
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0);
assertEquals(size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize());
assertEquals(
size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1));
@@ -412,7 +412,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
assertEquals(35, pabi.getRequireSize());
StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId,
true, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData0);
// the required id won't be removed in shuffleTaskManager, it is removed
in Grpc service
assertEquals(1, bufferIds.size());
assertEquals(StatusCode.SUCCESS, sc);
@@ -429,7 +429,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData1);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData1.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData1);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
@@ -439,7 +439,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
expectedBlocks1.addAll(Lists.newArrayList(partitionedData2.getBlockList()));
// receive un-preAllocation data
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData2);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData2.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData2);
assertEquals(StatusCode.SUCCESS, sc);
// won't flush for partition 2-2
@@ -447,7 +447,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData3);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData3.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData3);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
@@ -456,7 +456,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData4);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData4.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData4);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
@@ -466,7 +466,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
// flush for partition 1-1
ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData5.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData5);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData5);
@@ -483,7 +483,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
// flush for partition 0-1
ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData7.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData7);
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
@@ -544,9 +544,9 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 1,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
@@ -596,7 +596,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1,
35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, i,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0);
}
assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -666,8 +666,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
Thread.sleep(1000);
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1,
35);
shuffleTaskManager.cacheShuffleData("clearTest1", shuffleId, false,
partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(
- "clearTest1", shuffleId, partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds("clearTest1", shuffleId,
partitionedData0);
shuffleTaskManager.refreshAppId("clearTest1");
shuffleTaskManager.checkResourceStatus();
retry++;
@@ -1039,7 +1038,7 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
Thread.sleep(1000);
ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
shuffleData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
shuffleData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
@@ -1215,7 +1214,7 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
assertEquals(1, shuffleTaskManager.getAppIds().size());
@@ -1260,7 +1259,7 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
- shuffleTaskManager.updateCachedBlockIds(appId, 0,
partitionedData0.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
assertEquals(1, shuffleTaskManager.getAppIds().size());
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 7dbba94b2..f2991b226 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -517,7 +517,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
ShufflePartitionedData partitionedData = createData(0, 1);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData);
assertEquals(1 + 32, shuffleBufferManager.getUsedMemory());
long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId,
0);
assertEquals(1 + 32, usedSize);
@@ -532,7 +532,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// case2: its partition is huge partition, its buffer will be flushed to
DISK directly
partitionedData = createData(0, 36);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData);
assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
assertTrue(
HugePartitionUtils.limitHugePartition(
@@ -543,7 +543,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)));
partitionedData = createData(0, 1);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData);
waitForFlush(shuffleFlushManager, appId, shuffleId, 3);
}
@@ -845,7 +845,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
ShufflePartitionedData partitionedData = createData(0, 1);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData);
long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId,
0);
assertEquals(1 + 32, usedSize);
assertFalse(
@@ -855,7 +855,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// case2: its partition exceed the split limit
partitionedData = createData(0, 200);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
- shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData);
usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0);
assertEquals(1 + 32 + 200 + 32, usedSize);
assertTrue(