This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b1805e2e [CELEBORN-1082] Fixing partition sorter task failures due to 
duplicate sorting
9b1805e2e is described below

commit 9b1805e2efcd77d7faa31a7f59f77f4949f60ce5
Author: Fu Chen <[email protected]>
AuthorDate: Tue Oct 24 15:55:41 2023 +0800

    [CELEBORN-1082] Fixing partition sorter task failures due to duplicate 
sorting
    
    ### What changes were proposed in this pull request?
    
    Prevent the same partition sorting task from being added to the sorter queue more than once.
    
    ### Why are the changes needed?
    
    Recently, while testing on the main branch, we discovered that the partition sorter task could fail with a `NoSuchFileException`, causing the entire job to fail. Upon further investigation, we found that the root cause is that the same sorting task could be added to the sorter queue more than once.
    
    ```
    23/10/22 01:02:15,334 DEBUG [worker-file-sorter-execute-9530] 
PartitionFilesSorter: sort complete for application_1653035898918_4284043-9975 
/data1/celeborn/worker/celeborn-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0
    ...
    23/10/22 01:02:15,335 ERROR [worker-file-sorter-execute-9532] 
PartitionFilesSorter: Sorting shuffle file for 
application_1653035898918_4284043-9975-0-0-0 /data1/celeborn/worker/cele
    born-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0 
failed, detail:
    java.nio.file.NoSuchFileException: 
/data1/celeborn/worker/celeborn-worker/shuffle_data/application_1653035898918_4284043/9975/0-0-0
            at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
            at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
            at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
            at 
sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
            at java.nio.channels.FileChannel.open(FileChannel.java:287)
            at java.nio.channels.FileChannel.open(FileChannel.java:335)
            at 
org.apache.celeborn.common.util.FileChannelUtils.openReadableFileChannel(FileChannelUtils.java:33)
            at 
org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter$FileSorter.initializeFiles(PartitionFilesSorter.java:641)
            at 
org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter$FileSorter.sort(PartitionFilesSorter.java:559)
            at 
org.apache.celeborn.service.deploy.worker.storage.PartitionFilesSorter.lambda$null$0(PartitionFilesSorter.java:146)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ...
    ```
    
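    Before this PR, `sorted` was checked outside the `synchronized (sorting)` block. A request for a `fileId` could pass that check before the file's sort completed, and then enter the `synchronized (sorting)` block only after the `fileId` had already been removed from `sorting`. In that window, the request found the `fileId` in neither `sorted` (already checked) nor `sorting`, so a second sorter task was mistakenly added to the sorter queue. That duplicate sort then failed with the `NoSuchFileException` shown above, because the original file was already gone. To address this, we moved the check of the `fileId`'s status in `sorted` inside the `synchronized (sorting)` block. This change ensures that tasks are not added to the sorter queue multiple times because if a `fileId`'s sorter task has already completed and its status has been removed [...]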
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a bug fix only.
    
    ### How was this patch tested?
    
    Passed the existing GitHub Actions (GA) CI checks.
    
    Closes #2031 from cfmcgrady/fix-no-such-file-exception.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../worker/storage/PartitionFilesSorter.java       | 26 +++++++++++++---------
 .../service/deploy/worker/FetchHandler.scala       |  8 +++----
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 1741f4bcc..b229b89e3 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -186,21 +186,23 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
     String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
     String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    if (sorted.contains(fileId)) {
-      return resolve(
-          shuffleKey,
-          fileId,
-          userIdentifier,
-          sortedFilePath,
-          indexFilePath,
-          startMapIndex,
-          endMapIndex);
-    }
     synchronized (sorting) {
+      if (sorted.contains(fileId)) {
+        return resolve(
+            shuffleKey,
+            fileId,
+            userIdentifier,
+            sortedFilePath,
+            indexFilePath,
+            startMapIndex,
+            endMapIndex);
+      }
       if (!sorting.contains(fileId)) {
         try {
           FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
           sorting.add(fileId);
+          logger.debug(
+              "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
           shuffleSortTaskDeque.put(fileSorter);
         } catch (InterruptedException e) {
           logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
@@ -747,6 +749,10 @@ class PartitionFilesCleaner {
                       PartitionFilesSorter.FileSorter sorter = it.next();
                       try {
                         if (sorter.getOriginFileInfo().isStreamsEmpty()) {
+                          logger.debug(
+                              "Deleting the original files for shuffle key {}: 
{}",
+                              sorter.getShuffleKey(),
+                              sorter.getOriginFileInfo().getFilePath());
                           sorter.deleteOriginFiles();
                           queue.remove(sorter);
                         }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 4b299728b..cbb575e68 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -195,8 +195,11 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
       var fileInfo = getRawFileInfo(shuffleKey, fileName)
       fileInfo.getPartitionType match {
         case PartitionType.REDUCE =>
-          val streamId = chunkStreamManager.nextStreamId()
+          logDebug(s"Received open stream request $shuffleKey $fileName 
$startIndex " +
+            s"$endIndex get file name $fileName from client channel " +
+            s"${NettyUtils.getRemoteAddress(client.getChannel)}")
 
+          val streamId = chunkStreamManager.nextStreamId()
           // we must get sorted fileInfo for the following cases.
           // 1. when the current request is a non-range openStream, but the 
original unsorted file
           //    has been deleted by another range's openStream request.
@@ -210,9 +213,6 @@ class FetchHandler(val conf: CelebornConf, val 
transportConf: TransportConf)
               startIndex,
               endIndex)
           }
-          logDebug(s"Received chunk fetch request $shuffleKey $fileName 
$startIndex " +
-            s"$endIndex get file info $fileInfo from client channel " +
-            s"${NettyUtils.getRemoteAddress(client.getChannel)}")
           if (readLocalShuffle) {
             replyStreamHandler(
               client,

Reply via email to