Hello,

There is an optimization for Hadoop on the Lustre file system, or any high-performance distributed filesystem.

The research paper with test results can be found here
http://www.xyratex.com/pdfs/whitepapers/Xyratex_white_paper_MapReduce_1-4.pdf
and a presentation for LUG 2011:
http://www.olcf.ornl.gov/wp-content/events/lug2011/4-12-2011/1100-1130_Nathan_Rutman_MapReduce_Lug_2011.pptx

Basically, the optimization replaces the HTTP transport in the shuffle phase by simply hard-linking the target file to the source file. I attached a draft patch against hadoop-1.0.0 to illustrate the idea.
How can I push this patch upstream?

Thanks,
--

Alexander "Zam" Zarochentsev
[email protected]


______________________________________________________________________
This email may contain privileged or confidential information, which should 
only be used for the purpose for which it was sent by Xyratex. No further 
rights or licenses are granted to use such information. If you are not the 
intended recipient of this message, please notify the sender by return and 
delete it. You may not use, copy, disclose or rely on the information contained 
in it.

Internet email is susceptible to data corruption, interception and unauthorised 
amendment for which Xyratex does not accept liability. While we have taken 
reasonable precautions to ensure that this email is free of viruses, Xyratex 
does not accept liability for the presence of any computer viruses in this 
email, nor for any losses caused as a result of viruses.

Xyratex Technology Limited (03134912), Registered in England & Wales, 
Registered Office, Langstone Road, Havant, Hampshire, PO9 1SA.

The Xyratex group of companies also includes, Xyratex Ltd, registered in 
Bermuda, Xyratex International Inc, registered in California, Xyratex 
(Malaysia) Sdn Bhd registered in Malaysia, Xyratex Technology (Wuxi) Co Ltd 
registered in The People's Republic of China and Xyratex Japan Limited 
registered in Japan.
______________________________________________________________________

diff --git a/bin/start-all.sh b/bin/start-all.sh
index 88ce430..c545974 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -28,7 +28,7 @@ else
 fi
 
 # start dfs daemons
-"$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
+# "$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
 
 # start mapred daemons
 "$bin"/start-mapred.sh --config $HADOOP_CONF_DIR
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1bac90d..2658c37 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -1,5 +1,5 @@
 # Define some default values that can be overridden by system properties
-hadoop.root.logger=INFO,console
+hadoop.root.logger=DEBUG,console
 hadoop.log.dir=.
 hadoop.log.file=hadoop.log
 
@@ -107,9 +107,11 @@ 
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
 
 hadoop.metrics.log.level=INFO
 #log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.ReduceTask=DEBUG
 #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
 log4j.logger.org.apache.hadoop.metrics2=${hadoop.metrics.log.level}
+log4j.logger.org.apache.hadoop.mapred.Child=DEBUG
 
 # Jets3t library
 log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
