This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 887d04284 [#2496] improvement(server): Improve the performance of
flushing single buffer (#2523)
887d04284 is described below
commit 887d04284eeee6a9d8886d9fe78edc7e309e6901
Author: xianjingfeng <[email protected]>
AuthorDate: Fri Jun 27 10:12:41 2025 +0800
[#2496] improvement(server): Improve the performance of flushing single
buffer (#2523)
### What changes were proposed in this pull request?
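Improve the performance of flushing a single buffer:
- The cheap buffer-size and block-count threshold checks now run before the huge-partition lookup.
- The huge-partition lookups fetch the task info and the per-shuffle partition set only once instead of twice.
- The single-buffer flush now synchronizes on the individual buffer (re-checking the thresholds under the lock) rather than on the whole `ShuffleBufferManager`. As a result, flushes of different buffers no longer contend on a single lock.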
### Why are the changes needed?
Fix: #2496
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing CI (no test files were added or changed by this patch).
---
.../apache/uniffle/server/HugePartitionUtils.java | 8 ++--
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 8 ++--
.../server/buffer/ShuffleBufferManager.java | 50 ++++++++++++----------
3 files changed, 38 insertions(+), 28 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 59a010cff..dd5477561 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -52,9 +52,11 @@ public class HugePartitionUtils {
*/
public static boolean isHugePartition(
ShuffleTaskManager shuffleTaskManager, String appId, int shuffleId, int
partitionId) {
- return shuffleTaskManager != null
- && shuffleTaskManager.getShuffleTaskInfo(appId) != null
- &&
shuffleTaskManager.getShuffleTaskInfo(appId).isHugePartition(shuffleId,
partitionId);
+ if (shuffleTaskManager == null) {
+ return false;
+ }
+ ShuffleTaskInfo taskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
+ return taskInfo != null && taskInfo.isHugePartition(shuffleId,
partitionId);
}
/**
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 687622bb0..e748a892f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -243,9 +243,11 @@ public class ShuffleTaskInfo {
}
public boolean isHugePartition(int shuffleId, int partitionId) {
- return existHugePartition.get()
- && hugePartitionTags.containsKey(shuffleId)
- && hugePartitionTags.get(shuffleId).contains(partitionId);
+ if (!existHugePartition.get()) {
+ return false;
+ }
+ Set<Integer> partitions = hugePartitionTags.get(shuffleId);
+ return partitions != null && partitions.contains(partitionId);
}
public Set<Integer> getShuffleIds() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 0b6b246b5..54dde2d9d 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -303,15 +303,13 @@ public class ShuffleBufferManager {
shuffleId,
spd.getPartitionId());
updateShuffleSize(appId, shuffleId, size);
- synchronized (this) {
- flushSingleBufferIfNecessary(
- buffer,
- appId,
- shuffleId,
- spd.getPartitionId(),
- entry.getKey().lowerEndpoint(),
- entry.getKey().upperEndpoint());
- }
+ flushSingleBufferIfNecessary(
+ buffer,
+ appId,
+ shuffleId,
+ spd.getPartitionId(),
+ entry.getKey().lowerEndpoint(),
+ entry.getKey().upperEndpoint());
if (bufferFlushWhenCachingData && needToFlush()) {
flushIfNecessary();
}
@@ -374,24 +372,32 @@ public class ShuffleBufferManager {
int partitionId,
int startPartition,
int endPartition) {
+ if (!(buffer.getEncodedLength() > this.bufferFlushThreshold
+ || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
+ return;
+ }
boolean isHugePartition =
HugePartitionUtils.isHugePartition(shuffleTaskManager, appId,
shuffleId, partitionId);
+ if (!(isHugePartition || this.bufferFlushEnabled)) {
+ return;
+ }
// When we use multi storage and trigger single buffer flush, the buffer
size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
- if ((isHugePartition || this.bufferFlushEnabled)
- && (buffer.getEncodedLength() > this.bufferFlushThreshold
- || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Start to flush single buffer. Details - shuffleId:{},
startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{},
blocksNum:{}",
- shuffleId,
- startPartition,
- endPartition,
- isHugePartition,
- buffer.getEncodedLength(),
- buffer.getBlockCount());
+ synchronized (buffer) {
+ if (buffer.getEncodedLength() > this.bufferFlushThreshold
+ || buffer.getBlockCount() > bufferFlushBlocksNumThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Start to flush single buffer. Details - shuffleId:{},
startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{},
blocksNum:{}",
+ shuffleId,
+ startPartition,
+ endPartition,
+ isHugePartition,
+ buffer.getEncodedLength(),
+ buffer.getBlockCount());
+ }
+ flushBuffer(buffer, appId, shuffleId, startPartition, endPartition,
isHugePartition);
}
- flushBuffer(buffer, appId, shuffleId, startPartition, endPartition,
isHugePartition);
}
}