Hi Lars,

Thanks for your help. I tried your suggestion, but the issue still exists. The 
reduce still halts at 66% progress as before, and then throws a "time out" 
after a while. According to the "Reduce Completion Graph", the copy and sort 
phases have finished, but the reduce phase doesn't start. I wonder whether this 
issue is introduced by the "shuffle" phase. Below is part of the reduce syslog 
from one reduce task for your reference. 


2009-09-29 11:20:33,755 INFO org.apache.hadoop.mapred.ReduceTask: Read 2609846 
bytes from map-output for attempt_200909290651_0002_m_000095_0
2009-09-29 11:20:33,756 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000095_0 -> (102, 45) from datanode3
2009-09-29 11:20:33,756 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:33,841 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0: Got 1 new map-outputs
2009-09-29 11:20:33,843 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:33,851 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2086824 bytes (2086828 raw bytes) into RAM from 
attempt_200909290651_0002_m_000098_1
2009-09-29 11:20:33,861 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2811389 bytes (2811393 raw bytes) into RAM from 
attempt_200909290651_0002_m_000075_0
2009-09-29 11:20:34,004 INFO org.apache.hadoop.mapred.ReduceTask: Read 2086824 
bytes from map-output for attempt_200909290651_0002_m_000098_1
2009-09-29 11:20:34,005 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000098_1 -> (102, 45) from datanode16
2009-09-29 11:20:35,609 INFO org.apache.hadoop.mapred.ReduceTask: Read 2811389 
bytes from map-output for attempt_200909290651_0002_m_000075_0
2009-09-29 11:20:35,609 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000075_0 -> (102, 45) from datanode3
2009-09-29 11:20:39,908 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000072_1'
2009-09-29 11:20:39,908 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000095_1'
2009-09-29 11:20:39,908 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000078_1'
2009-09-29 11:20:39,908 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000091_1'
2009-09-29 11:20:39,908 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0: Got 1 new map-outputs
2009-09-29 11:20:40,621 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:40,652 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2763086 bytes (2763090 raw bytes) into RAM from 
attempt_200909290651_0002_m_000088_1
2009-09-29 11:20:40,764 INFO org.apache.hadoop.mapred.ReduceTask: Read 2763086 
bytes from map-output for attempt_200909290651_0002_m_000088_1
2009-09-29 11:20:40,764 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000088_1 -> (102, 45) from datanode16
2009-09-29 11:20:42,918 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000090_1'
2009-09-29 11:20:42,919 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0: Got 2 new map-outputs
2009-09-29 11:20:45,765 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:45,837 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2784058 bytes (2784062 raw bytes) into RAM from 
attempt_200909290651_0002_m_000055_0
2009-09-29 11:20:45,946 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000064_1'
2009-09-29 11:20:45,946 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and1 dup 
hosts)
2009-09-29 11:20:45,946 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 2 outputs (0 slow hosts and1 dup 
hosts)
2009-09-29 11:20:45,946 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000098_0'
2009-09-29 11:20:45,948 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000094_1'
2009-09-29 11:20:45,948 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000092_1'
2009-09-29 11:20:45,948 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000045_1'
2009-09-29 11:20:45,948 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0: Got 3 new map-outputs
2009-09-29 11:20:45,953 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2807355 bytes (2807359 raw bytes) into RAM from 
attempt_200909290651_0002_m_000076_1
2009-09-29 11:20:45,967 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2757643 bytes (2757647 raw bytes) into RAM from 
attempt_200909290651_0002_m_000093_0
2009-09-29 11:20:45,993 INFO org.apache.hadoop.mapred.ReduceTask: Read 2807355 
bytes from map-output for attempt_200909290651_0002_m_000076_1
2009-09-29 11:20:45,993 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000076_1 -> (102, 45) from datanode4
2009-09-29 11:20:46,092 INFO org.apache.hadoop.mapred.ReduceTask: Read 2784058 
bytes from map-output for attempt_200909290651_0002_m_000055_0
2009-09-29 11:20:46,093 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000055_0 -> (102, 45) from datanode7
2009-09-29 11:20:46,093 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:46,161 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2817616 bytes (2817620 raw bytes) into RAM from 
attempt_200909290651_0002_m_000092_0
2009-09-29 11:20:46,178 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2813282 bytes (2813286 raw bytes) into RAM from 
attempt_200909290651_0002_m_000082_1
2009-09-29 11:20:46,370 INFO org.apache.hadoop.mapred.ReduceTask: Read 2757643 
bytes from map-output for attempt_200909290651_0002_m_000093_0
2009-09-29 11:20:46,370 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000093_0 -> (102, 45) from datanode10
2009-09-29 11:20:46,372 INFO org.apache.hadoop.mapred.ReduceTask: Read 2817616 
bytes from map-output for attempt_200909290651_0002_m_000092_0
2009-09-29 11:20:46,372 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000092_0 -> (102, 45) from datanode7
2009-09-29 11:20:46,503 INFO org.apache.hadoop.mapred.ReduceTask: Read 2813282 
bytes from map-output for attempt_200909290651_0002_m_000082_1
2009-09-29 11:20:46,504 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000082_1 -> (102, 45) from datanode2
2009-09-29 11:20:48,958 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring 
obsolete output of KILLED map-task: 'attempt_200909290651_0002_m_000074_0'
2009-09-29 11:20:48,958 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0: Got 1 new map-outputs
2009-09-29 11:20:51,506 INFO org.apache.hadoop.mapred.ReduceTask: 
attempt_200909290651_0002_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup 
hosts)
2009-09-29 11:20:51,513 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
2814475 bytes (2814479 raw bytes) into RAM from 
attempt_200909290651_0002_m_000074_1
2009-09-29 11:20:51,535 INFO org.apache.hadoop.mapred.ReduceTask: Read 2814475 
bytes from map-output for attempt_200909290651_0002_m_000074_1
2009-09-29 11:20:51,535 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from 
attempt_200909290651_0002_m_000074_1 -> (102, 45) from datanode5
2009-09-29 11:20:51,973 INFO org.apache.hadoop.mapred.ReduceTask: 
GetMapEventsThread exiting
2009-09-29 11:20:51,973 INFO org.apache.hadoop.mapred.ReduceTask: 
getMapsEventsThread joined.
2009-09-29 11:20:51,974 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram 
manager
2009-09-29 11:20:51,975 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved 
on-disk merge complete: 3 files left.
2009-09-29 11:20:51,976 INFO org.apache.hadoop.mapred.ReduceTask: In-memory 
merge complete: 25 files left.
2009-09-29 11:20:52,116 INFO org.apache.hadoop.mapred.Merger: Merging 25 sorted 
segments
2009-09-29 11:20:52,117 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 25 segments left of total size: 69129586 bytes
2009-09-29 11:20:54,028 INFO org.apache.hadoop.mapred.ReduceTask: Merged 25 
segments, 69129586 bytes to disk to satisfy reduce memory limit
2009-09-29 11:20:54,030 INFO org.apache.hadoop.mapred.ReduceTask: Merging 4 
files, 285212383 bytes from disk
2009-09-29 11:20:54,032 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 
segments, 0 bytes from memory into reduce
2009-09-29 11:20:54,032 INFO org.apache.hadoop.mapred.Merger: Merging 4 sorted 
segments
2009-09-29 11:20:54,089 INFO org.apache.hadoop.mapred.Merger: Down to the last 
merge-pass, with 4 segments left of total size: 285212367 bytes




Thanks,
HB.



-----Original Message-----
From: Lars George [mailto:l...@worldlingo.com] 
Sent: September 29, 2009 18:46
To: hbase-user@hadoop.apache.org
Subject: Re: how to handle large volume reduce input value in mapreduce program?

Hi HB,

I'd say do a Put for each objid. Then leave it to the flush of the write 
buffer to have them written out. So change the code to:

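  @Override
  public void reduce(Writable key, Iterable<Writable> values, Context context)
      throws IOException, InterruptedException {
    byte[] family = Bytes.toBytes("oid");
    for (Writable objid : values) {
      // One small Put per object id; the write buffer batches the actual writes.
      Put put = new Put(((ImmutableBytesWritable) key).get());
      put.add(family, Bytes.toBytes(((Text) objid).toString()),
        Bytes.toBytes(((Text) objid).toString()));
      context.write((ImmutableBytesWritable) key, put);
      // Signal liveness so the task is not killed for inactivity.
      context.progress();
    }
  }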


The first version will not work at all, as your list of object ids obviously 
becomes too large. So you will have to go the route you chose in attempt #2, 
i.e. save them as columns with qualifiers.
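
To illustrate why streaming the values keeps memory bounded, here is a minimal
sketch in plain Java with no HBase dependency. The names ChunkedEmitter and
emitInChunks, and the chunk size, are hypothetical; the sink only stands in for
"build a small Put and write it". It is a chunked variant of writing as you
iterate, not part of the HBase API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: stream values in fixed-size chunks instead of collecting them all. */
public class ChunkedEmitter {

    /**
     * Passes the values to the sink in chunks of at most chunkSize elements.
     * Only the chunk currently being filled is held by this method.
     * Returns the number of chunks handed to the sink.
     */
    public static <T> int emitInChunks(Iterable<T> values, int chunkSize,
                                       Consumer<List<T>> sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<T> chunk = new ArrayList<>(chunkSize);
        int chunks = 0;
        for (T value : values) {
            chunk.add(value);
            if (chunk.size() == chunkSize) {
                sink.accept(chunk);                  // in a reducer: write one small Put
                chunk = new ArrayList<>(chunkSize);  // start a fresh chunk
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {                      // flush the final partial chunk
            sink.accept(chunk);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "c", "d", "e");
        int n = emitInChunks(ids, 2, chunk -> System.out.println(chunk));
        System.out.println(n + " chunks");
    }
}
```

With a chunk size of 1 this reduces to the one-Put-per-objid loop shown above;
larger chunks trade a little memory for fewer writes.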

BTW, I threw in a context.progress() for good measure, as otherwise your 
tasks may time out. In earlier versions of Hadoop the "write()" call may 
also have triggered an update, but for Hadoop 0.20 you must call 
"progress()" to report a sign of life.
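
If you would rather not call progress() on every single record, one possible
pattern is to report every N records. The sketch below is plain Java; the class
ProgressThrottle and its methods are hypothetical, and the Runnable stands in
for a call such as context.progress(). Calling progress() per record, as in the
loop above, should also be fine; this only shows how the calls could be spaced
out.

```java
/** Hypothetical sketch: invoke a progress callback once every 'interval' records. */
public class ProgressThrottle {
    private final int interval;
    private final Runnable reporter;
    private long seen = 0;
    private long reports = 0;

    public ProgressThrottle(int interval, Runnable reporter) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
        this.reporter = reporter;
    }

    /** Call once per processed record; reports on every interval-th call. */
    public void recordProcessed() {
        seen++;
        if (seen % interval == 0) {
            reporter.run();   // in a reducer this would be context.progress()
            reports++;
        }
    }

    /** Number of times the callback has been invoked so far. */
    public long reports() {
        return reports;
    }

    public static void main(String[] args) {
        ProgressThrottle throttle =
            new ProgressThrottle(3, () -> System.out.println("progress reported"));
        for (int i = 0; i < 10; i++) {
            throttle.recordProcessed();
        }
        System.out.println(throttle.reports() + " reports");
    }
}
```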

HTH,
Lars


yin_hong...@emc.com wrote:
> Hi all,
>
> I am a newbie to Hadoop and have just begun playing with it in recent
> days. I am trying to write a MapReduce program to parse a large dataset
> (about 20 GB), extract object ids, and store them in an HBase table. The
> issue is that one keyword is associated with several million object ids.
> Here is my first reduce program.
>
> <program1>
>
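> import java.io.IOException;
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableReducer;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
>
> public class MyReducer extends TableReducer<Writable, Writable, Writable> {
>
>     @Override
>     public void reduce(Writable key, Iterable<Writable> objectids, Context context)
>             throws IOException, InterruptedException {
>         // Collects every object id for this key in memory before writing.
>         Set<String> objectIDs = new HashSet<String>();
>         Put put = new Put(((ImmutableBytesWritable) key).get());
>         byte[] family = Bytes.toBytes("oid");
>         for (Writable objid : objectids) {
>             objectIDs.add(((Text) objid).toString());
>         }
>         put.add(family, null, Bytes.toBytes(objectIDs.toString()));
>         context.write((ImmutableBytesWritable) key, put);
>     }
> }
>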
> In this program, the reduce failed with a Java heap "out of memory"
> error. A rough count shows that the several million object ids consume
> about 900 MB of heap if they are all loaded into a Set at once. So I
> implemented the reduce in another way:
>
> <program2>
>
> public class IndexReducer extends TableReducer<Writable, Writable, Writable> {
>
>     @Override
>     public void reduce(Writable key, Iterable<Writable> values, Context context)
>             throws IOException, InterruptedException {
>         Put put = new Put(((ImmutableBytesWritable) key).get());
>         byte[] family = Bytes.toBytes("oid");
>         for (Writable objid : values) {
>             put.add(family, Bytes.toBytes(((Text) objid).toString()),
>                     Bytes.toBytes(((Text) objid).toString()));
>         }
>         context.write((ImmutableBytesWritable) key, put);
>     }
> }
>
> This time, the reduce still failed, now with a "reduce time out" error.
> I doubled the reduce time-out; then "Out of memory" happened. The error
> log shows that put.add() throws the "Out of memory" error.
>
> By the way, there are 18 datanodes in total in the Hadoop/HBase
> environment. The number of reduce tasks is 50.
>
> So, my question is how to handle a large volume of reduce input values in
> a MapReduce program. Increase memory? I don't think that is a reasonable
> option. Increase the number of reduce tasks? ...
>
> Sigh, I have no clue at all. What's your suggestion?
>
> Best Regards,
> HB
