This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 61a24d165 [CELEBORN-2153] Fix NPE problem that occurs during
concurrent merge
61a24d165 is described below
commit 61a24d165bd02b7b256f3fd3b56da38d7e9a5392
Author: TheodoreLx <[email protected]>
AuthorDate: Sat Sep 20 15:15:11 2025 +0800
[CELEBORN-2153] Fix NPE problem that occurs during concurrent merge
### What changes were proposed in this pull request?
Avoid NPE problems by adding non-null checks
### Why are the changes needed?
When multiple threads call pushOrMergeData, the first thread may remove the
merged DataBatches from pushState and send it out. The subsequent thread will
get null through the takeDataBatches method, which may cause NPE problems.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manual testing
Closes #3480 from TheodoreLx/concurrent-merge-data.
Authored-by: TheodoreLx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 922e65a8529cdb5c4aa66edc8096cda8780bb332)
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClientImpl.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 2a272e83a..d58417187 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1298,14 +1298,16 @@ public class ShuffleClientImpl extends ShuffleClient {
if (shouldPush) {
limitMaxInFlight(mapKey, pushState, loc.hostAndPushPort());
DataBatches dataBatches = pushState.takeDataBatches(addressPair);
- doPushMergedData(
- addressPair,
- shuffleId,
- mapId,
- attemptId,
- dataBatches.requireBatches(),
- pushState,
- maxReviveTimes);
+ if (dataBatches != null) {
+ doPushMergedData(
+ addressPair,
+ shuffleId,
+ mapId,
+ attemptId,
+ dataBatches.requireBatches(),
+ pushState,
+ maxReviveTimes);
+ }
}
}