using StreamInputFormat, StreamXmlRecordReader with your custom Jobs
Hi, I am playing around with version 0.20.2 of Hadoop. I have written and packaged a job using a custom Mapper and Reducer. The input format in my job is set to StreamInputFormat, and the property stream.recordreader.class is set to org.apache.hadoop.streaming.StreamXmlRecordReader. This is how I want to start my job:

hadoop jar custom-1.0-SNAPSHOT.jar EmailCountingJob /input /output

The problem is that in this case all classes from hadoop-0.20.2-streaming.jar are missing (ClassNotFoundException). I tried using -libjars without luck:

hadoop jar -libjars PATH/hadoop-0.20.2-streaming.jar custom-1.0-SNAPSHOT.jar EmailCountingJob /input /output

Is there any way to use the streaming classes with your own jobs without copying these classes into your project and packaging them into your own jar? /Reik
Re: Shuffle In Memory OutOfMemoryError
I verified that size and maxSize are long. This means MR-1182 didn't resolve Andy's issue. According to Andy: "At the beginning of the job there are 209,754 pending map tasks and 32 pending reduce tasks." My guess is that GC wasn't reclaiming memory fast enough, leading to OOME because of the large number of in-memory shuffle candidates. My suggestion for Andy would be to:

1. add -verbose:gc as a JVM parameter
2. modify reserve() slightly to calculate the maximum outstanding numPendingRequests, and print that maximum.

Based on the output from the above two items, we can discuss a solution. My intuition is to place an upper bound on numPendingRequests beyond which canFitInMemory() returns false.

My two cents.

On Tue, Mar 9, 2010 at 11:51 PM, Christopher Douglas chri...@yahoo-inc.com wrote:

That section of code is unmodified in MR-1182. See the patches/svn log. -C

Sent from my iPhone

On Mar 9, 2010, at 7:44 PM, Ted Yu yuzhih...@gmail.com wrote:

I just downloaded the hadoop-0.20.2 tarball from the Cloudera mirror. This is what I see in ReduceTask (line 999):

public synchronized boolean reserve(int requestedSize, InputStream in)
    throws InterruptedException {
  // Wait till the request can be fulfilled...
  while ((size + requestedSize) > maxSize) {

I don't see the fix from MR-1182. That's why I suggested to Andy that he manually apply MR-1182. Cheers

On Tue, Mar 9, 2010 at 5:01 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Christopher. The heap size for reduce tasks is configured to be 640M (mapred.child.java.opts set to -Xmx640m). Andy

-----Original Message-----
From: Christopher Douglas [mailto:chri...@yahoo-inc.com]
Sent: Tuesday, March 09, 2010 5:19 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

No, MR-1182 is included in 0.20.2. What heap size have you set for your reduce tasks? -C

Sent from my iPhone

On Mar 9, 2010, at 2:34 PM, Ted Yu yuzhih...@gmail.com wrote:

Andy: You need to manually apply the patch. Cheers

On Tue, Mar 9, 2010 at 2:23 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Ted. My understanding is that MAPREDUCE-1182 is included in the 0.20.2 release. We upgraded our cluster to 0.20.2 this weekend and re-ran the same job scenarios. Running with mapred.reduce.parallel.copies set to 1, we continue to have the same Java heap space error.

-----Original Message-----
From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, March 09, 2010 12:56 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

This issue has been resolved in http://issues.apache.org/jira/browse/MAPREDUCE-1182. Please apply the patch M1182-1v20.patch: http://issues.apache.org/jira/secure/attachment/12424116/M1182-1v20.patch

On Sun, Mar 7, 2010 at 3:57 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Ted. Very helpful. You are correct that I misunderstood the code at ReduceTask.java:1535. I missed the fact that it's in an IOException catch block. My mistake. That's what I get for being in a rush. For what it's worth, I did re-run the job with mapred.reduce.parallel.copies set with values from 5 all the way down to 1. All failed with the same error:

Error: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1508)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1408)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1261)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1195)

So from that it does seem like something else might be going on, yes? I need to do some more research. I appreciate your insights. Andy

-----Original Message-----
From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, March 07, 2010 3:38 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

My observation is based on this call chain: MapOutputCopier.run() calling copyOutput() calling getMapOutput() calling ramManager.canFitInMemory(decompressedLength). Basically ramManager.canFitInMemory() makes its decision without considering the number of MapOutputCopiers that are running. Thus 1.25 * 0.7 of the total heap may be used in shuffling if default parameters were used. Of course, you should check the value of mapred.reduce.parallel.copies to see if it is 5. If it is 4 or lower, my reasoning wouldn't apply. About the ramManager.unreserve() call: ReduceTask.java from hadoop 0.20.2 only has 2731 lines, so I had to guess the location of the code snippet you provided. I found this around line 1535:

} catch (IOException ioe) {
  LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
Uneven DBInputFormat Splits
Hi all, I've set up a job that pulls, say, 250 records from MySQL and splits them across several mappers. Each mapper (with the exception of attempt_*_0) gets roughly 250/(n mappers) records. However, attempt 0 always ends up with ~5x the workload of the others. Is there something I'm missing, or is this normal? Thanks, Nick Jones
hadoop streaming one process per map question
Hi Hadoop Streaming users,

According to the Hadoop Streaming FAQ, http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#Frequently+Asked+Questions if I want to embarrassingly parallelize my task and assign one process per map, I can use Hadoop Streaming and a custom mapper script:

- Generate a file containing the full HDFS paths of the input files. Each map task would get one file name as input.
- Create a mapper script which, given a filename, will get the file to local disk, gzip the file and put it back in the desired output directory.

I understand that if I use a long list of full HDFS paths as the input file to hadoop-streaming, the file will be chopped into pieces of roughly the same size to feed each mapper. The bigger question is how can we make sure that the files listed in each piece are sitting local to the mapper, i.e., data locality? If not, the files will be fetched over the network, which is definitely not scalable for a data-intensive application and defeats the purpose of using Hadoop. I am questioning the rationale of this workaround in the FAQ. Please advise.

Thanks, Qiming
-- Dr. Qiming He 240-751-4526 (Phone) 301-525-6612 (Cell) 240-766-0561 (Fax)
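[Editor's note: for reference, a minimal sketch of the FAQ workaround written as an old-API Java mapper rather than a shell script. The class name, the /gzipped output directory and the pairing with NLineInputFormat are assumptions for illustration, not something the FAQ prescribes.]

import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical mapper: each input record is one HDFS path, and the map
// task gzips that file into an assumed output directory. Note this gives
// no locality guarantee: the split carries file names, not file blocks.
public class GzipFileMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, NullWritable> {

  private JobConf conf;

  public void configure(JobConf conf) {
    this.conf = conf;
  }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, NullWritable> out, Reporter reporter)
      throws IOException {
    Path src = new Path(value.toString());                   // one file per record
    Path dst = new Path("/gzipped", src.getName() + ".gz");  // assumed target dir
    FileSystem fs = src.getFileSystem(conf);
    FSDataInputStream in = fs.open(src);
    GZIPOutputStream gz = new GZIPOutputStream(fs.create(dst));
    try {
      IOUtils.copyBytes(in, gz, conf, false);  // stream copy; close manually below
    } finally {
      in.close();
      gz.close();  // finishes and closes the gzip stream
    }
    reporter.setStatus("gzipped " + src);  // keeps the task from timing out
    out.collect(value, NullWritable.get());
  }
}

Pairing this with org.apache.hadoop.mapred.lib.NLineInputFormat gives each map task a fixed number of file names, but it does nothing to address the locality concern raised above, since the split contains names rather than the blocks of the files themselves.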
Executing jobs with other users
Hi all, I have Hadoop running on a cluster. I would like to execute jobs as other users, not just as the hadoop user. Is it possible? Thanks in advance, Edson Ramiro
Re: Executing jobs with other users
Of course, as long as the other user is in the same user group as the hadoop user.

On Wed, Mar 10, 2010 at 6:10 AM, Edson Ramiro erlfi...@gmail.com wrote:

Hi all, I have Hadoop running on a cluster. I would like to execute jobs as other users, not just as the hadoop user. Is it possible? Thanks in advance, Edson Ramiro

-- Best Regards, Jeff Zhang
Fair scheduler fairness question
I am learning how the fair scheduler manages jobs to let each job share resources over time, but I don't know if my understanding is correct. My scenario is that I have 3 data nodes and the cluster is configured to use the fair scheduler with three pools (e.g. A, B, C). Each pool is configured with '<maxRunningJobs>1</maxRunningJobs>'. Now the clients submit 4 jobs (e.g. via submitJob()) to 3 different pools. For instance:

the first job is submitted to pool A
the second job is submitted to pool B
the third job is submitted to pool B
the fourth job is submitted to pool C

So I expect that the first 3 jobs will occupy the free slots (the slots should be full now). Then the fourth job is submitted. But since the slots are full, and the fourth job should also have a slot for executing its work, the third job will be terminated (or killed) so that the fourth job can be launched. Is my scenario correct? And if I am right, is there any keyword searchable in the logs to observe such activities (the job that is being killed, e.g. the third job)? Thanks for the help. I appreciate any advice.
Re: Fair scheduler fairness question
On 3/10/10 7:38 AM, Neo Anderson javadeveloper...@yahoo.co.uk wrote:

I am learning how the fair scheduler manages jobs to let each job share resources over time, but I don't know if my understanding is correct. My scenario is that I have 3 data nodes and the cluster is configured to use the fair scheduler with three pools (e.g. A, B, C). Each pool is configured with '<maxRunningJobs>1</maxRunningJobs>'. Now the clients submit 4 jobs (e.g. via submitJob()) to 3 different pools. For instance:

the first job is submitted to pool A
the second job is submitted to pool B
the third job is submitted to pool B
the fourth job is submitted to pool C

So I expect that the first 3 jobs will occupy the free slots (the slots should be full now). Then the fourth job is submitted. But since the slots are full, and the fourth job should also have a slot for executing its work, the third job will be terminated (or killed) so that the fourth job can be launched. Is my scenario correct? And if I am right, is there any keyword searchable in the logs to observe such activities (the job that is being killed, e.g. the third job)?

A lot of it depends upon timing. If there is a long enough pause between job 1 and job 2, job 1 will take every slot available to it. As job 1's slots finish, jobs 2 and 4 would get those slots. As job 2 finishes, job 3 will get its slots. Slots are only freed by force if the scheduler you are using has preemption. I think some versions of fair share may have it. Entire jobs are never killed.
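[Editor's note: for readers following along, this is roughly what the allocation file for the scenario above would look like. It is a sketch assuming the stock 0.20 fair scheduler and its mapred.fairscheduler.allocation.file property; the pool names are taken from the question.]

<?xml version="1.0"?>
<!-- Hypothetical allocation file for pools A, B and C. With a cap of one
     running job per pool, a second job submitted to pool B waits in that
     pool's queue; nothing is killed to make room for it. -->
<allocations>
  <pool name="A">
    <maxRunningJobs>1</maxRunningJobs>
  </pool>
  <pool name="B">
    <maxRunningJobs>1</maxRunningJobs>
  </pool>
  <pool name="C">
    <maxRunningJobs>1</maxRunningJobs>
  </pool>
</allocations>

Under these caps the queued job simply waits for its pool, which matches the "entire jobs are never killed" behavior described above.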
Re: using StreamInputFormat, StreamXmlRecordReader with your custom Jobs
I think you can use DistributedCache to specify the location of the jar after you have it in HDFS.

On Wed, Mar 10, 2010 at 6:11 AM, Reik Schatz reik.sch...@bwin.org wrote:

Hi, I am playing around with version 0.20.2 of Hadoop. I have written and packaged a job using a custom Mapper and Reducer. The input format in my job is set to StreamInputFormat, and the property stream.recordreader.class is set to org.apache.hadoop.streaming.StreamXmlRecordReader. This is how I want to start my job:

hadoop jar custom-1.0-SNAPSHOT.jar EmailCountingJob /input /output

The problem is that in this case all classes from hadoop-0.20.2-streaming.jar are missing (ClassNotFoundException). I tried using -libjars without luck:

hadoop jar -libjars PATH/hadoop-0.20.2-streaming.jar custom-1.0-SNAPSHOT.jar EmailCountingJob /input /output

Is there any way to use the streaming classes with your own jobs without copying these classes into your project and packaging them into your own jar? /Reik
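[Editor's note: one detail worth checking, as an assumption about the failure rather than something confirmed in the thread: -libjars is consumed by GenericOptionsParser, so it only takes effect when the driver runs through ToolRunner, and the option must come after the class name. A minimal driver sketch under that assumption:]

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch of a driver that lets -libjars take effect: ToolRunner routes the
// arguments through GenericOptionsParser, which is the component that
// actually consumes -libjars; a plain main() never sees the option.
public class EmailCountingJob extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), EmailCountingJob.class);
    conf.set("stream.recordreader.class",
             "org.apache.hadoop.streaming.StreamXmlRecordReader");
    // ... set input format, mapper and reducer as in the original job ...
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // hadoop jar custom-1.0-SNAPSHOT.jar EmailCountingJob \
    //   -libjars PATH/hadoop-0.20.2-streaming.jar /input /output
    System.exit(ToolRunner.run(new EmailCountingJob(), args));
  }
}

The client JVM additionally needs the streaming jar on its own classpath (for example via HADOOP_CLASSPATH), because -libjars only ships the jar to the tasks.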
Re: Symbolic link in Hadoop HDFS?
Those patches won't apply to an existing release, so you'd need to use trunk to get symlinks. They will be included in the next release, however only via the FileContext API. Thanks, Eli

On Tue, Mar 9, 2010 at 6:16 PM, jiang licht licht_ji...@yahoo.com wrote:

Thanks, Eli. I actually just read HDFS-245 before I saw your reply. So, as I understand, symlinks are available by applying the relevant patches, right? From what you mentioned, it seems to me that symlink support is quite stable as of now. Just curious: why is it not included in a recent hadoop release yet, and will it be considered for the next release? Thanks, Michael

--- On Tue, 3/9/10, Eli Collins e...@cloudera.com wrote:
From: Eli Collins e...@cloudera.com
Subject: Re: Symbolic link in Hadoop HDFS?
To: common-user@hadoop.apache.org
Date: Tuesday, March 9, 2010, 8:01 PM

Hey Michael, Symbolic links have been implemented [1] but are not yet available in a Hadoop release. The implementation is only available to clients that use the new FileContext API, so clients like Hive need to be migrated from using FileSystem to FileContext. This is currently being done in Hadoop itself [2]. Thanks, Eli

[1] http://issues.apache.org/jira/browse/HDFS-245
[2] http://issues.apache.org/jira/browse/HADOOP-6446

On Tue, Mar 9, 2010 at 4:10 PM, jiang licht licht_ji...@yahoo.com wrote:

Is there a way to create a symlink in HDFS? And does the LOAD function in Pig follow such a link? Thanks! Michael
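[Editor's note: for anyone experimenting on trunk, the FileContext call looks roughly like this. The paths are hypothetical; the method is the one HDFS-245 introduced.]

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

public class SymlinkExample {
  public static void main(String[] args) throws Exception {
    FileContext fc = FileContext.getFileContext();
    // Create /user/michael/subset pointing at /data/big-dataset;
    // the boolean asks for missing parents of the link to be created.
    fc.createSymlink(new Path("/data/big-dataset"),
                     new Path("/user/michael/subset"), true);
  }
}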
Re: Fair scheduler fairness question
--- On Wed, 10/3/10, Allen Wittenauer awittena...@linkedin.com wrote:
From: Allen Wittenauer awittena...@linkedin.com
Subject: Re: Fair scheduler fairness question
To: common-user@hadoop.apache.org
Date: Wednesday, 10 March, 2010, 16:06

On 3/10/10 7:38 AM, Neo Anderson javadeveloper...@yahoo.co.uk wrote:

I am learning how the fair scheduler manages jobs to let each job share resources over time, but I don't know if my understanding is correct. My scenario is that I have 3 data nodes and the cluster is configured to use the fair scheduler with three pools (e.g. A, B, C). Each pool is configured with '<maxRunningJobs>1</maxRunningJobs>'. Now the clients submit 4 jobs (e.g. via submitJob()) to 3 different pools. For instance:

the first job is submitted to pool A
the second job is submitted to pool B
the third job is submitted to pool B
the fourth job is submitted to pool C

So I expect that the first 3 jobs will occupy the free slots (the slots should be full now). Then the fourth job is submitted. But since the slots are full, and the fourth job should also have a slot for executing its work, the third job will be terminated (or killed) so that the fourth job can be launched. Is my scenario correct? And if I am right, is there any keyword searchable in the logs to observe such activities (the job that is being killed, e.g. the third job)?

A lot of it depends upon timing. If there is a long enough pause between job 1 and job 2, job 1 will take every slot available to it. As job 1's slots finish, jobs 2 and 4 would get those slots. As job 2 finishes, job 3 will get its slots. Slots are only freed by force if the scheduler you are using has preemption. I think some versions of fair share may have it. Entire jobs are never killed.

At the moment I use hadoop 0.20.2 and I cannot find code that relates to the 'preempt' function; however, I read the JIRA MAPREDUCE-551 saying the preempt function has already been fixed in version 0.20.0. Also, I can find some functions that relate to 'preemption', e.g. 'protected void preemptTasksIfNecessary()', in the patch. I am confused now: which function in version 0.20.2 (or 0.20.1) is used to preempt unnecessary tasks (so that slots can be freed for other tasks/jobs to run)? Thank you for your help.
Re: Fair scheduler fairness question
On 3/10/10 9:14 AM, Neo Anderson javadeveloper...@yahoo.co.uk wrote:

At the moment I use hadoop 0.20.2 and I cannot find code that relates to the 'preempt' function; however, I read the JIRA MAPREDUCE-551 saying the preempt function has already been fixed in version 0.20.0.

MR-551 says "Fixed in 0.21" at the top. Reading the text shows that patches are available if you want to patch your own build of 0.20.
Re: Fair scheduler fairness question
On Wed, Mar 10, 2010 at 9:18 AM, Allen Wittenauer awittena...@linkedin.comwrote: On 3/10/10 9:14 AM, Neo Anderson javadeveloper...@yahoo.co.uk wrote: At the moment I use hadoop 0.20.2 and I can not find code that relates to 'preempt' function; however, I read the jira MAPREDUCE-551 saying preempt function is already been fixed at version 0.20.0. MR-551 says fixed in 0.21 at the top. Reading the text shows that patches are available if you want to patch your own build of 0.20. If you'd rather not patch your own build of Hadoop, the fair scheduler preemption feature is also available in CDH2: http://archive.cloudera.com/cdh/2/hadoop-0.20.1+169.56.tar.gz -Todd -- Todd Lipcon Software Engineer, Cloudera
Re: Symbolic link in Hadoop HDFS?
Thanks, Eli. I see. Any plan to support symlinks in the hadoop fs shell? That would make life easier instead of everyone writing similar code. With that support in the shell commands, given one physical copy of a big data set, different people could easily create different subsets of it to work on by using symlinks. Thanks, Michael

--- On Wed, 3/10/10, Eli Collins e...@cloudera.com wrote:
From: Eli Collins e...@cloudera.com
Subject: Re: Symbolic link in Hadoop HDFS?
To: common-user@hadoop.apache.org
Date: Wednesday, March 10, 2010, 11:08 AM

Those patches won't apply to an existing release, so you'd need to use trunk to get symlinks. They will be included in the next release, however only via the FileContext API. Thanks, Eli

On Tue, Mar 9, 2010 at 6:16 PM, jiang licht licht_ji...@yahoo.com wrote:

Thanks, Eli. I actually just read HDFS-245 before I saw your reply. So, as I understand, symlinks are available by applying the relevant patches, right? From what you mentioned, it seems to me that symlink support is quite stable as of now. Just curious: why is it not included in a recent hadoop release yet, and will it be considered for the next release? Thanks, Michael

--- On Tue, 3/9/10, Eli Collins e...@cloudera.com wrote:
From: Eli Collins e...@cloudera.com
Subject: Re: Symbolic link in Hadoop HDFS?
To: common-user@hadoop.apache.org
Date: Tuesday, March 9, 2010, 8:01 PM

Hey Michael, Symbolic links have been implemented [1] but are not yet available in a Hadoop release. The implementation is only available to clients that use the new FileContext API, so clients like Hive need to be migrated from using FileSystem to FileContext. This is currently being done in Hadoop itself [2]. Thanks, Eli

[1] http://issues.apache.org/jira/browse/HDFS-245
[2] http://issues.apache.org/jira/browse/HADOOP-6446

On Tue, Mar 9, 2010 at 4:10 PM, jiang licht licht_ji...@yahoo.com wrote:

Is there a way to create a symlink in HDFS? And does the LOAD function in Pig follow such a link? Thanks! Michael
region server appearing twice on HBase Master page
I noticed two lines for the same region server on the HBase Master page:

X.com:60030,1268160765854: requests=0, regions=16, usedHeap=1068, maxHeap=6127
X.com:60030,1268250726442: requests=21, regions=9, usedHeap=1258, maxHeap=6127

I checked that there is only one org.apache.hadoop.hbase.regionserver.HRegionServer instance running on that machine. This is from the region server log:

2010-03-10 13:25:38,157 ERROR [IPC Server handler 43 on 60020] regionserver.HRegionServer(844): org.apache.hadoop.hbase.NotServingRegionException: ruletable,,1268083966723
    at org.apache.hadoop.hbase.regionserver.HRegionServer.getRegion(HRegionServer.java:2307)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.get(HRegionServer.java:1784)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.HBaseRPC$Server.call(HBaseRPC.java:648)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:915)
2010-03-10 13:25:38,189 ERROR [IPC Server handler 0 on 60020] regionserver.HRegionServer(844): org.apache.hadoop.hbase.NotServingRegionException: ruletable,,1268083966723
    at org.apache.hadoop.hbase.regionserver.HRegionServer.getRegion(HRegionServer.java:2307)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.get(HRegionServer.java:1784)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.HBaseRPC$Server.call(HBaseRPC.java:648)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:915)

If you know how to troubleshoot this, please share.
Re: Symbolic link in Hadoop HDFS?
Yup, it needs to be ported to FileContext first (HADOOP-6424). Thanks, Eli

On Wed, Mar 10, 2010 at 9:47 AM, jiang licht licht_ji...@yahoo.com wrote:

Thanks, Eli. I see. Any plan to support symlinks in the hadoop fs shell? That would make life easier instead of everyone writing similar code. With that support in the shell commands, given one physical copy of a big data set, different people could easily create different subsets of it to work on by using symlinks. Thanks, Michael

--- On Wed, 3/10/10, Eli Collins e...@cloudera.com wrote:
From: Eli Collins e...@cloudera.com
Subject: Re: Symbolic link in Hadoop HDFS?
To: common-user@hadoop.apache.org
Date: Wednesday, March 10, 2010, 11:08 AM

Those patches won't apply to an existing release, so you'd need to use trunk to get symlinks. They will be included in the next release, however only via the FileContext API. Thanks, Eli

On Tue, Mar 9, 2010 at 6:16 PM, jiang licht licht_ji...@yahoo.com wrote:

Thanks, Eli. I actually just read HDFS-245 before I saw your reply. So, as I understand, symlinks are available by applying the relevant patches, right? From what you mentioned, it seems to me that symlink support is quite stable as of now. Just curious: why is it not included in a recent hadoop release yet, and will it be considered for the next release? Thanks, Michael

--- On Tue, 3/9/10, Eli Collins e...@cloudera.com wrote:
From: Eli Collins e...@cloudera.com
Subject: Re: Symbolic link in Hadoop HDFS?
To: common-user@hadoop.apache.org
Date: Tuesday, March 9, 2010, 8:01 PM

Hey Michael, Symbolic links have been implemented [1] but are not yet available in a Hadoop release. The implementation is only available to clients that use the new FileContext API, so clients like Hive need to be migrated from using FileSystem to FileContext. This is currently being done in Hadoop itself [2]. Thanks, Eli

[1] http://issues.apache.org/jira/browse/HDFS-245
[2] http://issues.apache.org/jira/browse/HADOOP-6446

On Tue, Mar 9, 2010 at 4:10 PM, jiang licht licht_ji...@yahoo.com wrote:

Is there a way to create a symlink in HDFS? And does the LOAD function in Pig follow such a link? Thanks! Michael
Track tuple progress
I am a newbie to hadoop. I have been looking at the hadoop source code and examples, but I can't find any sample implementations of the Reporter interface. Where can I find sample Reporter implementations? Also, is there any way to track the progress of map and reduce at the record level (not the task level)?
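[Editor's note: there is normally nothing to implement here; the framework passes a Reporter into every map()/reduce() call, and record-level visibility comes from invoking it per record. A minimal old-API sketch, with made-up counter group/name and status text:]

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TrackingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  private long records = 0;

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> out, Reporter reporter)
      throws IOException {
    records++;
    // A counter updated per record, visible live on the job UI.
    reporter.incrCounter("TrackingMapper", "RECORDS_SEEN", 1);
    if (records % 10000 == 0) {
      // A human-readable status string shown on the task's web UI row.
      reporter.setStatus("processed " + records + " records");
    }
    reporter.progress();  // liveness ping, useful for slow records
    out.collect(value, new LongWritable(1L));
  }
}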
Re: Shuffle In Memory OutOfMemoryError
I don't think this OOM is a framework bug per se, and given the rewrite/refactoring of the shuffle in MAPREDUCE-318 (in 0.21), tuning the 0.20 shuffle semantics is likely not worthwhile (though data informing improvements to trunk would be excellent). Most likely (and tautologically), ReduceTask simply requires more memory than is available, and the job failure can be avoided by either 0) increasing the heap size or 1) lowering mapred.shuffle.input.buffer.percent. Most of the tasks we run have a heap of 1GB. For a reduce fetching 200k map outputs, that's a reasonable, even stingy amount of space. -C

On Mar 10, 2010, at 5:26 AM, Ted Yu wrote:

I verified that size and maxSize are long. This means MR-1182 didn't resolve Andy's issue. According to Andy: "At the beginning of the job there are 209,754 pending map tasks and 32 pending reduce tasks." My guess is that GC wasn't reclaiming memory fast enough, leading to OOME because of the large number of in-memory shuffle candidates. My suggestion for Andy would be to: 1. add -verbose:gc as a JVM parameter 2. modify reserve() slightly to calculate the maximum outstanding numPendingRequests, and print that maximum. Based on the output from the above two items, we can discuss a solution. My intuition is to place an upper bound on numPendingRequests beyond which canFitInMemory() returns false. My two cents.

On Tue, Mar 9, 2010 at 11:51 PM, Christopher Douglas chri...@yahoo-inc.com wrote:

That section of code is unmodified in MR-1182. See the patches/svn log. -C

Sent from my iPhone

On Mar 9, 2010, at 7:44 PM, Ted Yu yuzhih...@gmail.com wrote:

I just downloaded the hadoop-0.20.2 tarball from the Cloudera mirror. This is what I see in ReduceTask (line 999):

public synchronized boolean reserve(int requestedSize, InputStream in)
    throws InterruptedException {
  // Wait till the request can be fulfilled...
  while ((size + requestedSize) > maxSize) {

I don't see the fix from MR-1182. That's why I suggested to Andy that he manually apply MR-1182. Cheers

On Tue, Mar 9, 2010 at 5:01 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Christopher. The heap size for reduce tasks is configured to be 640M (mapred.child.java.opts set to -Xmx640m). Andy

-----Original Message-----
From: Christopher Douglas [mailto:chri...@yahoo-inc.com]
Sent: Tuesday, March 09, 2010 5:19 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

No, MR-1182 is included in 0.20.2. What heap size have you set for your reduce tasks? -C

Sent from my iPhone

On Mar 9, 2010, at 2:34 PM, Ted Yu yuzhih...@gmail.com wrote:

Andy: You need to manually apply the patch. Cheers

On Tue, Mar 9, 2010 at 2:23 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Ted. My understanding is that MAPREDUCE-1182 is included in the 0.20.2 release. We upgraded our cluster to 0.20.2 this weekend and re-ran the same job scenarios. Running with mapred.reduce.parallel.copies set to 1, we continue to have the same Java heap space error.

-----Original Message-----
From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, March 09, 2010 12:56 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

This issue has been resolved in http://issues.apache.org/jira/browse/MAPREDUCE-1182. Please apply the patch M1182-1v20.patch: http://issues.apache.org/jira/secure/attachment/12424116/M1182-1v20.patch

On Sun, Mar 7, 2010 at 3:57 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Ted. Very helpful. You are correct that I misunderstood the code at ReduceTask.java:1535. I missed the fact that it's in an IOException catch block. My mistake. That's what I get for being in a rush. For what it's worth, I did re-run the job with mapred.reduce.parallel.copies set with values from 5 all the way down to 1. All failed with the same error:

Error: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1508)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1408)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1261)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1195)

So from that it does seem like something else might be going on, yes? I need to do some more research. I appreciate your insights. Andy

-----Original Message-----
From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, March 07, 2010 3:38 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

My observation is based on this call chain: MapOutputCopier.run() calling copyOutput() calling getMapOutput() calling ramManager.canFitInMemory(decompressedLength). Basically ramManager.canFitInMemory() makes its decision without
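[Editor's note: for concreteness, the two remedies Chris lists, expressed as job configuration. The values are illustrative only; note that in the 0.20.2 source the buffer key is spelled mapred.job.shuffle.input.buffer.percent, slightly longer than the shorthand used in the mail above.]

import org.apache.hadoop.mapred.JobConf;

public class ShuffleTuning {
  public static void apply(JobConf conf) {
    // Remedy 0: give the reduce JVM more heap (the thread used -Xmx640m).
    conf.set("mapred.child.java.opts", "-Xmx1024m");
    // Remedy 1: shrink the fraction of that heap the in-memory shuffle
    // may reserve (0.70 is the 0.20 default).
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.50f);
  }
}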
Why hadoop jobs need setup and cleanup phases which would consume a lot of time ?
Hi all, Why do hadoop jobs need setup and cleanup phases, which can consume a lot of time? Why couldn't we achieve it the way a distributed RDBMS does, where a master process coordinates all slave nodes through sockets? I think that would save plenty of time if there weren't any setups and cleanups. What's the hadoop philosophy on this? Thanks, Min

-- My research interests are distributed systems, parallel computing and bytecode based virtual machines. My profile: http://www.linkedin.com/in/coderplay My blog: http://coderplay.javaeye.com
Re: Why hadoop jobs need setup and cleanup phases which would consume a lot of time ?
Hi Zhou, I looked at the source code, and it seems it is the JobTracker that initiates the setup and cleanup tasks. And why do you think the setup and cleanup phases consume a lot of time? Actually the time cost depends on the OutputCommitter.

On Thu, Mar 11, 2010 at 11:04 AM, Min Zhou coderp...@gmail.com wrote:

Hi all, Why do hadoop jobs need setup and cleanup phases, which can consume a lot of time? Why couldn't we achieve it the way a distributed RDBMS does, where a master process coordinates all slave nodes through sockets? I think that would save plenty of time if there weren't any setups and cleanups. What's the hadoop philosophy on this? Thanks, Min

-- My research interests are distributed systems, parallel computing and bytecode based virtual machines. My profile: http://www.linkedin.com/in/coderplay My blog: http://coderplay.javaeye.com

-- Best Regards, Jeff Zhang
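[Editor's note: to illustrate where that time goes, a sketch of a do-nothing committer against the old 0.20 API. This is purely illustrative: emptying the hooks also forfeits the promotion of task output from its temporary directory that FileOutputCommitter normally performs.]

import java.io.IOException;

import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

// The setup/cleanup tasks largely exist to run these hooks, so a committer
// with empty hooks makes those phases near-instant.
public class NoOpOutputCommitter extends OutputCommitter {
  public void setupJob(JobContext context) throws IOException { }
  public void cleanupJob(JobContext context) throws IOException { }
  public void setupTask(TaskAttemptContext context) throws IOException { }
  public boolean needsTaskCommit(TaskAttemptContext context) { return false; }
  public void commitTask(TaskAttemptContext context) throws IOException { }
  public void abortTask(TaskAttemptContext context) throws IOException { }
}

It would be wired in with conf.setOutputCommitter(NoOpOutputCommitter.class).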
Re: Shuffle In Memory OutOfMemoryError
Thanks to Andy for the log he provided. You can see from the log below that size increased steadily from 341535057 to 408181692, approaching maxSize. Then OOME:

2010-03-10 18:38:32,936 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=start requestedSize=3893000 size=341535057 numPendingRequests=0 maxSize=417601952
2010-03-10 18:38:32,936 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=end requestedSize=3893000 size=345428057 numPendingRequests=0 maxSize=417601952
...
2010-03-10 18:38:35,950 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=end requestedSize=635753 size=408181692 numPendingRequests=0 maxSize=417601952
2010-03-10 18:38:36,603 INFO org.apache.hadoop.mapred.ReduceTask: Task attempt_201003101826_0001_r_04_0: Failed fetch #1 from attempt_201003101826_0001_m_000875_0
2010-03-10 18:38:36,603 WARN org.apache.hadoop.mapred.ReduceTask: attempt_201003101826_0001_r_04_0 adding host hd17.dfs.returnpath.net to penalty box, next contact in 4 seconds
2010-03-10 18:38:36,604 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201003101826_0001_r_04_0: Got 1 map-outputs from previous failures
2010-03-10 18:38:36,605 FATAL org.apache.hadoop.mapred.TaskRunner: attempt_201003101826_0001_r_04_0 : Map output copy failure : java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1513)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1413)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1266)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1200)

Looking at the calls to unreserve() in ReduceTask, two were for IOExceptions and the other was for a sanity check (line 1557), meaning they wouldn't be called in the normal execution path. I see one call in IFile.InMemoryReader:

// Inform the RamManager
ramManager.unreserve(bufferSize);

And InMemoryReader is used in:

Reader<K, V> reader = new InMemoryReader<K, V>(ramManager, mo.mapAttemptId, mo.data, 0, mo.data.length);

On Wed, Mar 10, 2010 at 3:34 PM, Chris Douglas chri...@yahoo-inc.com wrote:

I don't think this OOM is a framework bug per se, and given the rewrite/refactoring of the shuffle in MAPREDUCE-318 (in 0.21), tuning the 0.20 shuffle semantics is likely not worthwhile (though data informing improvements to trunk would be excellent). Most likely (and tautologically), ReduceTask simply requires more memory than is available, and the job failure can be avoided by either 0) increasing the heap size or 1) lowering mapred.shuffle.input.buffer.percent. Most of the tasks we run have a heap of 1GB. For a reduce fetching 200k map outputs, that's a reasonable, even stingy amount of space. -C

On Mar 10, 2010, at 5:26 AM, Ted Yu wrote:

I verified that size and maxSize are long. This means MR-1182 didn't resolve Andy's issue. According to Andy: "At the beginning of the job there are 209,754 pending map tasks and 32 pending reduce tasks." My guess is that GC wasn't reclaiming memory fast enough, leading to OOME because of the large number of in-memory shuffle candidates. My suggestion for Andy would be to: 1. add -verbose:gc as a JVM parameter 2. modify reserve() slightly to calculate the maximum outstanding numPendingRequests, and print that maximum. Based on the output from the above two items, we can discuss a solution. My intuition is to place an upper bound on numPendingRequests beyond which canFitInMemory() returns false. My two cents.

On Tue, Mar 9, 2010 at 11:51 PM, Christopher Douglas chri...@yahoo-inc.com wrote:

That section of code is unmodified in MR-1182. See the patches/svn log. -C

Sent from my iPhone

On Mar 9, 2010, at 7:44 PM, Ted Yu yuzhih...@gmail.com wrote:

I just downloaded the hadoop-0.20.2 tarball from the Cloudera mirror. This is what I see in ReduceTask (line 999):

public synchronized boolean reserve(int requestedSize, InputStream in)
    throws InterruptedException {
  // Wait till the request can be fulfilled...
  while ((size + requestedSize) > maxSize) {

I don't see the fix from MR-1182. That's why I suggested to Andy that he manually apply MR-1182. Cheers

On Tue, Mar 9, 2010 at 5:01 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Christopher. The heap size for reduce tasks is configured to be 640M (mapred.child.java.opts set to -Xmx640m). Andy

-----Original Message-----
From: Christopher Douglas [mailto:chri...@yahoo-inc.com]
Sent: Tuesday, March 09, 2010 5:19 PM
To: common-user@hadoop.apache.org
Subject: Re: Shuffle In Memory OutOfMemoryError

No, MR-1182 is included in 0.20.2. What heap size have you set for your reduce tasks? -C

Sent from my iPhone

On Mar 9, 2010, at 2:34 PM, Ted Yu yuzhih...@gmail.com wrote:

Andy:
Re: Shuffle In Memory OutOfMemoryError
I pressed the send key a bit early. I will have to dig a bit deeper. Hopefully someone can find the reader.close() call, after which I will look for another possible root cause :-)

On Wed, Mar 10, 2010 at 7:48 PM, Ted Yu yuzhih...@gmail.com wrote:

Thanks to Andy for the log he provided. You can see from the log below that size increased steadily from 341535057 to 408181692, approaching maxSize. Then OOME:

2010-03-10 18:38:32,936 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=start requestedSize=3893000 size=341535057 numPendingRequests=0 maxSize=417601952
2010-03-10 18:38:32,936 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=end requestedSize=3893000 size=345428057 numPendingRequests=0 maxSize=417601952
...
2010-03-10 18:38:35,950 INFO org.apache.hadoop.mapred.ReduceTask: reserve: pos=end requestedSize=635753 size=408181692 numPendingRequests=0 maxSize=417601952
2010-03-10 18:38:36,603 INFO org.apache.hadoop.mapred.ReduceTask: Task attempt_201003101826_0001_r_04_0: Failed fetch #1 from attempt_201003101826_0001_m_000875_0
2010-03-10 18:38:36,603 WARN org.apache.hadoop.mapred.ReduceTask: attempt_201003101826_0001_r_04_0 adding host hd17.dfs.returnpath.net to penalty box, next contact in 4 seconds
2010-03-10 18:38:36,604 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201003101826_0001_r_04_0: Got 1 map-outputs from previous failures
2010-03-10 18:38:36,605 FATAL org.apache.hadoop.mapred.TaskRunner: attempt_201003101826_0001_r_04_0 : Map output copy failure : java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1513)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1413)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1266)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1200)

Looking at the calls to unreserve() in ReduceTask, two were for IOExceptions and the other was for a sanity check (line 1557), meaning they wouldn't be called in the normal execution path. I see one call in the IFile.InMemoryReader close() method:

// Inform the RamManager
ramManager.unreserve(bufferSize);

And InMemoryReader is used in createInMemorySegments():

Reader<K, V> reader = new InMemoryReader<K, V>(ramManager, mo.mapAttemptId, mo.data, 0, mo.data.length);

But I don't see reader.close() in the ReduceTask file.

On Wed, Mar 10, 2010 at 3:34 PM, Chris Douglas chri...@yahoo-inc.com wrote:

I don't think this OOM is a framework bug per se, and given the rewrite/refactoring of the shuffle in MAPREDUCE-318 (in 0.21), tuning the 0.20 shuffle semantics is likely not worthwhile (though data informing improvements to trunk would be excellent). Most likely (and tautologically), ReduceTask simply requires more memory than is available, and the job failure can be avoided by either 0) increasing the heap size or 1) lowering mapred.shuffle.input.buffer.percent. Most of the tasks we run have a heap of 1GB. For a reduce fetching 200k map outputs, that's a reasonable, even stingy amount of space. -C

On Mar 10, 2010, at 5:26 AM, Ted Yu wrote:

I verified that size and maxSize are long. This means MR-1182 didn't resolve Andy's issue. According to Andy: "At the beginning of the job there are 209,754 pending map tasks and 32 pending reduce tasks." My guess is that GC wasn't reclaiming memory fast enough, leading to OOME because of the large number of in-memory shuffle candidates. My suggestion for Andy would be to: 1. add -verbose:gc as a JVM parameter 2. modify reserve() slightly to calculate the maximum outstanding numPendingRequests, and print that maximum. Based on the output from the above two items, we can discuss a solution. My intuition is to place an upper bound on numPendingRequests beyond which canFitInMemory() returns false. My two cents.

On Tue, Mar 9, 2010 at 11:51 PM, Christopher Douglas chri...@yahoo-inc.com wrote:

That section of code is unmodified in MR-1182. See the patches/svn log. -C

Sent from my iPhone

On Mar 9, 2010, at 7:44 PM, Ted Yu yuzhih...@gmail.com wrote:

I just downloaded the hadoop-0.20.2 tarball from the Cloudera mirror. This is what I see in ReduceTask (line 999):

public synchronized boolean reserve(int requestedSize, InputStream in)
    throws InterruptedException {
  // Wait till the request can be fulfilled...
  while ((size + requestedSize) > maxSize) {

I don't see the fix from MR-1182. That's why I suggested to Andy that he manually apply MR-1182. Cheers

On Tue, Mar 9, 2010 at 5:01 PM, Andy Sautins andy.saut...@returnpath.net wrote:

Thanks Christopher. The heap size for reduce tasks is configured to be 640M (mapred.child.java.opts set to -Xmx640m). Andy

-----Original