7500+ reducers/partitions causes job to hang
--------------------------------------------

                 Key: HADOOP-1698
                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
             Project: Hadoop
          Issue Type: Bug
          Components: examples
    Affects Versions: 0.13.1
         Environment: Standard hadoop installation, any number of nodes > 10
            Reporter: Srikanth Kakani
            Priority: Blocker


Steps to Reproduce:
On a cluster as described in the environment above, run any job with
#partitions/#reducers >= 8000.
Observe the CPU utilization on any mapper.

Observations:
The output.collect(key, value) call consumes a huge amount of CPU, causing the
job to hang.

This is the result of two issues:
1) With more than ~7500 partitions, every call to output.collect results in a
call to sortAndSpillToDisk().
2) Each call to sortAndSpillToDisk() creates a writer object, which eventually
executes the following block (SequenceFile.java:652):

        MessageDigest digester = MessageDigest.getInstance("MD5");
        digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
        sync = digester.digest();
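To get a feel for how expensive that per-writer block is, here is a minimal,
hypothetical timing sketch (standalone Java, not Hadoop code) that repeats the
same digest-plus-lookup work the Writer constructor performs on every spill:
----------------------------------------------------------
import java.net.InetAddress;
import java.rmi.server.UID;
import java.security.MessageDigest;

public class SyncCostSketch {
    public static void main(String[] args) throws Exception {
        int iterations = 1000; // roughly one Writer creation per spill
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            // The same work SequenceFile$Writer does to build its sync marker:
            MessageDigest digester = MessageDigest.getInstance("MD5");
            // getLocalHost() is the expensive part: an uncached host lookup.
            digester.update((new UID() + "@"
                + InetAddress.getLocalHost()).getBytes());
            byte[] sync = digester.digest();
        }
        long elapsedMs = (System.nanoTime() - start) / 1000000L;
        System.out.println(iterations + " sync computations took "
            + elapsedMs + " ms");
    }
}
----------------------------------------------------------
If name resolution on the host is slow, the getLocalHost() line should
dominate the loop.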


Issue #1:
Further investigation reveals the following stack trace whenever the task is 
suspended.
  [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
  [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
  [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
  [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
  [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
  [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
  [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
  [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
  [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
  [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
  [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
/*My code*/
  [12] mypackage.MyClass$Map.map (MyClass.java:283)
--------------
  [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
  [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
  [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)

The piece of code causing the problem is (MapTask.java:355):
----------------------------------------------------------
        long totalMem = 0;
        for (int i = 0; i < partitions; i++)
          // getMemoryUtilized() == 16K per partition: startOffsets.length
          // (1024, see the dump below) * BUFFERED_KEY_VAL_OVERHEAD (16)
          // (BasicTypeSorterBase.java:88)
          totalMem += sortImpl[i].getMemoryUtilized();

        // Condition is always true if partitions > 7500: totalMem alone
        // fills maxBufferSize, so every collect() triggers a spill.
        if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
          sortAndSpillToDisk();
          keyValBuffer.reset();
          for (int i = 0; i < partitions; i++) {
            sortImpl[i].close();
          }
        }
----------------------------------------------------------

Looking at the variable values in
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355):
 sortImpl[0] = {
    org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer (id=1159)
    org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160)  <-- 1024 entries * 16 bytes overhead (explained above) == 16K
    org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
    org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
    org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
    org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator (id=1164)
    org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
    org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
    org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2 (id=1165)
}
Computation:
maxBufferSize == 120M
theoretical max # of partitions, assuming keyValBuffer.getLength() == 0:
  120M / 16K ≈ 7500 partitions
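The bound can be double-checked with a couple of lines of arithmetic. The
following standalone sketch is hypothetical and just plugs in the values
observed above (120M buffer, 16K per partition):
----------------------------------------------------------
public class PartitionBoundSketch {
    public static void main(String[] args) {
        long maxBufferSize = 120L * 1024 * 1024; // 120M, observed above
        // startOffsets.length (1024) * BUFFERED_KEY_VAL_OVERHEAD (16) == 16K
        long perPartitionMem = 1024 * 16;
        long maxPartitions = maxBufferSize / perPartitionMem;
        // Prints 7680 -- so past ~7500 partitions the spill condition
        // holds even with an empty keyValBuffer.
        System.out.println("max partitions before constant spilling: "
            + maxPartitions);
    }
}
----------------------------------------------------------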

Issue #2: 
digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
  [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
  [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
  [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
The InetAddress.getLocalHost() call does not cache its result, so every call
performs a fresh lookup against the hosts file and possibly DNS, driving CPU
usage even higher (observed).
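
A minimal sketch of the mitigation proposed in note 2 below: cache the
one-time lookup in a static field. This is a hypothetical helper, not
existing Hadoop code, and it assumes the local hostname does not change for
the lifetime of the task JVM:
----------------------------------------------------------
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class LocalHostCache {
    // Resolved once per JVM; getLocalHost() itself does not cache.
    private static volatile String cachedLocalHost;

    public static String getLocalHost() throws UnknownHostException {
        String host = cachedLocalHost;
        if (host == null) {
            host = InetAddress.getLocalHost().toString();
            cachedLocalHost = host; // benign race: at worst two lookups
        }
        return host;
    }

    private LocalHostCache() {}
}
----------------------------------------------------------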

This is a BLOCKER issue and needs immediate attention. 

Notes:
1) output.collect should not take the hit for framework bookkeeping; could a
separate thread handle spilling the buffer (see the sketch below)?
2) Should the InetAddress.getLocalHost() result be cached in a static variable
(as sketched under Issue #2 above)?
3) The spilling condition should not scale with the number of partitions; does
the logic need a redesign?
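
For note 1, here is a rough sketch of the double-buffering idea: collect()
hands a full buffer to a background spiller thread and keeps writing into a
fresh one. All class and method names are hypothetical; this only illustrates
the threading pattern, not a patch against MapTask:
----------------------------------------------------------
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncSpillSketch {
    // Full buffers waiting to be sorted and spilled; capacity 1 applies
    // back-pressure so the mapper cannot run arbitrarily far ahead.
    private final BlockingQueue<byte[]> fullBuffers =
        new ArrayBlockingQueue<byte[]>(1);

    private final Thread spiller = new Thread(new Runnable() {
        public void run() {
            try {
                while (true) {
                    byte[] buffer = fullBuffers.take(); // waits for a full buffer
                    sortAndSpillToDisk(buffer);         // off the collect() path
                }
            } catch (InterruptedException ie) {
                // shutdown requested
            }
        }
    }, "spill-thread");

    public AsyncSpillSketch() {
        spiller.setDaemon(true);
        spiller.start();
    }

    // Called from collect() when the in-memory buffer fills up: hand the
    // full buffer off and continue collecting into a fresh one.
    public void bufferFull(byte[] buffer) throws InterruptedException {
        fullBuffers.put(buffer); // blocks only if the spiller falls behind
    }

    private void sortAndSpillToDisk(byte[] buffer) {
        // placeholder for the existing sort-and-spill logic
    }
}
----------------------------------------------------------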

