This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 41d0fc53f [#2558] improvement(server): Limit the max flush event count
for a single buffer (#2562)
41d0fc53f is described below
commit 41d0fc53fd8ea7db164b3b510d8a351816cd1dd9
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Aug 4 11:20:56 2025 +0800
[#2558] improvement(server): Limit the max flush event count for a single
buffer (#2562)
### What changes were proposed in this pull request?
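Limit the max flush event count for a single buffer. A new option,
`rss.server.buffer.maxFlushEventCountPerBuffer` (default 5), caps the number
of in-flush events a buffer may have; when the cap is reached, further flush
requests for that buffer are skipped with a warning. A value <= 0 disables
the limit.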
### Why are the changes needed?
Fix: #2558
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Verified in a production environment.
---
.../ShuffleServerConcurrentWriteOfHadoopTest.java | 1 +
.../apache/uniffle/server/ShuffleServerConf.java | 6 ++++
.../uniffle/server/buffer/ShuffleBuffer.java | 2 ++
.../server/buffer/ShuffleBufferManager.java | 39 ++++++++++++++++------
.../server/buffer/ShuffleBufferWithLinkedList.java | 5 +++
.../server/buffer/ShuffleBufferWithSkipList.java | 5 +++
6 files changed, 47 insertions(+), 11 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7959688a3..c9c743806 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -87,6 +87,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends
ShuffleServerWithH
ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION,
MAX_CONCURRENCY);
shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED,
true);
shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD,
1024 * 1024L);
+
shuffleServerConf.setInteger(shuffleServerConf.MAX_FLUSH_EVENT_COUNT_PER_BUFFER,
-1);
return shuffleServerConf;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ea7bed551..114160c7a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -445,6 +445,12 @@ public class ShuffleServerConf extends RssBaseConf {
+ "This can cause severe garbage collection issues on the
server side, and may even lead to out-of-heap-memory errors. "
+ "If the threshold is set too high, it becomes meaningless.
It won't be enabled by default.");
+ public static final ConfigOption<Integer> MAX_FLUSH_EVENT_COUNT_PER_BUFFER =
+ ConfigOptions.key("rss.server.buffer.maxFlushEventCountPerBuffer")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The max flush event count for a single buffer.");
+
public static final ConfigOption<Long>
SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
.longType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index c181aba42..141d926ca 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -65,6 +65,8 @@ public interface ShuffleBuffer {
long getInFlushBlockCount();
+ int getInFlushEventCount();
+
long release();
void clearInFlushBuffer(long eventId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 32112febe..5cfbcb12f 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -72,6 +72,7 @@ public class ShuffleBufferManager {
private final ShuffleBufferType shuffleBufferType;
private final Boolean isLABEnabled;
private final int flushTryLockTimeout;
+ private final int maxFlushEventCountPerBuffer;
private ShuffleTaskManager shuffleTaskManager;
private final ShuffleFlushManager shuffleFlushManager;
private long capacity;
@@ -228,6 +229,7 @@ public class ShuffleBufferManager {
int maxAlloc = (int) (chunkSize * maxAllocRatio);
ChunkCreator.initialize(chunkSize, (long) (capacity *
chunkPoolCapacityRatio), maxAlloc);
}
+ this.maxFlushEventCountPerBuffer =
conf.get(ShuffleServerConf.MAX_FLUSH_EVENT_COUNT_PER_BUFFER);
}
public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -450,7 +452,7 @@ public class ShuffleBufferManager {
}
}
- protected void flushBuffer(
+ protected boolean flushBuffer(
ShuffleBuffer buffer,
String appId,
int shuffleId,
@@ -465,7 +467,16 @@ public class ShuffleBufferManager {
"Shuffle[{}] for app[{}] has already been removed, no need to
flush the buffer",
shuffleId,
appId);
- return;
+ return false;
+ }
+ int flushEventCount = buffer.getInFlushEventCount();
+ if (maxFlushEventCountPerBuffer > 0 && flushEventCount >=
maxFlushEventCountPerBuffer) {
+ LOG.warn(
+ "Shuffle[{}] for app[{}] already has [{}] flush events, ignore
it.",
+ shuffleId,
+ appId,
+ flushEventCount);
+ return false;
}
ShuffleDataFlushEvent event =
buffer.toFlushEvent(
@@ -484,10 +495,12 @@ public class ShuffleBufferManager {
}
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
+ return true;
}
} finally {
readLock.unlock();
}
+ return false;
}
public void removeBuffer(String appId) {
@@ -664,15 +677,19 @@ public class ShuffleBufferManager {
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
Range<Integer> range = rangeEntry.getKey();
ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
- pickedFlushSize += shuffleBuffer.getEncodedLength();
- flushBuffer(
- shuffleBuffer,
- appId,
- shuffleId,
- range.lowerEndpoint(),
- range.upperEndpoint(),
- HugePartitionUtils.isHugePartition(
- shuffleTaskManager, appId, shuffleId,
range.lowerEndpoint()));
+ long bufferEncodedLength = shuffleBuffer.getEncodedLength();
+ boolean success =
+ flushBuffer(
+ shuffleBuffer,
+ appId,
+ shuffleId,
+ range.lowerEndpoint(),
+ range.upperEndpoint(),
+ HugePartitionUtils.isHugePartition(
+ shuffleTaskManager, appId, shuffleId,
range.lowerEndpoint()));
+ if (success) {
+ pickedFlushSize += bufferEncodedLength;
+ }
if (pickedFlushSize > expectedFlushSize) {
LOG.info("Already picked enough buffers to flush {} bytes",
pickedFlushSize);
return;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 2216e2907..6485eef55 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -151,6 +151,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return inFlushBlockMap.values().stream().mapToLong(Set::size).sum();
}
+ @Override
+ public int getInFlushEventCount() {
+ return inFlushBlockMap.size();
+ }
+
@Override
public synchronized long release() {
Throwable lastException = null;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 33ba313ce..9c65eeee0 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -149,6 +149,11 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
return inFlushBlockMap.values().stream().mapToLong(Map::size).sum();
}
+ @Override
+ public int getInFlushEventCount() {
+ return inFlushBlockMap.size();
+ }
+
@Override
public synchronized long release() {
Throwable lastException = null;