Lobo2008 opened a new pull request, #3646:
URL: https://github.com/apache/celeborn/pull/3646
### What changes were proposed in this pull request?
Add an adaptive early-spill mechanism to `CelebornSortBasedPusher` that limits
the number of temporary Record objects created during sorting. The implementation:
1. Uses primitive int arrays to store record metadata (offsets, key
lengths, value lengths) during data collection
2. Creates temporary Record objects only during sorting
3. Triggers an early spill when the record count exceeds 5M, keeping the
temporary objects within Young Gen capacity
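
A minimal sketch of the idea (class, field, and method names here are illustrative, not the actual Celeborn code; the 5M threshold is the one described above):

```java
import java.util.Arrays;

// Illustrative sketch of the adaptive early-spill idea; names are
// hypothetical and do not match the actual Celeborn implementation.
public class SortBasedPusherSketch {
  // Early-spill threshold: cap the temporary Record objects created at
  // sort time so they stay within Young Gen capacity.
  static final int EARLY_SPILL_RECORD_THRESHOLD = 5_000_000;

  // Primitive arrays hold per-record metadata during collection,
  // avoiding one object allocation per record.
  private int[] offsets;
  private int[] keyLengths;
  private int[] valueLengths;
  private int numRecords;

  public SortBasedPusherSketch(int initialCapacity) {
    offsets = new int[initialCapacity];
    keyLengths = new int[initialCapacity];
    valueLengths = new int[initialCapacity];
  }

  /** Store metadata for one record; spill early if the count grows too large. */
  public void insert(int offset, int keyLen, int valueLen) {
    if (numRecords == offsets.length) {
      grow();
    }
    offsets[numRecords] = offset;
    keyLengths[numRecords] = keyLen;
    valueLengths[numRecords] = valueLen;
    numRecords++;
    if (numRecords >= EARLY_SPILL_RECORD_THRESHOLD) {
      sortAndSpill(); // temporary Record objects exist only inside here
    }
  }

  private void grow() {
    int newCap = offsets.length * 2;
    offsets = Arrays.copyOf(offsets, newCap);
    keyLengths = Arrays.copyOf(keyLengths, newCap);
    valueLengths = Arrays.copyOf(valueLengths, newCap);
  }

  private void sortAndSpill() {
    // Sorting and pushing elided in this sketch; after spilling,
    // the metadata arrays are reused for the next batch.
    numRecords = 0;
  }

  public int numRecords() {
    return numRecords;
  }
}
```

Because the short-lived Record objects are only materialized for at most one spill batch at a time, they can die young instead of being promoted to Old Gen.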
### Why are the changes needed?
**Problem**: `CelebornSortBasedPusher` causes severe full GC (FGC) and OOM when
processing 100MB of data with a 1.2GB heap (`-Xmx1230m`). The job fails at only
64% map progress.
**Reproduction**:
- Input: WordCount 100MB (~18M records)
- Config: `mapreduce.task.io.sort.mb=100` (default), `-Xmx1230m`
**Evidence**:
| Time | Record Count | FGC Count | FGC Time | Old Gen |
|----------|--------------|-----------|----------|---------|
| 16:29:34 | 9.90M | 12 | 29.4s | 98.72% |
| 16:33:21 | 17.37M | 73 | 317.4s | 99.95% |
| 16:38:46 | 18.43M | 106 | 542.8s | 100.00% |
**Root Cause**: Creating 18M Record objects (~432MB) during the sort exceeds
Young Gen capacity, so the objects are promoted to Old Gen, triggering severe FGC.
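
As a sanity check on the 432MB figure: it is consistent with roughly 24 bytes per Record object (an assumed size, e.g. a 16-byte object header plus a couple of int fields on a 64-bit JVM with compressed oops; the actual Record layout may differ):

```java
// Rough back-of-envelope check of the Old Gen pressure. The 24-byte
// per-object figure is an assumption, not measured from the real Record class.
public class RecordMemoryEstimate {
  public static long estimateBytes(long records, long bytesPerRecord) {
    return records * bytesPerRecord;
  }

  public static void main(String[] args) {
    long total = estimateBytes(18_000_000L, 24L);
    System.out.println(total / 1_000_000 + " MB"); // prints "432 MB"
  }
}
```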
### Does this PR resolve a correctness bug?
Yes
### Does this PR introduce _any_ user-facing change?
No. The fix is fully backward compatible: no configuration changes are
required, and it works with the default `mapreduce.task.io.sort.mb=100`.
### How was this patch tested?
**Test Workload: WordCount 100MB**
Input sample (`datas_100m.txt`):
```
hello spark
hello this is java
hello java
hello it is spark
hello that is php
hello this is flink spark mapreduce hadoop
hello hadoop mapreduce java spark yarn
hbase this is hdfs
that is scala hdfs java python python shell yarn hadoop
hello world
```
(Each line is repeated multiple times to reach ~100MB total size, ~18M
records.)
Output:
```
flink      418475
hadoop     1255424
hbase      418475
hdfs       836949
hello      3347799
is         2510849
it         418475
java       1673899
mapreduce  836950
php        418475
python     836948
scala      418474
shell      418474
spark      1673900
that       836949
this       1255425
world      418474
yarn       836949
```
Configuration:
```
mapreduce.task.io.sort.mb=100           (default)
mapreduce.map.java.opts=-Xmx1230m
mapreduce.map.sort.spill.percent=0.8    (default)
```
Results comparison:
| Metric | Before Fix | After Fix | Improvement |
|--------|------------|-----------|-------------|
| Job duration | >10min (timeout) | ~90s | 94% faster |
| FGC count | 106 | <10 | 90%+ reduction |
| Total GC time | 542.8s | <10s | 98% reduction |
| Max map progress | 64% (hung) | 100% | completed |
| Result | FAILED | SUCCESS | ✓ |
**Performance Comparison**:
With same configuration (100MB WordCount, `-Xmx1230m`):
| Shuffle Service | Job Duration | FGC Count | Result |
|-----------------|--------------|-----------|--------|
| Celeborn (before fix) | >10min (timeout) | 106 | FAILED |
| Celeborn (after fix) | ~90s | <10 | SUCCESS |
| Uniffle | ~90s | <5 | SUCCESS |
| Hadoop ESS | ~85s | <5 | SUCCESS |
> **Note**: Unable to create a JIRA ticket due to account limitations (newly
registered GitHub account). I will create the JIRA once the account is approved,
or a committer can help create one.