leixm opened a new pull request, #3652:
URL: https://github.com/apache/celeborn/pull/3652

   ### What changes were proposed in this pull request?
   This PR makes partition file sorting asynchronous in the worker's 
FetchHandler, so that open stream requests no longer block RPC threads while 
waiting for sort completion.
   
   Key changes:
   
   PartitionFilesSorter: Replaced the blocking Thread.sleep polling loop in getSortedFileInfo with a callback-based async model using FileResolvedCallback and FileSortedCallback. When a sort is initiated, the caller's callback is invoked on completion instead of the caller blocking. Also introduced pendingSortCallbacks to handle concurrent readers correctly: when multiple open stream requests arrive for the same file while it is being sorted, all of their callbacks are notified when the sort completes.
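   The pending-callback pattern described above can be sketched in Java roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: ResolvedCallback is a hypothetical, simplified stand-in for FileResolvedCallback that passes a String path instead of a FileInfo, AsyncSorterSketch and runSort are hypothetical names, and the sort itself is a placeholder. It shows that only the first request for a file schedules a sort, later requests for the same file join the pending list, and every waiter is notified exactly once without any thread sleeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-in for FileResolvedCallback (String instead of FileInfo).
interface ResolvedCallback {
  void onSuccess(String sortedPath);
  void onFailure(Throwable cause);
}

// Sketch only: the first request for a file starts the sort, later requests for the
// same file are queued in pendingSortCallbacks, and all waiters are notified once.
class AsyncSorterSketch {
  private final Map<String, List<ResolvedCallback>> pendingSortCallbacks = new HashMap<>();
  private final Map<String, String> sortedFiles = new HashMap<>();
  private final Executor sortExecutor;

  AsyncSorterSketch(Executor sortExecutor) {
    this.sortExecutor = sortExecutor;
  }

  // Returns immediately; the calling (RPC) thread never waits for the sort.
  void getSortedFileInfo(String path, ResolvedCallback callback) {
    String alreadySorted;
    boolean startSort = false;
    synchronized (this) {
      alreadySorted = sortedFiles.get(path);
      if (alreadySorted == null) {
        List<ResolvedCallback> waiters = pendingSortCallbacks.get(path);
        if (waiters == null) {
          // First reader for this file: create the waiter list and schedule the sort.
          waiters = new ArrayList<>();
          pendingSortCallbacks.put(path, waiters);
          startSort = true;
        }
        waiters.add(callback);
      }
    }
    if (alreadySorted != null) {
      callback.onSuccess(alreadySorted); // fast path: file was sorted earlier
    } else if (startSort) {
      sortExecutor.execute(() -> runSort(path));
    }
  }

  private void runSort(String path) {
    String result = null;
    Throwable error = null;
    try {
      result = path + ".sorted"; // placeholder for the real sort work
    } catch (RuntimeException e) {
      error = e;
    }
    List<ResolvedCallback> waiters;
    synchronized (this) {
      waiters = pendingSortCallbacks.remove(path);
      if (error == null) {
        sortedFiles.put(path, result);
      }
    }
    // Notify every queued reader outside the lock.
    for (ResolvedCallback cb : waiters) {
      if (error == null) {
        cb.onSuccess(result);
      } else {
        cb.onFailure(error);
      }
    }
  }
}
```

   Callbacks are invoked outside the lock in this sketch so that a slow callback (for example, one that registers a stream and writes an RPC response) cannot hold up other requests to the same sorter.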
   
   FetchHandler: Refactored the open stream handling into three clear responsibilities:

   - openReduceStreamAsync: decides whether sorting is needed and either dispatches the request to PartitionFilesSorter asynchronously or invokes the callback directly.
   - registerAndHandleStream: pure stream registration logic (no sorting), called from the async callback with the resolved FileInfo.
   - Open stream handlers: both the single PbOpenStream and the batch PbOpenStreamList handlers now use the async model. The batch handler uses an AtomicInteger completion counter and an ordered Array[PbStreamHandlerOpt] to aggregate results, and sends a single response once all files are resolved.

   FileResolvedCallback (new interface): Defines onSuccess(FileInfo) / onFailure(Throwable) for async file resolution notification.
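   The batch aggregation used for PbOpenStreamList can be sketched as follows, again as a hypothetical illustration rather than the PR's Scala code: openAll, openAsync, and sendResponse are assumed names, and plain Strings stand in for PbStreamHandlerOpt. Each async completion writes into its own slot of an array indexed by request position, and whichever completion decrements the AtomicInteger to zero sends the single response, so the result order matches the request order regardless of completion order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Sketch only: gather N async results in request order and respond exactly once.
final class BatchOpenSketch {
  // openAsync is a hypothetical async opener: (fileName, onDone) -> eventually calls onDone.
  static void openAll(List<String> files,
                      BiConsumer<String, Consumer<String>> openAsync,
                      Consumer<List<String>> sendResponse) {
    int n = files.size();
    if (n == 0) {
      sendResponse.accept(List.of()); // nothing to wait for
      return;
    }
    String[] slots = new String[n];                 // one slot per request, in request order
    AtomicInteger remaining = new AtomicInteger(n); // completions still outstanding
    for (int i = 0; i < n; i++) {
      final int idx = i;
      openAsync.accept(files.get(idx), result -> {
        slots[idx] = result;
        // decrementAndGet also publishes the slot writes to the thread that reaches zero.
        if (remaining.decrementAndGet() == 0) {
          sendResponse.accept(Arrays.asList(slots));
        }
      });
    }
  }
}
```

   In this sketch a failed open would also have to write some error value into its slot; otherwise the counter never reaches zero and the batch never responds.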
   
   
   ### Why are the changes needed?
   Currently, getSortedFileInfo blocks the calling RPC thread with a Thread.sleep(50) polling loop (up to a 220s timeout) while waiting for partition file sorting to complete. This ties up Netty RPC threads, which are a shared and limited resource. Under high concurrency, especially when many reduce tasks open streams simultaneously and trigger sorting, this can exhaust the RPC thread pool and cause timeouts or stalls for unrelated requests.
   
   By making the wait for sorting asynchronous via callbacks, RPC threads are released immediately after the sort request is dispatched, which should improve worker responsiveness and throughput under concurrent open stream workloads.
   
   
   ### Does this PR resolve a correctness bug?
   
   No.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is an internal implementation change. The RPC protocol and behavior 
remain unchanged.
   
   ### How was this patch tested?
   Existing UTs.
   