[
https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519492
]
Tahir Hashmi commented on HADOOP-1698:
--------------------------------------
A minor nit with the latest patch: SequenceFile.java still contains the
following comment, which no longer applies:
// use hash of uid + host
> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
> Key: HADOOP-1698
> URL: https://issues.apache.org/jira/browse/HADOOP-1698
> Project: Hadoop
> Issue Type: Bug
> Components: examples
> Affects Versions: 0.13.1
> Environment: Standard hadoop installation, any number of nodes > 10
> Reporter: Srikanth Kakani
> Assignee: Devaraj Das
> Priority: Blocker
> Fix For: 0.14.0
>
> Attachments: 1698.patch, 1698.patch, 1698.patch
>
>
> Steps to Reproduce:
> On a cluster as described above, run any job with 8000+ partitions/reducers.
> Observe the CPU utilization on any mapper.
> Observations:
> The output.collect(Key, Value) call consumes a huge amount of CPU, causing
> the job to hang.
> This is the result of two issues:
> 1) With more than 7500 partitions, every call to output.collect triggers a
> call to sortAndSpillToDisk().
> 2) Each call to sortAndSpillToDisk() creates a writer object, which
> eventually executes the following block (SequenceFile.java:652):
> MessageDigest digester = MessageDigest.getInstance("MD5");
> digester.update((new UID() + "@" + InetAddress.getLocalHost()).getBytes());
> sync = digester.digest();
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is
> suspended.
> [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
> [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
> [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
> [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
> [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
> [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
> [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
> [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition
> (MapTask.java:307)
> [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk
> (MapTask.java:387)
> [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect
> (MapTask.java:355)
> /*My code*/
> [12] mypackage.MyClass$Map.map (MyClass.java:283)
> --------------
> [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
> [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
> [15] org.apache.hadoop.mapred.TaskTracker$Child.main
> (TaskTracker.java:1,771)
> The piece of code causing the problem is at MapTask.java:355:
> ----------------------------------------------------------
> long totalMem = 0;
> // getMemoryUtilized() == 16K per partition: startOffsets.length (1024) *
> // BUFFERED_KEY_VAL_OVERHEAD (16); see BasicTypeSorterBase.java:88
> for (int i = 0; i < partitions; i++)
>   totalMem += sortImpl[i].getMemoryUtilized();
> // with partitions > 7500, this condition is always true
> if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
>   sortAndSpillToDisk();
>   keyValBuffer.reset();
>   for (int i = 0; i < partitions; i++) {
>     sortImpl[i].close();
>   }
> }
> ----------------------------------------------------------
> Looking at the variable values in
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
> sortImpl[0] = {
> org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of
> org.apache.hadoop.io.DataOutputBuffer(id=1159)
> org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of
> int[1024] (id=1160) <-- 1024 entries * 16 bytes == 16K (as explained above)
> org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of
> int[1024] (id=1161)
> org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of
> int[1024] (id=1162)
> org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of
> int[1024] (id=1163)
> org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of
> org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
> org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
> org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
> org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of
> org.apache.hadoop.mapred.Task$2(id=1165)
> }
> Computation:
> maxBufferSize == 120M
> Theoretical max # of partitions, assuming keyValBuffer.getLength() == 0:
> 120M / 16K = 7500 partitions
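> To make the arithmetic concrete, here is a minimal standalone sketch (a
> hypothetical illustration, not Hadoop source; the class name and constants
> are assumptions mirroring the figures above):
> ----------------------------------------------------------
> // Sketch of the per-collect bookkeeping analyzed above.
> public class SpillThresholdCheck {
>     // getMemoryUtilized() per partition: 1024 offsets * 16 bytes overhead
>     static final long SORTER_MEM_PER_PARTITION = 1024 * 16;  // 16K
>     static final long MAX_BUFFER_SIZE = 120L * 1024 * 1024;  // 120M
>
>     public static void main(String[] args) {
>         // Theoretical max partitions with an empty keyValBuffer,
>         // i.e. 120M / 16K (~7500; exactly 7680 with binary units)
>         System.out.println("max partitions: "
>             + MAX_BUFFER_SIZE / SORTER_MEM_PER_PARTITION);
>
>         // The condition from MapTask.java:355 for the reported repro
>         // case (8000 partitions), with keyValBuffer.getLength() == 0:
>         long totalMem = 8000 * SORTER_MEM_PER_PARTITION;
>         System.out.println("spills on every collect: "
>             + (totalMem >= MAX_BUFFER_SIZE));  // true
>     }
> }
> ----------------------------------------------------------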
> Issue #2:
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
> [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
> [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
> [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> The InetAddress.getLocalHost() call does not cache its result, so each call
> performs a lookup against the hosts file and possibly DNS, driving CPU usage
> even higher (observed).
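> A minimal sketch of the caching suggested in note 2 below (a hypothetical
> helper; the class and method names are illustrative, not part of the
> attached patch):
> ----------------------------------------------------------
> import java.net.InetAddress;
> import java.net.UnknownHostException;
>
> // Resolve the local host once per JVM and reuse the result, instead of
> // calling InetAddress.getLocalHost() in every writer constructor.
> public class LocalHostCache {
>     private static String cachedLocalHost;
>
>     public static synchronized String getLocalHost()
>             throws UnknownHostException {
>         if (cachedLocalHost == null) {
>             // Single lookup against the hosts file / DNS
>             cachedLocalHost = InetAddress.getLocalHost().toString();
>         }
>         return cachedLocalHost;
>     }
> }
> ----------------------------------------------------------
> With such a helper, the digester would pay the lookup cost once per JVM,
> e.g.:
> digester.update((new UID() + "@" + LocalHostCache.getLocalHost()).getBytes());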
> This is a BLOCKER issue and needs immediate attention.
> Notes:
> 1) output.collect should not take a hit from the framework; should a
> separate thread handle spilling the buffer (see the sketch after this list)?
> 2) Should the InetAddress.getLocalHost() result be cached in a static
> variable (as sketched above)?
> 3) The spilling logic should not depend on the number of partitions; does it
> need a redesign?
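> A rough sketch of the separate spill thread from note 1 (a hypothetical
> design for illustration, not the actual MapOutputBuffer code): collect()
> hands a full buffer to a dedicated thread and keeps filling a fresh one,
> so the map thread never blocks on sorting and spilling.
> ----------------------------------------------------------
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
>
> public class AsyncSpiller extends Thread {
>     // Bounded handoff: collect() only blocks if the spiller falls behind
>     private final BlockingQueue<byte[]> fullBuffers =
>         new ArrayBlockingQueue<byte[]>(2);
>
>     public void submit(byte[] buffer) throws InterruptedException {
>         fullBuffers.put(buffer);
>     }
>
>     @Override
>     public void run() {
>         try {
>             while (!isInterrupted()) {
>                 byte[] buffer = fullBuffers.take();
>                 sortAndSpillToDisk(buffer);  // runs off the map thread
>             }
>         } catch (InterruptedException e) {
>             // shutdown requested; exit the spill loop
>         }
>     }
>
>     private void sortAndSpillToDisk(byte[] buffer) {
>         // sort the records in the buffer and write them to disk; omitted
>     }
> }
> ----------------------------------------------------------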
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.