This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 154379bd3485bda9e08e6a8585abc8347b04f382
Author: lijianfu03 <[email protected]>
AuthorDate: Tue Jul 1 14:05:45 2025 +0800

    [CELEBORN-894][FOLLOWUP] update commitMeta before update subPartitionsProcessed
    
    ### What changes were proposed in this pull request?
    
    In `SkewHandlingWithoutMapRangeValidator`, call `updateCommitMetadata` before `subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)`, and return early when a retried range reports the same metadata that is already recorded for it.
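The patched control flow can be sketched as a minimal, single-threaded Scala model. This is not Celeborn's implementation: `Meta`, `SketchValidator`, `process`, and `completeAndValid` are hypothetical names, a plain byte count stands in for `CommitMetadata` (the CRC is omitted), and whatever locking the real validator uses is left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Celeborn's CommitMetadata: a byte count only, no CRC.
final case class Meta(bytes: Long)

// Hypothetical, single-threaded model of the patched logic; not Celeborn's class.
final class SketchValidator(expectedBytes: Long, numSubPartitions: Int) {
  // endMapIndex -> metadata recorded for that sub-partition range
  private val subPartitionsProcessed = mutable.HashMap.empty[Int, Meta]
  // Aggregate playing the role of the total maintained by updateCommitMetadata
  private var committedBytes = 0L

  def process(endMapIndex: Int, actual: Meta): (Boolean, String) =
    subPartitionsProcessed.get(endMapIndex) match {
      case Some(existing) if existing != actual =>
        // A retry reported different metadata for the same range: reject it.
        (false, s"Mismatch on retry: $endMapIndex existing: $existing new: $actual")
      case Some(_) =>
        // Matching retry: the range is already recorded, so return early
        // instead of adding its bytes to the aggregate a second time.
        (true, "")
      case None =>
        // Patched order: update the aggregate first, then mark the range processed.
        committedBytes += actual.bytes
        subPartitionsProcessed.put(endMapIndex, actual)
        (true, "")
    }

  // None until every range is recorded; afterwards, whether the aggregate matches.
  def completeAndValid: Option[Boolean] =
    if (subPartitionsProcessed.size == numSubPartitions) Some(committedBytes == expectedBytes)
    else None
}

val v = new SketchValidator(expectedBytes = 30L, numSubPartitions = 2)
v.process(1, Meta(10L))
v.process(1, Meta(10L)) // matching retry: not counted twice
v.process(0, Meta(20L))
println(v.completeAndValid) // Some(true)
```

In this model a range only becomes visible as processed after its bytes have been added, which is the ordering the patch establishes.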
    
    ### Why are the changes needed?
    In testing, we found that when a job enables integrity checks and `celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled`, and the job has a skewed partition, the last two reduce tasks can run concurrently and fail validation.
    
    task1-attempt1
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 1ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=41366743972, crc=CelebornCRC32{current=477185228}},
    ```
    task1-attempt2
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 1ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=48072750200, crc=CelebornCRC32{current=755010}},
    ```
    task2-attempt1
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 0ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=34660737744, crc=CelebornCRC32{current=953615190}},
    ```
    task2-attempt2
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 0ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=54978132968, crc=CelebornCRC32{current=-366062354}},
    ```
    
    Both tasks read skewed partition 121, and they are the last two reduce tasks of the stage. If task1 executes `CommitMetadata.checkCommitMetadata(expectedCommitMetadata, currentCommitMetadata)` after task2 has put its `endMapIndex` into `subPartitionsProcessed` but before task2 has updated the commit metadata, task1 fails validation.
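The interleaving described above can be replayed deterministically. The sketch below is a hypothetical model, not Celeborn code: `SharedState`, `Mark`, `Update`, `applyStep`, and `checkInRaceWindow` are illustrative names, a byte count stands in for `CommitMetadata`, and the completeness check is assumed to compare totals once every range is marked. It runs task1 to completion, applies only task2's first step, and then runs the check.

```scala
import scala.collection.mutable

// Hypothetical shared state for one skewed partition; names are illustrative only.
final class SharedState(val numSubPartitions: Int, val expectedBytes: Long) {
  val processed = mutable.Set.empty[Int] // stands in for subPartitionsProcessed
  var committedBytes = 0L                // stands in for the aggregated commit metadata

  // Assumed check: once every range is marked, the totals must match.
  // Returns None while ranges are still outstanding.
  def check(): Option[Boolean] =
    if (processed.size == numSubPartitions) Some(committedBytes == expectedBytes) else None
}

sealed trait Step
final case class Mark(endMapIndex: Int) extends Step // record the range as processed
final case class Update(bytes: Long) extends Step    // add the range's bytes to the total

// The two steps one task performs, in the old order or the patched order.
def taskSteps(endMapIndex: Int, bytes: Long, patched: Boolean): List[Step] =
  if (patched) List(Update(bytes), Mark(endMapIndex))
  else List(Mark(endMapIndex), Update(bytes))

def applyStep(state: SharedState, step: Step): Unit = step match {
  case Mark(i)   => state.processed += i
  case Update(b) => state.committedBytes += b
}

// task1 finishes both steps, task2 has done only its first step, then the check runs.
def checkInRaceWindow(patched: Boolean): Option[Boolean] = {
  val state = new SharedState(numSubPartitions = 2, expectedBytes = 30L)
  taskSteps(endMapIndex = 1, bytes = 10L, patched = patched).foreach(applyStep(state, _))
  applyStep(state, taskSteps(endMapIndex = 0, bytes = 20L, patched = patched).head)
  state.check()
}

println(checkInRaceWindow(patched = false)) // Some(false)
println(checkInRaceWindow(patched = true))  // None
```

With the old ordering the check sees both ranges marked while task2's bytes are still missing, so it reports a mismatch; with the patched ordering the partition does not yet look complete, so no false failure is reported.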
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    Tested by running skewed jobs with `celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled` turned on.
    
    Closes #3351 from buska88/feature/894.
    
    Authored-by: lijianfu03 <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit d2474e04025578481b5043106043af2706bcac7d)
---
 .../celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
index e2d7d8463..0e4eb401e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
@@ -68,8 +68,11 @@ class SkewHandlingWithoutMapRangeValidator extends AbstractPartitionCompleteness
           return (
             false,
             s"Mismatch in metadata for the same chunk range on retry: $endMapIndex existing: $existingCommitMetadata new: $actualCommitMetadata")
+        } else {
+          return (true, "")
         }
       }
+      updateCommitMetadata(partitionId, actualCommitMetadata)
       subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)
       val partitionProcessed = getTotalNumberOfSubPartitionsProcessed(partitionId)
       checkState(
@@ -78,7 +81,6 @@ class SkewHandlingWithoutMapRangeValidator extends AbstractPartitionCompleteness
         partitionProcessed,
         startMapIndex)
     }
-    updateCommitMetadata(partitionId, actualCommitMetadata)
     (true, "")
   }
 
