This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 922e65a85 [CELEBORN-2153] Fix NPE problem that occurs during concurrent merge
922e65a85 is described below
commit 922e65a8529cdb5c4aa66edc8096cda8780bb332
Author: TheodoreLx <[email protected]>
AuthorDate: Sat Sep 20 15:15:11 2025 +0800
[CELEBORN-2153] Fix NPE problem that occurs during concurrent merge
### What changes were proposed in this pull request?
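Avoid the NPE by adding a null check on the `DataBatches` returned by `pushState.takeDataBatches(addressPair)` and skipping `doPushMergedData` when it is null.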
### Why are the changes needed?
When multiple threads call `pushOrMergeData` concurrently, the first thread may remove the merged `DataBatches` from `pushState` and push them. A subsequent thread that also decided to push then gets `null` from `takeDataBatches`, and the unguarded call to `dataBatches.requireBatches()` throws a `NullPointerException`.
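The interleaving described above can be replayed deterministically with a minimal sketch. `PushState`, `DataBatches`, and the address-pair string here are hypothetical, simplified stand-ins (a `ConcurrentHashMap` whose take is a `remove`), not Celeborn's real classes; the commit only establishes that `takeDataBatches` can return `null` once another thread has already taken the batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class TakeDataBatchesRace {
  // Hypothetical stand-in for DataBatches: an append-only list of batch bodies.
  static final class DataBatches {
    private final List<byte[]> batches = new ArrayList<>();

    void add(byte[] body) {
      batches.add(body);
    }

    List<byte[]> requireBatches() {
      return batches;
    }
  }

  // Hypothetical, simplified PushState keyed by an address-pair string.
  // Sequential demo only: add/take here are not made safe against each other.
  static final class PushState {
    private final ConcurrentHashMap<String, DataBatches> pending = new ConcurrentHashMap<>();

    void addBatch(String addressPair, byte[] body) {
      pending.computeIfAbsent(addressPair, k -> new DataBatches()).add(body);
    }

    // Removes the pending batches; returns null if nothing is pending for this pair.
    DataBatches takeDataBatches(String addressPair) {
      return pending.remove(addressPair);
    }
  }

  // Replays the interleaving sequentially: thread A takes the merged batches first,
  // then thread B (which also saw shouldPush == true) takes from the now-empty state.
  static DataBatches[] replayInterleaving() {
    PushState pushState = new PushState();
    String addressPair = "worker1-worker2";
    pushState.addBatch(addressPair, new byte[] {1, 2});
    pushState.addBatch(addressPair, new byte[] {3});
    DataBatches takenByA = pushState.takeDataBatches(addressPair);
    DataBatches takenByB = pushState.takeDataBatches(addressPair);
    return new DataBatches[] {takenByA, takenByB};
  }

  public static void main(String[] args) {
    DataBatches[] taken = replayInterleaving();
    System.out.println("A took " + taken[0].requireBatches().size() + " batches");
    System.out.println("B got null: " + (taken[1] == null));
    try {
      // Unguarded call, mirroring the pre-fix code path.
      taken[1].requireBatches();
    } catch (NullPointerException e) {
      System.out.println("B: NullPointerException");
    }
  }
}
```

Running `main` prints that A took 2 batches, that B got `null`, and that B's unguarded `requireBatches()` call hit a `NullPointerException`.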
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manual testing
Closes #3480 from TheodoreLx/concurrent-merge-data.
Authored-by: TheodoreLx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClientImpl.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c90555baf..4dd060074 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1333,14 +1333,16 @@ public class ShuffleClientImpl extends ShuffleClient {
       if (shouldPush) {
         limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort());
         DataBatches dataBatches = pushState.takeDataBatches(addressPair);
-        doPushMergedData(
-            addressPair,
-            shuffleId,
-            mapId,
-            attemptId,
-            dataBatches.requireBatches(),
-            pushState,
-            maxReviveTimes);
+        if (dataBatches != null) {
+          doPushMergedData(
+              addressPair,
+              shuffleId,
+              mapId,
+              attemptId,
+              dataBatches.requireBatches(),
+              pushState,
+              maxReviveTimes);
+        }
       }
     }
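The null guard in this hunk can be exercised under real concurrency with a minimal sketch. As before, `PushState` and the address-pair string are hypothetical, simplified stand-ins rather than Celeborn's classes, and incrementing a counter stands in for `doPushMergedData`. Several threads each add one batch and then try to take and push the merged batches; with the guard, the threads that lose the race skip the push instead of throwing, and every batch is pushed exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedMergePush {
  // Hypothetical, simplified PushState: pending batch bodies per address pair.
  static final class PushState {
    private final ConcurrentHashMap<String, List<byte[]>> pending = new ConcurrentHashMap<>();

    // Appends inside compute(), which is atomic per key with respect to remove(),
    // so an append cannot land in a list that another thread has already taken.
    void addBatch(String addressPair, byte[] body) {
      pending.compute(
          addressPair,
          (k, v) -> {
            List<byte[]> list = v;
            if (list == null) {
              list = new ArrayList<>();
            }
            list.add(body);
            return list;
          });
    }

    // Returns null when another thread already took the merged batches.
    List<byte[]> takeDataBatches(String addressPair) {
      return pending.remove(addressPair);
    }
  }

  // Starts `threads` workers that each add one batch and then try to push the
  // merged batches. Returns the total number of batch bodies pushed.
  static int run(int threads) throws Exception {
    PushState pushState = new PushState();
    String addressPair = "worker1-worker2";
    AtomicInteger pushed = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < threads; i++) {
        futures.add(
            pool.submit(
                () -> {
                  start.await(); // release all workers at once to provoke the race
                  pushState.addBatch(addressPair, new byte[] {1});
                  List<byte[]> batches = pushState.takeDataBatches(addressPair);
                  if (batches != null) { // the null guard this commit adds
                    pushed.addAndGet(batches.size()); // stand-in for doPushMergedData
                  }
                  return null;
                }));
      }
      start.countDown();
      for (Future<?> f : futures) {
        f.get(); // surfaces any worker exception, wrapped in ExecutionException
      }
      return pushed.get();
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("pushed " + run(8) + " batches");
  }
}
```

Appending inside `compute` rather than via `computeIfAbsent(...).add(...)` is a choice made for this sketch so that a late append cannot be lost in an already-taken list; this commit does not show how Celeborn's own `PushState` synchronizes its appends.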