This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 154379bd3485bda9e08e6a8585abc8347b04f382
Author: lijianfu03 <[email protected]>
AuthorDate: Tue Jul 1 14:05:45 2025 +0800

    [CELEBORN-894][FOLLOWUP] update commitMeta before update subPartition…

    …sProcessed

    ### What changes were proposed in this pull request?

    In SkewHandlingWithoutMapRangeValidator, we should call "updateCommitMetadata" before "subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)".

    ### Why are the changes needed?

    In testing, we found failing cases. When a job enables integrity checking, enables celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled, and is skewed, the last two reduce tasks executing concurrently may cause validation to fail.

    task1-attempt1
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 1ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=41366743972, crc=CelebornCRC32{current=477185228}},
    ```
    task1-attempt2
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 1ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=48072750200, crc=CelebornCRC32{current=755010}},
    ```
    task2-attempt1
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 0ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=34660737744, crc=CelebornCRC32{current=953615190}},
    ```
    task2-attempt2
    ```
    org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121 failed validation checkwhile processing range startMapIndex: 5 endMapIndex: 0ExpectedCommitMetadata CommitMetadata{bytes=27755354976, crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata CommitMetadata{bytes=54978132968, crc=CelebornCRC32{current=-366062354}},
    ```
    Both tasks read skewed partition 121, and they are the last two reduce tasks of the stage. If task1 executes 'CommitMetadata.checkCommitMetadata(expectedCommitMetadata, currentCommitMetadata)' after task2 has put its endMapIndex into subPartitionsProcessed but before task2 has updated commitMeta, task1 fails validation.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Tested with skewed jobs with optimizeSkewedPartitionRead.enabled.

    Closes #3351 from buska88/feature/894.

    Authored-by: lijianfu03 <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit d2474e04025578481b5043106043af2706bcac7d)
---
 .../celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
index e2d7d8463..0e4eb401e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
@@ -68,8 +68,11 @@ class SkewHandlingWithoutMapRangeValidator extends AbstractPartitionCompleteness
           return (
             false,
             s"Mismatch in metadata for the same chunk range on retry: $endMapIndex existing: $existingCommitMetadata new: $actualCommitMetadata")
+        } else {
+          return (true, "")
         }
       }
+      updateCommitMetadata(partitionId, actualCommitMetadata)
       subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)
       val partitionProcessed = getTotalNumberOfSubPartitionsProcessed(partitionId)
       checkState(
@@ -78,7 +81,6 @@ class SkewHandlingWithoutMapRangeValidator extends AbstractPartitionCompleteness
         partitionProcessed,
         startMapIndex)
     }
-    updateCommitMetadata(partitionId, actualCommitMetadata)
     (true, "")
   }
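
The ordering problem described in the commit message can be illustrated with a small, self-contained Scala sketch. It is not Celeborn's code: `ToyValidator`, `markProcessed`, `check`, and `OrderingDemo` are hypothetical names, commit metadata is reduced to a byte count, and the concurrent interleaving is replayed deterministically on one thread. It assumes, as the message suggests, that a reader treats the partition as complete once every sub-partition is recorded as processed and then compares the accumulated metadata against the expected value.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-in for the validator state (not Celeborn's classes).
final class ToyValidator(expectedSubPartitions: Int) {
  private val subPartitionsProcessed = new ConcurrentHashMap[Int, Long]()
  private val committedBytes = new AtomicLong(0L)

  // Fold one sub-partition's byte count into the running total.
  def updateCommitMetadata(bytes: Long): Unit = {
    committedBytes.addAndGet(bytes)
    ()
  }

  // Record that the sub-partition ending at endMapIndex has been processed.
  def markProcessed(endMapIndex: Int, bytes: Long): Unit = {
    subPartitionsProcessed.put(endMapIndex, bytes)
    ()
  }

  // None: not all sub-partitions are recorded yet, so no validation happens.
  // Some(ok): all are recorded, and ok says whether the totals match.
  def check(expectedBytes: Long): Option[Boolean] =
    if (subPartitionsProcessed.size() == expectedSubPartitions)
      Some(committedBytes.get() == expectedBytes)
    else None
}

object OrderingDemo {
  // Replays one fixed interleaving: task1 has finished both steps, while
  // task2 has performed only its first step when the check runs.
  def run(updateFirst: Boolean): Option[Boolean] = {
    val v = new ToyValidator(expectedSubPartitions = 2)
    v.updateCommitMetadata(100L)
    v.markProcessed(0, 100L) // task1 is fully done

    if (updateFirst) v.updateCommitMetadata(50L) // patched order: metadata first
    else v.markProcessed(1, 50L)                 // old order: map entry first

    v.check(expectedBytes = 150L) // task1's check lands between task2's steps
  }

  def main(args: Array[String]): Unit = {
    println(s"old order: ${run(updateFirst = false)}")
    println(s"new order: ${run(updateFirst = true)}")
  }
}
```

In this toy model the old order yields `Some(false)`: the partition looks complete, but the total still lacks task2's bytes, which matches the spurious mismatch in the logs. The patched order yields `None`, because the partition is not considered complete until task2 has also recorded its sub-partition, by which point its metadata is already included.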