diff --git a/conf/taskcontroller.cfg b/conf/taskcontroller.cfg
index a13aeb9..ef85fa9 100644
--- a/conf/taskcontroller.cfg
+++ b/conf/taskcontroller.cfg
@@ -1,4 +1,11 @@
-mapred.local.dir=#configured value of mapred.local.dir. It can be a list of 
comma separated paths.
-hadoop.log.dir=#configured value of hadoop.log.dir.
-mapred.tasktracker.tasks.sleeptime-before-sigkill=#sleep time before sig kill 
is to be sent to process group after sigterm is sent. Should be in seconds
-mapreduce.tasktracker.group=#configured value of mapreduce.tasktracker.group.
+#configured value of mapred.local.dir. It can be a list of comma separated 
paths.
+mapred.local.dir=/mnt/lustre/hadoop-tmpdir
+
+#configured value of hadoop.log.dir.
+hadoop.log.dir=/work/lustre/tasks/Hadoop.update/git/hadoop-common/logs
+
+#sleep time before sig kill is to be sent to process group after sigterm is 
sent. Should be in seconds
+mapred.tasktracker.tasks.sleeptime-before-sigkill=30
+
+#configured value of mapreduce.tasktracker.group.
+mapreduce.tasktracker.group=0
diff --git a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java 
b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
index 980b988..076ad08 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -1481,10 +1481,10 @@ class ReduceTask extends Task {
         URLConnection connection = url.openConnection();
         
         InputStream input = setupSecureConnection(mapOutputLoc, connection);
- 
-        // Validate header from map output
+
+       // Validate header from map output
         TaskAttemptID mapId = null;
-        try {
+       try {
           mapId =
             TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
         } catch (IllegalArgumentException ia) {
@@ -1524,6 +1524,23 @@ class ReduceTask extends Task {
           LOG.debug("header: " + mapId + ", compressed len: " + 
compressedLength +
               ", decompressed len: " + decompressedLength);
         }
+        
+        
+        // Shuffle
+        MapOutput mapOutput = null;
+
+        boolean shuffleLink = conf.getBoolean("mapreduce.shuffle.link", false);
+
+        if (shuffleLink) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Shuffling " + decompressedLength + " bytes ("
+                    + compressedLength + " raw bytes) "
+                    + "to Link from " + mapOutputLoc.getTaskAttemptId());
+          }
+          mapOutput = shuffleToLink(mapOutputLoc, input, filename,
+                  (int) decompressedLength);
+          return mapOutput;
+        }
 
         //We will put a file in memory if it meets certain criteria:
         //1. The size of the (decompressed) file should be less than 25% of 
@@ -1533,8 +1550,6 @@ class ReduceTask extends Task {
         // Check if this map-output can be saved in-memory
         boolean shuffleInMemory = 
ramManager.canFitInMemory(decompressedLength); 
 
-        // Shuffle
-        MapOutput mapOutput = null;
         if (shuffleInMemory) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
@@ -1772,6 +1787,48 @@ class ReduceTask extends Task {
         return mapOutput;
       }
       
+      private MapOutput shuffleToLink(MapOutputLocation mapOutputLoc,
+                                      InputStream input,
+                                      Path filename,
+                                      long mapOutputLength) 
+      throws IOException {
+        // Find out a suitable location for the output on local-filesystem
+        Path localFilename = 
+          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
+                                         mapOutputLength, conf);
+
+       LOG.error("shuffleToDisk: local file = \"" + localFilename.toString() + 
"\"");
+
+        MapOutput mapOutput = 
+          new MapOutput(mapOutputLoc.getTaskId(), 
mapOutputLoc.getTaskAttemptId(), 
+                        conf, localFileSys.makeQualified(localFilename), 
+                        mapOutputLength);
+
+       String query = mapOutputLoc.getOutputLocation().getQuery();
+       LOG.error("shuffleToDisk: query = \"" + query + "\"");
+
+
+       String[] qureies=query.split("&");
+       String maphost=mapOutputLoc.getHost();
+       String tmpDir = conf.get("hadoop.tmp.dir");
+        String lnCmd = conf.get("hadoop.ln.cmd");
+
+       String src=tmpDir+"/"+"/taskTracker/root/jobcache/"
+               +qureies[0].substring(qureies[0].indexOf('=')+1)+"/"
+               
+qureies[1].substring(qureies[1].indexOf('=')+1)+"/output/file.out";
+
+       String command = lnCmd + " "+src+" "+localFilename;
+       try {
+               LOG.debug("shuffleToLink: Command used for hardlink "+command);
+               Runtime.getRuntime().exec(command).waitFor();
+       } catch (InterruptedException e) {
+               // TODO Auto-generated catch block
+               e.printStackTrace();
+       }
+
+        return mapOutput;
+      }
+
       private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                       InputStream input,
                                       Path filename,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java 
b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
index ca54508..c9a9677 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
@@ -248,6 +248,7 @@ abstract class TaskRunner extends Thread {
       }
       setupCmds.add(setup);
       
+      LOG.warn("JVM to start: " + setupCmds.toString() + "; workdir = " + 
workDir);
       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {

Reply via email to