Author: sandy
Date: Fri Aug 16 08:14:58 2013
New Revision: 1514610

URL: http://svn.apache.org/r1514610
Log:
MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of indexes 
for better cache performance. (Sandy Ryza)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1514610&r1=1514609&r2=1514610&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri 
Aug 16 08:14:58 2013
@@ -52,6 +52,9 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
     conditions (jlowe via kihwal)
 
+    MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of
+    indexes for better cache performance. (Sandy Ryza)
+
   BUG FIXES
 
     MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit

Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1514610&r1=1514609&r2=1514610&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 Fri Aug 16 08:14:58 2013
@@ -884,10 +884,10 @@ public class MapTask extends Task {
     byte[] kvbuffer;        // main output buffer
     private final byte[] b0 = new byte[0];
 
-    private static final int INDEX = 0;            // index offset in acct
-    private static final int VALSTART = 1;         // val offset in acct
-    private static final int KEYSTART = 2;         // key offset in acct
-    private static final int PARTITION = 3;        // partition offset in acct
+    private static final int VALSTART = 0;         // val offset in acct
+    private static final int KEYSTART = 1;         // key offset in acct
+    private static final int PARTITION = 2;        // partition offset in acct
+    private static final int VALLEN = 3;           // length of value
     private static final int NMETA = 4;            // num meta ints
     private static final int METASIZE = NMETA * 4; // size in bytes
 
@@ -1151,10 +1151,10 @@ public class MapTask extends Task {
             distanceTo(keystart, valend, bufvoid));
 
         // write accounting info
-        kvmeta.put(kvindex + INDEX, kvindex);
         kvmeta.put(kvindex + PARTITION, partition);
         kvmeta.put(kvindex + KEYSTART, keystart);
         kvmeta.put(kvindex + VALSTART, valstart);
+        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
         // advance kvindex
         kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
       } catch (MapBufferTooSmallException e) {
@@ -1224,17 +1224,11 @@ public class MapTask extends Task {
     }
 
     /**
-     * For the given meta position, return the dereferenced position in the
-     * integer array. Each meta block contains several integers describing
-     * record data in its serialized form, but the INDEX is not necessarily
-     * related to the proximate metadata. The index value at the referenced int
-     * position is the start offset of the associated metadata block. So the
-     * metadata INDEX at metapos may point to the metadata described by the
-     * metadata block at metapos + k, which contains information about that
-     * serialized record.
+     * For the given meta position, return the offset into the int-sized
+     * kvmeta buffer.
      */
     int offsetFor(int metapos) {
-      return kvmeta.get(metapos * NMETA + INDEX);
+      return metapos * NMETA;
     }
 
     /**
@@ -1260,16 +1254,17 @@ public class MapTask extends Task {
           kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
     }
 
+    final byte META_BUFFER_TMP[] = new byte[METASIZE];
     /**
-     * Swap logical indices st i, j MOD offset capacity.
+     * Swap metadata for items i, j
      * @see IndexedSortable#swap
      */
     public void swap(final int mi, final int mj) {
-      final int kvi = (mi % maxRec) * NMETA + INDEX;
-      final int kvj = (mj % maxRec) * NMETA + INDEX;
-      int tmp = kvmeta.get(kvi);
-      kvmeta.put(kvi, kvmeta.get(kvj));
-      kvmeta.put(kvj, tmp);
+      int iOff = (mi % maxRec) * METASIZE;
+      int jOff = (mj % maxRec) * METASIZE;
+      System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
+      System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
+      System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
     }
 
     /**
@@ -1601,9 +1596,9 @@ public class MapTask extends Task {
               while (spindex < mend &&
                   kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                 final int kvoff = offsetFor(spindex % maxRec);
-                key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
-                          (kvmeta.get(kvoff + VALSTART) -
-                           kvmeta.get(kvoff + KEYSTART)));
+                int keystart = kvmeta.get(kvoff + KEYSTART);
+                int valstart = kvmeta.get(kvoff + VALSTART);
+                key.reset(kvbuffer, keystart, valstart - keystart);
                 getVBytesForOffset(kvoff, value);
                 writer.append(key, value);
                 ++spindex;
@@ -1729,14 +1724,8 @@ public class MapTask extends Task {
     private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
       // get the keystart for the next serialized value to be the end
       // of this value. If this is the last value in the buffer, use bufend
-      final int nextindex = kvoff == kvend
-        ? bufend
-        : kvmeta.get(
-            (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % 
kvmeta.capacity());
-      // calculate the length of the value
-      int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
-        ? nextindex - kvmeta.get(kvoff + VALSTART)
-        : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+      final int vallen = kvmeta.get(kvoff + VALLEN);
+      assert vallen >= 0;
       vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
     }
 


Reply via email to