This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d2474e040 [CELEBORN-894][FOLLOWUP] update commitMeta before update
subPartition…
d2474e040 is described below
commit d2474e04025578481b5043106043af2706bcac7d
Author: lijianfu03 <[email protected]>
AuthorDate: Tue Jul 1 14:05:45 2025 +0800
[CELEBORN-894][FOLLOWUP] update commitMeta before update subPartition…
…sProcessed
### What changes were proposed in this pull request?
In SkewHandlingWithoutMapRangeValidator, call "updateCommitMetadata" before
"subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)". When a retry
reports the same chunk range with matching metadata, return success early.
### Why are the changes needed?
In testing, we found failing cases. When a job enables integrity checking and
celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled, and the job is
skewed, the last two reduce tasks may run concurrently and cause validation to
fail.
task1-attempt1
```
org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121
failed validation checkwhile processing range startMapIndex: 5 endMapIndex:
1ExpectedCommitMetadata CommitMetadata{bytes=27755354976,
crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata
CommitMetadata{bytes=41366743972, crc=CelebornCRC32{current=477185228}},
```
task1-attempt2
```
org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121
failed validation checkwhile processing range startMapIndex: 5 endMapIndex:
1ExpectedCommitMetadata CommitMetadata{bytes=27755354976,
crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata
CommitMetadata{bytes=48072750200, crc=CelebornCRC32{current=755010}},
```
task2-attempt1
```
org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121
failed validation checkwhile processing range startMapIndex: 5 endMapIndex:
0ExpectedCommitMetadata CommitMetadata{bytes=27755354976,
crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata
CommitMetadata{bytes=34660737744, crc=CelebornCRC32{current=953615190}},
```
task2-attempt2
```
org.apache.celeborn.common.exception.CelebornIOException: AQE Partition 121
failed validation checkwhile processing range startMapIndex: 5 endMapIndex:
0ExpectedCommitMetadata CommitMetadata{bytes=27755354976,
crc=CelebornCRC32{current=1320432810}}, ActualCommitMetadata
CommitMetadata{bytes=54978132968, crc=CelebornCRC32{current=-366062354}},
```
Both tasks read skewed partition 121, and they are the last two reduce tasks of
the stage. When task1 executes
'CommitMetadata.checkCommitMetadata(expectedCommitMetadata,
currentCommitMetadata)' while task2 has already put its endMapIndex into
subPartitionsProcessed but has not yet updated the commit metadata, task1 fails
validation.
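The ordering problem can be illustrated with a minimal sketch. It is a simplified model, not the Celeborn implementation: `PartitionState`, `recordBytes`, `markProcessed`, and `observeMidway` are hypothetical names, a plain byte counter stands in for CommitMetadata, and the interleaving is replayed sequentially rather than with real threads.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-in for the validator's per-partition state.
final class PartitionState(expectedSubPartitions: Int) {
  // endMapIndex -> bytes reported for that sub-partition
  private val processed = new ConcurrentHashMap[Int, Long]()
  // Aggregated metadata (only a byte count in this sketch)
  private val totalBytes = new AtomicLong(0L)

  def recordBytes(bytes: Long): Unit = { totalBytes.addAndGet(bytes); () }
  def markProcessed(endMapIndex: Int, bytes: Long): Unit = {
    processed.put(endMapIndex, bytes); ()
  }
  def isComplete: Boolean = processed.size() == expectedSubPartitions
  def currentBytes: Long = totalBytes.get()
}

object OrderingDemo {
  // Replays the moment at which task2 has done only the first of its two
  // steps, and returns what task1 would observe at that point.
  def observeMidway(updateFirst: Boolean): (Boolean, Long) = {
    val state = new PartitionState(expectedSubPartitions = 2)

    // task1 has fully reported its sub-partition (100 bytes).
    state.recordBytes(100L)
    state.markProcessed(0, 100L)

    // task2 (50 bytes) performs only its first step.
    if (updateFirst) state.recordBytes(50L) // order after this change
    else state.markProcessed(1, 50L)        // order before this change

    (state.isComplete, state.currentBytes)
  }

  def main(args: Array[String]): Unit = {
    // Old order: the partition looks complete, but the aggregate is missing
    // task2's 50 bytes, so a check against the expected 150 would fail.
    println(observeMidway(updateFirst = false))
    // New order: the aggregate is already 150, and the partition is not yet
    // reported as complete, so no premature check can fail.
    println(observeMidway(updateFirst = true))
  }
}
```

In this model, an observer that sees every sub-partition marked as processed also sees the full aggregate, provided the aggregate is always updated before the sub-partition is marked.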
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with skewed jobs with optimizeSkewedPartitionRead.enabled.
Closes #3351 from buska88/feature/894.
Authored-by: lijianfu03 <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
index e2d7d8463..0e4eb401e 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/SkewHandlingWithoutMapRangeValidator.scala
@@ -68,8 +68,11 @@ class SkewHandlingWithoutMapRangeValidator extends
AbstractPartitionCompleteness
return (
false,
s"Mismatch in metadata for the same chunk range on retry:
$endMapIndex existing: $existingCommitMetadata new: $actualCommitMetadata")
+ } else {
+ return (true, "")
}
}
+ updateCommitMetadata(partitionId, actualCommitMetadata)
subPartitionsProcessed.put(endMapIndex, actualCommitMetadata)
val partitionProcessed =
getTotalNumberOfSubPartitionsProcessed(partitionId)
checkState(
@@ -78,7 +81,6 @@ class SkewHandlingWithoutMapRangeValidator extends
AbstractPartitionCompleteness
partitionProcessed,
startMapIndex)
}
- updateCommitMetadata(partitionId, actualCommitMetadata)
(true, "")
}