This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ad7babb7d [#2496] improvement(server): Use tryLock instead of
synchronized on buffer flushing (#2524)
ad7babb7d is described below
commit ad7babb7d7c177df196cfb8cb396bc2089d4d501
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Jun 30 14:35:14 2025 +0800
[#2496] improvement(server): Use tryLock instead of synchronized on buffer
flushing (#2524)
### What changes were proposed in this pull request?
Use tryLock instead of synchronized when flushing buffers.
### Why are the changes needed?
To follow up on #2523 for better performance. This optimization was
suggested in a review comment on #2523.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
CI
---
.../server/buffer/ShuffleBufferManager.java | 35 +++++++++++++++-------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 54dde2d9d..32112febe 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
@@ -99,6 +100,7 @@ public class ShuffleBufferManager {
// appId -> shuffleId -> shuffle size in buffer
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap =
JavaUtils.newConcurrentMap();
private final boolean appBlockSizeMetricEnabled;
+ private final ReentrantLock globalFlushLock = new ReentrantLock();
public ShuffleBufferManager(
ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean
nettyServerEnabled) {
@@ -406,16 +408,29 @@ public class ShuffleBufferManager {
return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() >
highWaterMark;
}
- public synchronized void flushIfNecessary() {
- if (needToFlush()) {
- // todo: add a metric here to track how many times flush occurs.
- LOG.info(
- "Start to flush with usedMemory[{}], preAllocatedSize[{}],
inFlushSize[{}]",
- usedMemory.get(),
- preAllocatedSize.get(),
- inFlushSize.get());
- Map<String, Set<Integer>> pickedShuffle = pickFlushedShuffle();
- flush(pickedShuffle);
+ public void flushIfNecessary() {
+ boolean lockAcquired = false;
+ try {
+ lockAcquired = globalFlushLock.tryLock(flushTryLockTimeout,
TimeUnit.MILLISECONDS);
+ if (!lockAcquired) {
+ return;
+ }
+ if (needToFlush()) {
+ // todo: add a metric here to track how many times flush occurs.
+ LOG.info(
+ "Start to flush with usedMemory[{}], preAllocatedSize[{}],
inFlushSize[{}]",
+ usedMemory.get(),
+ preAllocatedSize.get(),
+ inFlushSize.get());
+ Map<String, Set<Integer>> pickedShuffle = pickFlushedShuffle();
+ flush(pickedShuffle);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Ignore the InterruptedException which should be caused by
internal killed");
+ } finally {
+ if (lockAcquired) {
+ globalFlushLock.unlock();
+ }
}
}