My observation is based on this call chain:
MapOutputCopier.run() calling copyOutput() calling getMapOutput() calling
ramManager.canFitInMemory(decompressedLength)

Basically, ramManager.canFitInMemory() makes its decision without considering
the number of MapOutputCopiers that are running: each of the 5 copiers can
hold a segment of up to 0.25 of the shuffle buffer, so as much as 1.25 * 0.7
of the total heap may be tied up in shuffling when the default parameters are
used.
Of course, you should check the value for mapred.reduce.parallel.copies to
see if it is 5. If it is 4 or lower, my reasoning wouldn't apply.
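
To put rough numbers on that worst case, here is a small, self-contained
illustration (plain Java, not Hadoop code; the 200 MB heap value is just an
example I picked):

public class ShuffleWorstCase {
  public static void main(String[] args) {
    long heapBytes = 200L << 20;            // example reduce-task heap: 200 MB
    double bufferPercent = 0.70;            // mapred.job.shuffle.input.buffer.percent (default)
    double segmentFraction = 0.25;          // MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION
    int parallelCopies = 5;                 // mapred.reduce.parallel.copies (default)

    double maxSize = heapBytes * bufferPercent;               // in-memory shuffle budget
    double maxSingleShuffleLimit = maxSize * segmentFraction; // per-segment cap
    double worstCase = parallelCopies * maxSingleShuffleLimit;

    // With defaults: 5 * 0.25 = 1.25 times the buffer, i.e. 1.25 * 0.7 of the heap.
    System.out.printf("buffer = %.0f MB, worst case = %.0f MB (%.2f of heap)%n",
        maxSize / (1 << 20), worstCase / (1 << 20), worstCase / heapBytes);
  }
}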

About the ramManager.unreserve() call: ReduceTask.java from Hadoop 0.20.2
only has 2731 lines, so I have to guess at the location of the code snippet
you provided.
I found this around line 1535:
        } catch (IOException ioe) {
          LOG.info("Failed to shuffle from " +
mapOutputLoc.getTaskAttemptId(),
                   ioe);

          // Inform the ram-manager
          ramManager.closeInMemoryFile(mapOutputLength);
          ramManager.unreserve(mapOutputLength);

          // Discard the map-output
          try {
            mapOutput.discard();
          } catch (IOException ignored) {
            LOG.info("Failed to discard map-output from " +
                     mapOutputLoc.getTaskAttemptId(), ignored);
          }
Please confirm the line number.

If we're looking at the same code, I am afraid I don't see how we can
improve it. First, I assume an IOException shouldn't happen that often.
Second, for the in-memory case mapOutput.discard() just sets:
          data = null;
Even if we called mapOutput.discard() before ramManager.unreserve(), we
still wouldn't know when GC would kick in and make that memory available
again.
Of course, given the large number of map outputs in your system, the root
cause I described above becomes that much more likely to trigger the OOME
sooner.
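
To make the GC-timing point concrete, here is a minimal sketch (plain Java,
not Hadoop code) of the ordering in question: even if the buffer reference is
dropped first, the next copier can allocate before the collector has actually
reclaimed anything.

public class DiscardTimingSketch {
  static class InMemoryOutput {
    byte[] data = new byte[64 << 20];      // simulated in-memory map output (64 MB)
    void discard() { data = null; }        // only drops the reference
  }

  public static void main(String[] args) {
    InMemoryOutput failed = new InMemoryOutput();

    failed.discard();                      // reference gone, memory not yet reclaimed
    // ramManager.unreserve(...) would run here and wake up a waiting copier,
    // but the 64 MB byte[] is freed only whenever GC actually runs.

    byte[] nextCopy = new byte[64 << 20];  // the next copier allocates immediately;
                                           // on a tight heap this can still OOM
    System.out.println("allocated " + nextCopy.length + " bytes");
  }
}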

Thanks

On Sun, Mar 7, 2010 at 1:03 PM, Andy Sautins <andy.saut...@returnpath.net> wrote:

>
>   Ted,
>
>   I'm trying to follow the logic in your mail and I'm not sure I'm
> following.  If you wouldn't mind helping me understand, I would appreciate it.
>
>   Looking at the code maxSingleShuffleLimit is only used in determining if
> the copy _can_ fit into memory:
>
>     boolean canFitInMemory(long requestedSize) {
>        return (requestedSize < Integer.MAX_VALUE &&
>                requestedSize < maxSingleShuffleLimit);
>      }
>
>    It also looks like the RamManager.reserve should wait until memory is
> available, so it shouldn't hit a memory limit for that reason.
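
For what it's worth, a reservation manager of the kind described here
typically looks something like the simplified sketch below; this is a
hypothetical illustration, not the actual ShuffleRamManager. The blocking
itself is straightforward, so the interesting question is what limit each
individual request is checked against (canFitInMemory above checks
maxSingleShuffleLimit, which is where the per-copier multiplication comes in).

public class ReserveSketch {
  private final long maxSize;   // total in-memory shuffle budget
  private long used;

  ReserveSketch(long maxSize) { this.maxSize = maxSize; }

  synchronized void reserve(long requested) throws InterruptedException {
    while (used + requested > maxSize) {
      wait();                   // block until unreserve() frees enough budget
    }
    used += requested;
  }

  synchronized void unreserve(long released) {
    used -= released;
    notifyAll();                // wake up copiers waiting in reserve()
  }

  public static void main(String[] args) throws InterruptedException {
    ReserveSketch ram = new ReserveSketch(140L << 20);
    ram.reserve(35L << 20);
    ram.unreserve(35L << 20);
    System.out.println("reserve/unreserve round trip ok");
  }
}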
>
>    What does seem a little strange to me is the following ( ReduceTask.java
> starting at 2730 ):
>
>          // Inform the ram-manager
>          ramManager.closeInMemoryFile(mapOutputLength);
>          ramManager.unreserve(mapOutputLength);
>
>          // Discard the map-output
>          try {
>            mapOutput.discard();
>          } catch (IOException ignored) {
>            LOG.info("Failed to discard map-output from " +
>                     mapOutputLoc.getTaskAttemptId(), ignored);
>          }
>          mapOutput = null;
>
>   So to me that looks like the ramManager unreserves the memory before the
> mapOutput is discarded.  Shouldn't the mapOutput be discarded _before_ the
> ramManager unreserves the memory?  If the memory is unreserved before the
> actual underlying data references are removed then it seems like another
> thread can try to allocate memory ( ReduceTask.java:2730 ) before the
> previous memory is disposed ( mapOutput.discard() ).
>
>   Not sure that makes sense.  One thing to note is that the particular job
> that is failing does have a good number ( 200k+ ) of map outputs.  The large
> number of small map outputs may be why we are triggering a problem.
>
>   Thanks again for your thoughts.
>
>   Andy
>
>
> -----Original Message-----
> From: Jacob R Rideout [mailto:apa...@jacobrideout.net]
> Sent: Sunday, March 07, 2010 1:21 PM
> To: common-user@hadoop.apache.org
> Cc: Andy Sautins; Ted Yu
> Subject: Re: Shuffle In Memory OutOfMemoryError
>
> Ted,
>
> Thank you. I filed MAPREDUCE-1571 to cover this issue. I might have
> some time to write a patch later this week.
>
> Jacob Rideout
>
> On Sat, Mar 6, 2010 at 11:37 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > I think there is a mismatch (in ReduceTask.java) between:
> >      this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
> > and:
> >        maxSingleShuffleLimit = (long)(maxSize *
> > MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
> > where MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION is 0.25f
> >
> > because
> >      copiers = new ArrayList<MapOutputCopier>(numCopiers);
> > so the total memory that can be held for the in-mem shuffle is
> > 5 * 0.25 = 1.25 * maxSize
> >
> > A JIRA should be filed to correlate the constant 5 above and
> > MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION.
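
One hypothetical way to correlate the two, purely as an illustration and not
the actual MAPREDUCE-1571 change, would be to cap each segment by the number
of parallel copiers so that concurrent reservations cannot exceed the budget:

public class SegmentLimitSketch {
  public static void main(String[] args) {
    long maxSize = 140L << 20;         // example in-memory shuffle budget (140 MB)
    double segmentFraction = 0.25;     // MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION
    int numCopiers = 5;                // mapred.reduce.parallel.copies

    // Take the smaller of the existing fraction and an even split across copiers,
    // so numCopiers concurrent segments can never exceed maxSize in total.
    long maxSingleShuffleLimit =
        (long) Math.min(maxSize * segmentFraction, (double) maxSize / numCopiers);

    System.out.println("per-segment limit = " + (maxSingleShuffleLimit >> 20) + " MB");
  }
}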
> >
> > Cheers
> >
> > On Sat, Mar 6, 2010 at 8:31 AM, Jacob R Rideout <apa...@jacobrideout.net
> >wrote:
> >
> >> Hi all,
> >>
> >> We are seeing the following error in our reducers of a particular job:
> >>
> >> Error: java.lang.OutOfMemoryError: Java heap space
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1508)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1408)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1261)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1195)
> >>
> >>
> >> After enough reducers fail, the entire job fails. This error occurs
> >> regardless of whether mapred.compress.map.output is true. We were able
> >> to avoid the issue by reducing mapred.job.shuffle.input.buffer.percent
> >> to 20%. Shouldn't the framework, via ShuffleRamManager.canFitInMemory
> >> and ShuffleRamManager.reserve, correctly detect the memory available
> >> for allocation? I would think that with poor configuration settings
> >> (and default settings in particular) the job might not be as efficient,
> >> but it wouldn't die.
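
For anyone else hitting this, the workaround mentioned above can be set per
job; this is a sketch assuming the old mapred JobConf API, and the 0.2f simply
mirrors the 20% value reported here rather than a recommended default:

import org.apache.hadoop.mapred.JobConf;

public class ShuffleBufferWorkaround {
  public static void main(String[] args) {
    JobConf conf = new JobConf(ShuffleBufferWorkaround.class);
    // Shrink the in-memory shuffle buffer from the 0.70 default to 0.20 of the heap.
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.2f);
    // ... set input/output formats and paths, then submit with JobClient.runJob(conf)
  }
}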
> >>
> >> Here is some more context in the logs, I have attached the full
> >> reducer log here: http://gist.github.com/323746
> >>
> >>
> >> 2010-03-06 07:54:49,621 INFO org.apache.hadoop.mapred.ReduceTask:
> >> Shuffling 4191933 bytes (435311 raw bytes) into RAM from
> >> attempt_201003060739_0002_m_000061_0
> >> 2010-03-06 07:54:50,222 INFO org.apache.hadoop.mapred.ReduceTask: Task
> >> attempt_201003060739_0002_r_000000_0: Failed fetch #1 from
> >> attempt_201003060739_0002_m_000202_0
> >> 2010-03-06 07:54:50,223 WARN org.apache.hadoop.mapred.ReduceTask:
> >> attempt_201003060739_0002_r_000000_0 adding host
> >> hd37.dfs.returnpath.net to penalty box, next contact in 4 seconds
> >> 2010-03-06 07:54:50,223 INFO org.apache.hadoop.mapred.ReduceTask:
> >> attempt_201003060739_0002_r_000000_0: Got 1 map-outputs from previous
> >> failures
> >> 2010-03-06 07:54:50,223 FATAL org.apache.hadoop.mapred.TaskRunner:
> >> attempt_201003060739_0002_r_000000_0 : Map output copy failure :
> >> java.lang.OutOfMemoryError: Java heap space
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1508)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1408)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1261)
> >>        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1195)
> >>
> >>
> >> We tried this both in 0.20.1 and 0.20.2. We had hoped MAPREDUCE-1182
> >> would address the issue in 0.20.2, but it did not. Does anyone have
> >> any comments or suggestions? Is this a bug I should file a JIRA for?
> >>
> >> Jacob Rideout
> >> Return Path
> >>
> >
>
