This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 04964f30e [#2606] feat(mr): Add safety switch for map-stage combiner
(#2607)
04964f30e is described below
commit 04964f30e367e5d698f2965c287474ded0f0b71b
Author: l.zonghai <[email protected]>
AuthorDate: Tue Sep 16 10:08:39 2025 +0800
[#2606] feat(mr): Add safety switch for map-stage combiner (#2607)
### What changes were proposed in this pull request?
Introduce a configuration `mapreduce.rss.client.combiner.enable` to control
whether the map-stage combiner runs in the Uniffle MapReduce client.
The default value is `false` to prevent job instability caused by GC storms
on large send buffers.
### Why are the changes needed?
Using the map-stage combiner on a large send buffer (`mapreduce.task.io.sort.mb *
mapreduce.rss.client.sort.memory.use.threshold`) can trigger severe GC overhead,
which may stall the MapTask and sender threads, causing the job to hang. Most users
do not need the map-stage combiner by default.
### Does this PR introduce _any_ user-facing change?
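Yes, this adds a new optional configuration for expert users. With the default
value (`false`), the map-stage combiner is skipped unless the option is explicitly
set to `true`, so default behavior remains stable.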
### How was this patch tested?
Manually tested with MapReduce jobs that use combiners. Verified that jobs run
successfully with the combiner disabled.
1. **Combiner disabled**: MapTasks completed normally with fast GC cycles.
Sample logs:
```
[2025-09-11 19:48:47] S0: 0MB, S1: 0MB, Eden: 299.02MB, Old: 11.53MB, ...
Total: 0.101s
...
[2025-09-11 19:49:30] S0: 82.88MB, S1: 0MB, Eden: 160.04MB, Old: 485.99MB,
...Total: 6.683s
...
[2025-09-11 19:49:57] S0: 0MB, S1: 0MB, Eden: 207.66MB, Old: 532.27MB,
...Total: 12.474s
```
> The MapTask completed successfully within 1 minute.
2. **Combiner enabled**: MapTask GC cycles grew very long; job stalled and
was eventually killed. Sample logs:
```
[2025-09-11 19:52:00] S0: 0MB, S1: 0MB, Eden: 80.49MB, Old: 12.86MB, ...
Total: 0.054s
[2025-09-11 19:52:08] S0: 0MB, S1: 0MB, Eden: 515.53MB, Old: 27.24MB, ...
Total: 0.149s
...
[2025-09-11 20:01:54] S0: 0MB, S1: 0MB, Eden: 60.36MB, Old: 687.51MB, ...
Total: 242.505s
```
> The MapTask did not complete after 9 minutes.
These logs demonstrate that disabling the map-stage combiner avoids severe
GC overhead and job stalls, validating the safety switch.
---------
Co-authored-by: Lobo2008 <[email protected]>
---
.../hadoop/mapred/RssMapOutputCollector.java | 31 +++++++++++++++++-----
.../org/apache/hadoop/mapreduce/RssMRConfig.java | 4 +++
.../uniffle/client/util/RssClientConfig.java | 2 ++
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 2db387e3f..0823dbd66 100644
---
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -78,12 +78,31 @@ public class RssMapOutputCollector<K extends Object, V
extends Object>
throw new IOException("Invalid sort memory use threshold : " +
sortThreshold);
}
- // combiner
- final Counters.Counter combineInputCounter =
- reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
- combinerRunner =
- Task.CombinerRunner.create(
- mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter,
null);
+ boolean enableCombiner =
+ RssMRUtils.getBoolean(
+ rssJobConf,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE_DEFAULT);
+
+ combinerRunner = null;
+ if (enableCombiner) {
+ try {
+ final Counters.Counter combineInputCounter =
+ reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combinerRunner =
+ Task.CombinerRunner.create(
+ mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter,
null);
+ if (combinerRunner != null) {
+ LOG.info(
+ "Map-stage combiner enabled. Warning: This may cause GC issues
in large jobs. "
+ + "Consider setting {}=false if experiencing instability",
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Get CombinerClass failed", e);
+ }
+ }
int batch =
RssMRUtils.getInt(
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 41f3fbe68..0d61fcf66 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -206,6 +206,10 @@ public class RssMRConfig {
public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER;
+ public static final String RSS_CLIENT_COMBINER_ENABLE =
+ MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_COMBINER_ENABLE;
+ public static final boolean RSS_CLIENT_COMBINER_ENABLE_DEFAULT = false;
+
public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index a2e60caf7..a4ced2111 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -98,4 +98,6 @@ public class RssClientConfig {
public static final String RSS_MERGED_BLOCK_SZIE = "rss.merged.block.size";
public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT = -1;
public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
"rss.remote.merge.classloader";
+
+ public static final String RSS_CLIENT_COMBINER_ENABLE =
"rss.client.combiner.enable";
}