Hello,
there is an optimization for Hadoop on Lustre FS, or any
high-performance distributed filesystem.
The research paper with test results can be found here
http://www.xyratex.com/pdfs/whitepapers/Xyratex_white_paper_MapReduce_1-4.pdf
and a presentation for LUG 2011:
http://www.olcf.ornl.gov/wp-content/events/lug2011/4-12-2011/1100-1130_Nathan_Rutman_MapReduce_Lug_2011.pptx
Basically the optimization is a replacement for http transport in
shuffle phase by simple linking target file to the source one. I
attached a draft patch against hadoop-1.0.0 to illustrate the idea.
How to push this patch upstream?
Thanks,
--
Alexander "Zam" Zarochentsev
[email protected]
______________________________________________________________________
This email may contain privileged or confidential information, which should
only be used for the purpose for which it was sent by Xyratex. No further
rights or licenses are granted to use such information. If you are not the
intended recipient of this message, please notify the sender by return and
delete it. You may not use, copy, disclose or rely on the information contained
in it.
Internet email is susceptible to data corruption, interception and unauthorised
amendment for which Xyratex does not accept liability. While we have taken
reasonable precautions to ensure that this email is free of viruses, Xyratex
does not accept liability for the presence of any computer viruses in this
email, nor for any losses caused as a result of viruses.
Xyratex Technology Limited (03134912), Registered in England & Wales,
Registered Office, Langstone Road, Havant, Hampshire, PO9 1SA.
The Xyratex group of companies also includes, Xyratex Ltd, registered in
Bermuda, Xyratex International Inc, registered in California, Xyratex
(Malaysia) Sdn Bhd registered in Malaysia, Xyratex Technology (Wuxi) Co Ltd
registered in The People's Republic of China and Xyratex Japan Limited
registered in Japan.
______________________________________________________________________
diff --git a/bin/start-all.sh b/bin/start-all.sh
index 88ce430..c545974 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -28,7 +28,7 @@ else
fi
# start dfs daemons
-"$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
+# "$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
# start mapred daemons
"$bin"/start-mapred.sh --config $HADOOP_CONF_DIR
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1bac90d..2658c37 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -1,5 +1,5 @@
# Define some default values that can be overridden by system properties
-hadoop.root.logger=INFO,console
+hadoop.root.logger=DEBUG,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
@@ -107,9 +107,11 @@
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
hadoop.metrics.log.level=INFO
#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.ReduceTask=DEBUG
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop.metrics2=${hadoop.metrics.log.level}
+log4j.logger.org.apache.hadoop.mapred.Child=DEBUG
# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
diff --git a/conf/taskcontroller.cfg b/conf/taskcontroller.cfg
index a13aeb9..ef85fa9 100644
--- a/conf/taskcontroller.cfg
+++ b/conf/taskcontroller.cfg
@@ -1,4 +1,11 @@
-mapred.local.dir=#configured value of mapred.local.dir. It can be a list of
comma separated paths.
-hadoop.log.dir=#configured value of hadoop.log.dir.
-mapred.tasktracker.tasks.sleeptime-before-sigkill=#sleep time before sig kill
is to be sent to process group after sigterm is sent. Should be in seconds
-mapreduce.tasktracker.group=#configured value of mapreduce.tasktracker.group.
+#configured value of mapred.local.dir. It can be a list of comma separated
paths.
+mapred.local.dir=/mnt/lustre/hadoop-tmpdir
+
+#configured value of hadoop.log.dir.
+hadoop.log.dir=/work/lustre/tasks/Hadoop.update/git/hadoop-common/logs
+
+#sleep time before sig kill is to be sent to process group after sigterm is
sent. Should be in seconds
+mapred.tasktracker.tasks.sleeptime-before-sigkill=30
+
+#configured value of mapreduce.tasktracker.group.
+mapreduce.tasktracker.group=0
diff --git a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
index 980b988..076ad08 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -1481,10 +1481,10 @@ class ReduceTask extends Task {
URLConnection connection = url.openConnection();
InputStream input = setupSecureConnection(mapOutputLoc, connection);
-
- // Validate header from map output
+
+ // Validate header from map output
TaskAttemptID mapId = null;
- try {
+ try {
mapId =
TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
} catch (IllegalArgumentException ia) {
@@ -1524,6 +1524,23 @@ class ReduceTask extends Task {
LOG.debug("header: " + mapId + ", compressed len: " +
compressedLength +
", decompressed len: " + decompressedLength);
}
+
+
+ // Shuffle
+ MapOutput mapOutput = null;
+
+ boolean shuffleLink = conf.getBoolean("mapreduce.shuffle.link", false);
+
+ if (shuffleLink) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shuffling " + decompressedLength + " bytes ("
+ + compressedLength + " raw bytes) "
+ + "to Link from " + mapOutputLoc.getTaskAttemptId());
+ }
+ mapOutput = shuffleToLink(mapOutputLoc, input, filename,
+ (int) decompressedLength);
+ return mapOutput;
+ }
//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
@@ -1533,8 +1550,6 @@ class ReduceTask extends Task {
// Check if this map-output can be saved in-memory
boolean shuffleInMemory =
ramManager.canFitInMemory(decompressedLength);
- // Shuffle
- MapOutput mapOutput = null;
if (shuffleInMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
@@ -1772,6 +1787,48 @@ class ReduceTask extends Task {
return mapOutput;
}
+ private MapOutput shuffleToLink(MapOutputLocation mapOutputLoc,
+ InputStream input,
+ Path filename,
+ long mapOutputLength)
+ throws IOException {
+ // Find out a suitable location for the output on local-filesystem
+ Path localFilename =
+ lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+ mapOutputLength, conf);
+
+ LOG.error("shuffleToDisk: local file = \"" + localFilename.toString() +
"\"");
+
+ MapOutput mapOutput =
+ new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(),
+ conf, localFileSys.makeQualified(localFilename),
+ mapOutputLength);
+
+ String query = mapOutputLoc.getOutputLocation().getQuery();
+ LOG.error("shuffleToDisk: query = \"" + query + "\"");
+
+
+ String[] qureies=query.split("&");
+ String maphost=mapOutputLoc.getHost();
+ String tmpDir = conf.get("hadoop.tmp.dir");
+ String lnCmd = conf.get("hadoop.ln.cmd");
+
+ String src=tmpDir+"/"+"/taskTracker/root/jobcache/"
+ +qureies[0].substring(qureies[0].indexOf('=')+1)+"/"
+
+qureies[1].substring(qureies[1].indexOf('=')+1)+"/output/file.out";
+
+ String command = lnCmd + " "+src+" "+localFilename;
+ try {
+ LOG.debug("shuffleToLink: Command used for hardlink "+command);
+ Runtime.getRuntime().exec(command).waitFor();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ return mapOutput;
+ }
+
private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
InputStream input,
Path filename,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
index ca54508..c9a9677 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
@@ -248,6 +248,7 @@ abstract class TaskRunner extends Thread {
}
setupCmds.add(setup);
+ LOG.warn("JVM to start: " + setupCmds.toString() + "; workdir = " +
workDir);
launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {