zhengchenyu commented on code in PR #2607:
URL: https://github.com/apache/uniffle/pull/2607#discussion_r2343345821
##########
client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java:
##########
@@ -78,12 +78,31 @@ public void init(Context context) throws IOException,
ClassNotFoundException {
throw new IOException("Invalid sort memory use threshold : " +
sortThreshold);
}
- // combiner
- final Counters.Counter combineInputCounter =
- reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
- combinerRunner =
- Task.CombinerRunner.create(
- mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter,
null);
+ boolean enableCombiner =
+ RssMRUtils.getBoolean(
+ rssJobConf,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE_DEFAULT);
+
+ combinerRunner = null;
+ if (enableCombiner) {
+ try {
+ if (mrJobConf.getCombinerClass() != null ||
Review Comment:
Is this necessary? Can I check whether Task.CombinerRunner.create returns
null?
##########
client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java:
##########
@@ -78,12 +78,31 @@ public void init(Context context) throws IOException,
ClassNotFoundException {
throw new IOException("Invalid sort memory use threshold : " +
sortThreshold);
}
- // combiner
- final Counters.Counter combineInputCounter =
- reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
- combinerRunner =
- Task.CombinerRunner.create(
- mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter,
null);
+ boolean enableCombiner =
+ RssMRUtils.getBoolean(
+ rssJobConf,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE,
+ RssMRConfig.RSS_CLIENT_COMBINER_ENABLE_DEFAULT);
+
+ combinerRunner = null;
+ if (enableCombiner) {
+ try {
+ if (mrJobConf.getCombinerClass() != null ||
+ mrJobConf.get("mapreduce.job.combine.class") != null ||
+ mrJobConf.get("mapreduce.combine.class") != null ||
+ mrJobConf.get("mapred.combiner.class") != null) {
+
+ final Counters.Counter combineInputCounter =
+ reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ combinerRunner = Task.CombinerRunner.create(
+ mrJobConf, mapTask.getTaskID(), combineInputCounter,
reporter, null);
+ LOG.info("Map-stage combiner enabled. Warning: This may cause GC
issues in large jobs. " +
+ "Considering disable it by setting {}=false",
RssMRConfig.RSS_CLIENT_COMBINER_ENABLE);
+ }
+ } catch (Exception e) {
+ LOG.debug("Get CombinerClass failed", e);
Review Comment:
LOG.error?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]