Re: Number of Mappers Running Simultaneously
Hi Rahul, Can you please be more specific? Do you want to control mappers running simultaneously for your job ( I guess ) or the cluster as a whole? If for your job, and you want to control it on a per node basis, one way is to allocate more memory to each of your mappers so it occupies more than one slot. If a slot is free, a task will be scheduled on it and that's more or less out of your control, especially so in pig. In case you want a global cap on simultaneous mappers, it's a little more complicated and inefficient too. A little more detail on your use case should get you a better response on the list. Sorry if I misunderstood your question. Amogh On 9/15/10 3:02 AM, Rahul Malviya rmalv...@apple.com wrote: Hi, I want to control the number of mapper tasks running simultaneously. Is there a way to do that if I run Pig jobs on hadoop? Any input is helpful. Thanks, Rahul
Re: getJobID and job handling
Hi, I see you are using the new APIs, so this should be relevant for you https://issues.apache.org/jira/browse/MAPREDUCE-118 As you have noticed, in the old APIs the JobClient could be queried using JobID , which was returned when the job was submitted. There was a thread in hadoop-dev to discuss un-deprecating the old APIs (not sure where it ultimately went, you may search around the list though) In any case, the patch on above jira should fix it. Amogh On 7/23/10 9:39 PM, Michael Sutter michael.sut...@kit.edu wrote: Hello everybody, I have a problem with my application and hopefully someone can help me. I want to submit a large number of jobs, notice the job id and come back after some hours to query the status of the jobs. The idea was to create the Job (org.apache.hadoop.mapreduce.Job), query the job id via getJobID() and store it in a database. So far everything works fine, except that getJobID() always returns null. I found a workaround with getTrackingURL() and cut the JobID from it. So to my questions: Is it a bug, that getJobID() always returns null? And is it possible to query the status when I only have the job id? The only information I found is to do it with JobClient.getJob(JobID), whereas JobID is already deprecated. Or is this the correct way to do it? I'm using hadoop 0.20.2. Thanks in advance Michael
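(For reference, a minimal sketch, not from the thread, of querying status later with the old JobClient API that the workaround relies on; the stored id string and its format are illustrative.)

```java
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

public class JobStatusCheck {
  public static void main(String[] args) throws Exception {
    // args[0] is a previously stored id string, e.g. "job_201007230000_0042"
    JobConf conf = new JobConf();
    JobClient client = new JobClient(conf);
    RunningJob job = client.getJob(JobID.forName(args[0]));
    if (job == null) {
      System.out.println("Job unknown to the JobTracker (it may have been retired)");
    } else {
      System.out.println("complete=" + job.isComplete()
          + ", successful=" + (job.isComplete() && job.isSuccessful()));
    }
  }
}
```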
Re: Can we modify existing file in HDFS?
Do I need to remove and re-create the whole file? Simply put, as of now, yes. Append functionality is being made available to users to add to the end of a file though :) Amogh On 6/22/10 1:56 PM, elton sky eltonsky9...@gmail.com wrote: hello everyone, I noticed there are 6 operations in HDFS: OP_WRITE_BLOCK, OP_READ_BLOCK, OP_READ_METADATA, OP_REPLACE_BLOCK, OP_COPY_BLOCK and OP_BLOCK_CHECKSUM, and as far as I know there's no way to modify an arbitrary part of an existing file in HDFS. So what if I create a, say, 2-petabyte file and would like to modify some parts? Do I need to remove and re-create the whole file?
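(A minimal sketch of the append path referred to above, assuming a Hadoop build and configuration where append is actually enabled; it is off by default in many 0.20 releases, and the path used here is hypothetical.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendToHdfsFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Append adds bytes at the end of an existing file; arbitrary in-place
    // modification of earlier bytes is still not possible.
    FSDataOutputStream out = fs.append(new Path("/user/elton/bigfile.dat"));
    out.write("one more trailing record\n".getBytes("UTF-8"));
    out.close();
  }
}
```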
Re: Performance tuning of sort
Since the scale of input data and operations of each reduce task is the same, what may cause the execution times of the reduce tasks to differ? You should consider looking at the copy, shuffle and reduce times separately from the JT UI to get better info. Many (dynamic) considerations like network congestion, the number of mappers a reducer is fetching from, data skew wrt input keys to the reducer etc. will affect this number. HTH, Amogh On 6/18/10 8:05 AM, 李钰 car...@gmail.com wrote: Hi Todd and Jeff, Thanks a lot for your discussion, it's really helpful to me. I'd like to express my special appreciation for Todd's patient explanation, you helped me see more clearly the working mechanism of SORT. And Jeff, really thank you for reminding me that sort uses TotalOrderPartitioner to do partitioning. Based on your discussion I update my understanding as follows: The sorting happens on the map side during the spill process of each map task; after that, the overall map outputs are partitioned by the TotalOrderPartitioner, which decides the input range of each reducer. Reducers get map outputs as decided by the partitioner, do the merging and write results into HDFS. Is this understanding right? Please correct me if you find any faults, thanks. If this understanding is right, then my question rolls back to the original one: since the scale of input data and operations of each reduce task is the same, what may cause the execution times of the reduce tasks to differ? All nodes used in my experiment are on the same rack, and they are homogeneous. Any suggestion will be highly appreciated, thanks. Best Regards, Carp 2010/6/18 Todd Lipcon t...@cloudera.com On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang zjf...@gmail.com wrote: Todd, Why is there a sorting in the map task? The sorting here seems useless in my opinion. For map-only jobs there isn't. For jobs with reduce, typically the number of reduce tasks is smaller than the number of map tasks, so parallelizing the sort on the mappers and just doing a merge on the reducers is beneficial. Second, this allows the combiner to run on the mapper by identifying when it has multiple outputs for the same key. Third, this allows improved compression on the map output (thus less intermediate data transfer) by putting similar keys near each other (hopefully within the compression window). Fourth, it kills two birds with one stone since the mappers already have to group outputs by the partition. -Todd On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon t...@cloudera.com wrote: On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang zjf...@gmail.com wrote: Your understanding of Sort is not right. The key concept of Sort is the TotalOrderPartitioner. Actually, before the map-reduce job, the client side will do sampling of the input data to estimate its distribution. And the mappers do nothing; each reducer will fetch its data according to the TotalOrderPartitioner. The data in each reducer is locally sorted, and the reducers themselves are ordered (r0 < r1 < r2), so the overall result data is sorted. The sorting happens on the map side, actually, during the spill process. The mapper itself is an identity function, but the map task code does perform a sort (on a partition,key tuple) as originally described in this thread. Reducers just do a merge of mapper outputs. -Todd On Thu, Jun 17, 2010 at 12:13 AM, 李钰 car...@gmail.com wrote: Hi all, I'm doing some tuning of the sort benchmark of hadoop. To be more specific, running tests against the org.apache.hadoop.examples.Sort class.
Looking through the source code, I think the map tasks take responsibility for sorting the input data, and the reduce tasks just merge the map outputs and write them into HDFS. But here I've got a question I couldn't understand: the time cost of the reduce phase of each reduce task, that is, writing data into HDFS, differs from task to task. Since the input data and operations of each reduce task are the same, what could cause the execution times to differ? Is there anything wrong with my understanding? Does anybody have any experience with this? Badly need your help, thanks. Best Regards, Carp -- Best Regards Jeff Zhang -- Todd Lipcon Software Engineer, Cloudera
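(Not from the thread: a minimal sketch of how the sampling plus TotalOrderPartitioner pieces discussed above are typically wired together with the old mapred API; the sampling parameters and partition file path are illustrative only.)

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderSetup {
  // Called on the client before submitting the job: samples the input,
  // writes reducer boundary keys, and plugs in the partitioner so that
  // reducer 0 < reducer 1 < ... in key order.
  public static void configure(JobConf job) throws Exception {
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_sortPartitions"));
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
  }
}
```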
Re: hadoop streaming on Amazon EC2
Hi, Depending on what hadoop version ( 0.18.3??? ) EC2 uses, you can try one of the following 1. Compile the streaming jar files with your own custom classes and run on ec2 using this custom jar ( should work for 18.3 . Make sure you pick compatible streaming classes ) 2. Jar up your classes and specify them as -libjars option on command line, and specify the custom input and output formats as you have on your local machine ( should work for 19.0 ) I have never worked on EC2, so not sure if any easier solution exists. Amogh On 6/2/10 1:52 AM, Mo Zhou moz...@umail.iu.edu wrote: Hi, I know it may not be suitable to be posted here since it relates to EC2 more than Hadoop. However I could not find a solution and hope some one here could kindly help me out. Here is my question. I created my own inputreader and outputformatter to split an input file while use hadoop streaming. They are tested in my local machine. Following is how I use them. bin/hadoop jar hadoop-0.20.2-streaming.jar \ -D mapred.map.tasks=4\ -D mapred.reduce.tasks=0\ -input HumanSeqs.4\ -output output\ -mapper ./blastp -db nr -evalue 0.001 -outfmt 6\ -inputreader org.apache.hadoop.streaming.StreamFastaRecordReader\ -inputformat org.apache.hadoop.streaming.StreamFastaInputFormat I want to deploy the job to elastic mapreduce. I first create a streaming job. I specify input and output in S3, mapper, and reducer. However I could not find the place where I can specify -inputreader and -inputformat. So my questions are 1) how I can upload the class files to be used as inputreader and inputformat to elastic mapreduce? 2) how I specify to use them in the streaming? Any reply is appreciated. Thanks for your time! -- Thanks, Mo
Re: error in communication with hdfs
Hi, A quick couple of questions: Is the namenode formatted and the daemon started? Can you ssh w/o password? Amogh On 6/2/10 5:03 PM, Khaled BEN BAHRI khaled.ben_ba...@it-sudparis.eu wrote: Hi :) I installed hadoop and I tried to store data in hdfs, but any command I want to execute, like fs -mkdir or dfsadmin -report or any other one, tried to connect to the server 10 times but failed and gives this error: [khale...@node004 bin]$ ./hadoop dfsadmin -report 10/06/02 13:15:45 INFO ipc.Client: Retrying connect to server: /x.x.x.x:9000. Already tried 0 time(s). Bad connection to DFS... command aborted. Thank you for any help khaled
Re: hadoop streaming on Amazon EC2
Hi, You might need to add -Dstream.shipped.hadoopstreaming=path_to_your_custom_streaming_jar Amogh On 6/2/10 5:10 PM, Mo Zhou moz...@umail.iu.edu wrote: Thank you Amogh. Elastic mapreduce use 0.18.3. I tried the first way by download hadoop-0.18.3 to my local machine. Then I got following warning. WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). So the results were incorrect. Thanks, Mo On Wed, Jun 2, 2010 at 4:56 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Hi, Depending on what hadoop version ( 0.18.3??? ) EC2 uses, you can try one of the following 1. Compile the streaming jar files with your own custom classes and run on ec2 using this custom jar ( should work for 18.3 . Make sure you pick compatible streaming classes ) 2. Jar up your classes and specify them as -libjars option on command line, and specify the custom input and output formats as you have on your local machine ( should work for 19.0 ) I have never worked on EC2, so not sure if any easier solution exists. Amogh On 6/2/10 1:52 AM, Mo Zhou moz...@umail.iu.edu wrote: Hi, I know it may not be suitable to be posted here since it relates to EC2 more than Hadoop. However I could not find a solution and hope some one here could kindly help me out. Here is my question. I created my own inputreader and outputformatter to split an input file while use hadoop streaming. They are tested in my local machine. Following is how I use them. bin/hadoop jar hadoop-0.20.2-streaming.jar \ -D mapred.map.tasks=4\ -D mapred.reduce.tasks=0\ -input HumanSeqs.4\ -output output\ -mapper ./blastp -db nr -evalue 0.001 -outfmt 6\ -inputreader org.apache.hadoop.streaming.StreamFastaRecordReader\ -inputformat org.apache.hadoop.streaming.StreamFastaInputFormat I want to deploy the job to elastic mapreduce. I first create a streaming job. I specify input and output in S3, mapper, and reducer. However I could not find the place where I can specify -inputreader and -inputformat. So my questions are 1) how I can upload the class files to be used as inputreader and inputformat to elastic mapreduce? 2) how I specify to use them in the streaming? Any reply is appreciated. Thanks for your time! -- Thanks, Mo -- Thanks, Mo
Re: hadoop streaming on Amazon EC2
Hi, $ bin/hadoop jar custom_streaming_jar \ -input\ -Dstream.shipped.hadoopstreaming=custom_streaming_jar Should work. Check $ bin/hadoop jar hadoop-0.18.3-streaming.jar -info for more details. Amogh On 6/2/10 10:15 PM, Mo Zhou moz...@umail.iu.edu wrote: Thank you Amogh. I tried so but it threw exceptions as follows: $ bin/hadoop jar hadoop-0.18.3-streaming.jar \ -D stream.shipped.hadoopstreaming=fasta.jar\ -input HumanSeqs.4\ -output output\ -mapper cat -\ -inputreader org.apache.hadoop.streaming.StreamFastaRecordReader,begin=\ -inputformat org.apache.hadoop.streaming.StreamFastaInputFormat 10/06/02 12:44:35 ERROR streaming.StreamJob: Unexpected -D while processing -input|-output|-mapper|-combiner|-reducer|-file|-dfs|-jt|-additionalconfspec|-inputformat|-outputformat|-partitioner|-numReduceTasks|-inputreader|-mapdebug|-reducedebug|||-cacheFile|-cacheArchive|-verbose|-info|-debug|-inputtagged|-help Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \ $HADOOP_HOME/hadoop-streaming.jar [options] Options: -input <path> DFS input file(s) for the Map step -output <path> DFS output directory for the Reduce step -mapper <cmd|JavaClassName> The streaming command to run -combiner <JavaClassName> Combiner has to be a Java class -reducer <cmd|JavaClassName> The streaming command to run -file <file> File/dir to be shipped in the Job jar file -dfs <h:p>|local Optional. Override DFS configuration -jt <h:p>|local Optional. Override JobTracker configuration -additionalconfspec specfile Optional. -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional. -outputformat TextOutputFormat(default)|JavaClassName Optional. -partitioner JavaClassName Optional. -numReduceTasks <num> Optional. -inputreader <spec> Optional. -jobconf <n>=<v> Optional. Add or override a JobConf property -cmdenv <n>=<v> Optional. Pass env.var to streaming commands -mapdebug <path> Optional. To run this script when a map task fails -reducedebug <path> Optional. To run this script when a reduce task fails -cacheFile fileNameURI -cacheArchive fileNameURI -verbose For more details about these options: Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info java.lang.RuntimeException: at org.apache.hadoop.streaming.StreamJob.fail(StreamJob.java:550) at org.apache.hadoop.streaming.StreamJob.exitUsage(StreamJob.java:487) at org.apache.hadoop.streaming.StreamJob.parseArgv(StreamJob.java:209) at org.apache.hadoop.streaming.StreamJob.go(StreamJob.java:111) at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:33) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.util.RunJar.main(RunJar.java:155) at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68) Thanks, Mo On Wed, Jun 2, 2010 at 8:40 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Hi, You might need to add -Dstream.shipped.hadoopstreaming=path_to_your_custom_streaming_jar Amogh On 6/2/10 5:10 PM, Mo Zhou moz...@umail.iu.edu wrote: Thank you Amogh. Elastic mapreduce uses 0.18.3. I tried the first way by downloading hadoop-0.18.3 to my local machine. Then I got the following warning. WARN mapred.JobClient: No job jar file set.
User classes may not be found. See JobConf(Class) or JobConf#setJar(String). So the results were incorrect. Thanks, Mo On Wed, Jun 2, 2010 at 4:56 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Hi, Depending on what hadoop version ( 0.18.3??? ) EC2 uses, you can try one of the following 1. Compile the streaming jar files with your own custom classes and run on ec2 using this custom jar ( should work for 18.3 . Make sure you pick compatible streaming classes ) 2. Jar up your classes and specify them as -libjars option on command line, and specify the custom input and output formats as you have on your local machine ( should work for 19.0 ) I have never worked on EC2, so not sure if any easier solution exists. Amogh On 6/2/10 1:52 AM, Mo Zhou moz...@umail.iu.edu wrote: Hi, I know it may not be suitable to be posted here since it relates to EC2 more than Hadoop. However I could not find a solution and hope some one here could kindly help me out. Here is my question. I created my own inputreader and outputformatter to split an input file while use hadoop streaming. They are tested in my local machine
Re: Getting zero length files on the reduce output.
Hi, The default partitioner is hashcode(key) MODULO number_of_reducers, so it's pretty much possible. Can I change this hash function in any way? Sure, any custom partitioner can be plugged in. Check o.a.h.mapreduce.lib.partition or the secondary sort example in the mapred tutorial for more. On a side note, if you don't want the zero-sized output files to come up, use LazyOutputFormat instead. Amogh On 6/3/10 1:22 AM, Raymond Jennings III raymondj...@yahoo.com wrote: I have a cluster of 12 slave nodes. I see that for some jobs the part-r-0 type files, half of them are zero in size after the job completes. Does this mean the hash function that splits the data to each reducer node is not working all that well? On other jobs it's pretty much even across all reducers but on certain jobs only half of the reducers have files bigger than 0. It is reproducible though. Can I change this hash function in any way? Thanks.
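(A minimal custom partitioner sketch with the new mapreduce API to show the plug-in point; the key type and the first-character scheme are placeholders, not a recommendation.)

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Spread keys by their first character rather than hashCode(key) % N,
    // which can help when many keys collapse into the same partition.
    int c = key.getLength() > 0 ? key.charAt(0) : 0;
    return (c & Integer.MAX_VALUE) % numPartitions;
  }
}
// Plugged in with: job.setPartitionerClass(FirstCharPartitioner.class);
```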
Re: which node processed my job
Hi, InetAddress.getLocalHost() should give you the hostname for each mapper/reducer Amogh On 5/6/10 8:39 PM, Alan Miller alan.mil...@synopsys.com wrote: Not sure if this is the right list for this question, but. Is it possible to determine which host actually processed my MR job? Regards, Alan
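(A small illustrative sketch, not from the thread: resolving the host once per task in setup() and tagging each record with it; the key/value types are placeholders.)

```java
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HostReportingMapper extends Mapper<LongWritable, Text, Text, Text> {
  private String host;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Each task attempt runs in its own JVM, so this is the node that
    // actually processed the records seen by this task.
    host = InetAddress.getLocalHost().getHostName();
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(new Text(host), value); // tag every record with the processing host
  }
}
```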
Re: Per-file block size
Hi, Pass the -D property on the command line, e.g.: hadoop fs -D dfs.block.size=<multiple of the checksum size> ... You can check if it's actually set the way you need with hadoop fs -stat %o <file>. HTH, Amogh On 4/14/10 9:01 AM, Andrew Nguyen andrew-lists-had...@ucsfcti.org wrote: I thought I saw a way to specify the block size for individual files using the command line using hadoop dfs -put/copyFromLocal... However, I can't seem to find the reference anywhere. I see that I can do it via the API but no references to a command-line mechanism. Am I just remembering something that doesn't exist? Or can someone point me in the right direction? Thanks! --Andrew
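(Since the API route is mentioned above, a minimal sketch of setting a per-file block size when writing a file programmatically; the buffer size, replication and 128 MB block size are just example values.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateWithBlockSize {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // create(path, overwrite, bufferSize, replication, blockSize)
    FSDataOutputStream out = fs.create(new Path(args[0]),
        true, 4096, (short) 3, 128L * 1024 * 1024); // 128 MB blocks for this file only
    out.write("hello hdfs\n".getBytes("UTF-8"));
    out.close();
  }
}
```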
Re: How do I use MapFile Reader and Writer
Hi, The file system object will contain the scheme, authority etc for the given uri or path. The conf object acts as reference ( unable to get a better terminology ) to this info. Looking at the MapFileOutputFormat should help provide better understanding as to how writers and readers are initialized. Hope this helps, Amogh On 4/13/10 7:33 PM, Placebo placebobec...@hotmail.com wrote: I have a large text file, approximately 500mb containing key value pairs on each line. I would like to implement Hadoop MapFile so that I can access any key,value pair fairly quickly. To construct either the Reader or Writer the MapFile requires a Configurations object and a File System object. I am confused as to how to create either object, and why they are necessary. Would someone be so kind to demonstrate to me a trivial example as to how I can accomplish this. Thanks in advance. -- View this message in context: http://old.nabble.com/How-do-I-use-MapFile-Reader-and-Writer-tp28230683p28230683.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
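(A trivial sketch along the lines asked for, assuming Text keys and values and a hypothetical directory name; note that a MapFile is a directory holding a data file plus an index, and keys must be appended in sorted order.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // picks up core-site.xml etc.
    FileSystem fs = FileSystem.get(conf);          // the FS (HDFS or local) the path lives on
    String dir = "/user/demo/kvstore";             // hypothetical MapFile directory

    // Write key/value pairs; keys must arrive in sorted order.
    MapFile.Writer writer = new MapFile.Writer(conf, fs, dir, Text.class, Text.class);
    writer.append(new Text("apple"), new Text("1"));
    writer.append(new Text("banana"), new Text("2"));
    writer.close();

    // Random access by key.
    MapFile.Reader reader = new MapFile.Reader(fs, dir, conf);
    Text value = new Text();
    if (reader.get(new Text("banana"), value) != null) {
      System.out.println("banana -> " + value);
    }
    reader.close();
  }
}
```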
Re: swapping on hadoop
Hi, (#maxmapTasksperTaskTracker + #maxreduceTasksperTaskTracker) * JVMHeapSize < PhysicalMemoryonNode The tasktracker and datanode daemons also take up memory, 1GB each by default I think. Is that accounted for? Could there be an issue with HDFS data or metadata taking up memory? Is the namenode a separate machine or does it participate in compute nodes too? What are the memory requirements for the jobtracker,namenode and tasktracker,datanode JVMs? See above; there was a thread running on this on the forum sometime back, to manipulate these values for TT and DN. this setting doesn't sound like it should be causing swapping in the first place ( io.sort.mb) I think so too :) Just yesterday I read a tweet on machine configs for Hadoop, hope it helps you http://bit.ly/cphF7R Amogh On 3/30/10 10:45 PM, Vasilis Liaskovitis vlias...@gmail.com wrote: Hi all, I've noticed swapping for a single terasort job on a small 8-node cluster using hadoop-0.20.1. The swapping doesn't happen repeatably; I can have back to back runs of the same job from the same hdfs input data and get swapping only on 1 out of 4 identical runs. I've noticed this swapping behaviour on both terasort jobs and hive query jobs. - Focusing on a single job config, is there a rule of thumb about how much node memory should be left for use outside of Child JVMs? I make sure that per node, there is free memory: (#maxmapTasksperTaskTracker + #maxreduceTasksperTaskTracker) * JVMHeapSize < PhysicalMemoryonNode The total JVM heap size per node per job from the above equation currently accounts for 65%-75% of the node's memory. (I've tried allocating a riskier 90% of the node's memory, with similar swapping observations). - Could there be an issue with HDFS data or metadata taking up memory? I am not cleaning output or intermediate outputs from HDFS between runs. Is this possible? - Do people use any specific java flags (particularly garbage collection flags) for production environments where one job runs (or possibly more jobs run simultaneously)? - What are the memory requirements for the jobtracker,namenode and tasktracker,datanode JVMs? - I am setting io.sort.mb to about half of the JVM heap size (half of -Xmx in javaopts). Should this be set to a different ratio? (this setting doesn't sound like it should be causing swapping in the first place). - The buffer cache is cleaned before each run (flush and echo 3 > /proc/sys/vm/drop_caches) any empirical advice and suggestions to solve this are appreciated. thanks, - Vasilis
Re: execute mapreduce job on multiple hdfs files
Hi, Piggybacking on Gang's reply, to add files / dirs recursively you can use FileStatus / listStatus to determine if it's a file or a dir and add as needed ( check the FileStatus API for this ). There is a patch which does this for FileInputFormat: http://issues.apache.org/jira/browse/MAPREDUCE-1501 Amogh On 3/23/10 6:25 PM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi Oleg, you can use FileInputFormat.addInputPath(JobConf, Path) multiple times in your program to add arbitrary paths. Instead, if you use FileInputFormat.setInputPath, there can be only one input path. If you are talking about output, the path you give is an output directory; all the output files (part-0, part-1...) will be generated in that directory. -Gang - Original Message From: Oleg Ruchovets oruchov...@gmail.com To: common-user@hadoop.apache.org Sent: 2010/3/23 (Tue) 6:18:34 AM Subject: execute mapreduce job on multiple hdfs files Hi, All the examples that I found execute a mapreduce job on a single file, but in my situation I have more than one. Suppose I have such a folder on HDFS which contains some files: /my_hadoop_hdfs/my_folder: /my_hadoop_hdfs/my_folder/file1.txt /my_hadoop_hdfs/my_folder/file2.txt /my_hadoop_hdfs/my_folder/file3.txt How can I execute hadoop mapreduce on file1.txt, file2.txt and file3.txt? Is it possible to pass the folder to the hadoop job as a parameter so that all files in it will be processed by the mapreduce job? Thanks In Advance Oleg
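(A small sketch of the recursive add described above, using the old-API FileInputFormat mentioned by Gang; the starting folder matches Oleg's example and everything else is illustrative.)

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class RecursiveInputPaths {
  // Walk the directory tree and add every plain file as a job input path.
  public static void addRecursively(JobConf job, FileSystem fs, Path dir) throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDir()) {
        addRecursively(job, fs, status.getPath());
      } else {
        FileInputFormat.addInputPath(job, status.getPath());
      }
    }
  }
}
// Usage: addRecursively(jobConf, FileSystem.get(jobConf), new Path("/my_hadoop_hdfs/my_folder"));
```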
Re: split number
Hi, AFAIK, it is a hint. Depending on the block size, the minimum split size and this hint, the exact number of splits is computed. So if you have total_size/hint < block size but greater than the min split size, you should see the exact number. This is how I understand it, please let me know if I'm going wrong. Amogh On 3/22/10 12:33 AM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi all, in InputFormat.getSplits(JobConf, splitNum), I think the splitNum should be a hint. The number of splits is equal to the number of mappers working on that file. But I do get the same number of splits as indicated by splitNum, and the sum of the split lengths is the length of that file. It seems the splitNum here is not a hint. Is it a bug, or did I do something wrong? Thanks, -Gang
Re: when to sent distributed cache file
Hi Gang, Yes, the time to distribute files is considered part of the job's running time ( more specifically the set up time ). The time is essentially for the TT to copy the files specified in the distributed cache to its local FS, generally from HDFS unless you have a separate FS for the JT. So in general you might see small time gains when the files to be distributed have a relatively high replication factor. Wrt blocks, AFAIK, even on HDFS if the file size < block size, the actual space consumed is the file size itself. The overhead is in terms of storing metadata on that (small) file block. So when you have it on local disk, it will still consume only the actual size and not the block size. Thanks, Amogh On 3/18/10 2:28 AM, Gang Luo lgpub...@yahoo.com.cn wrote: Thanks Ravi. Here are some observations. I run job1 to generate some data used by the following job2 without replication. The total size of the job 1 output is 25mb and is in 50 files. I use the distributed cache to send all the files to nodes running job2 tasks. When job2 starts, it stayed at map 0% reduce 0% for 10 minutes. When the job1 output is in 10 files (using 10 reducers in job1), the time consumed here is 2 minutes. So, I think the time to distribute cache files is actually counted as part of the total time of the MR job. And in order to send a cache file from HDFS to local disk, it sends at least one block (64mb by default) even if that file is only 1mb. Is that right? If so, how much space does that cache file take on the local disk, 64mb or 1mb? -Gang Hello Gang, The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node. Not sure if the time required to distribute the cache is counted in the map reduce job time but it is included in the job submission process in JobClient. -- Ravi On 3/17/10 11:32 AM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi all, I wonder when hadoop distributes the cache files. The moment we call DistributedCache.addCacheFile()? Will the time to distribute caches be counted as part of the mapreduce job time? Thanks, -Gang
Re: Is there an easy way to clear old jobs from the jobtracker webpage?
Hi, The mapred.jobtracker.completeuserjobs.maximum property specifies the number of jobs to be kept on the JT page at any time. After this they are available under the history page. Probably setting this to 0 will do the trick? Amogh On 3/17/10 10:09 PM, Raymond Jennings III raymondj...@yahoo.com wrote: I'd like to be able to clear the contents of the jobs that have completed running on the jobtracker webpage. Is there an easy way to do this without restarting the cluster?
Re: java.lang.NullPointerException at org.apache.hadoop.mapred.IFile$Writer.<init>(IFile.java:102)
Hi, http://hadoop.apache.org/common/docs/current/native_libraries.html should answer your questions. Amogh On 3/18/10 10:48 PM, jiang licht licht_ji...@yahoo.com wrote: I got the following error when I tried to do gzip compression on map output, using hadoop-0.20.1. settings in mapred-site.xml-- mapred.compress.map.output=true mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec error message-- java.lang.NullPointerException at org.apache.hadoop.mapred.IFile$Writer.<init>(IFile.java:102) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1198) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1091) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:359) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) I read in the src that the Writer in IFile takes care of map output compression. So, it seems to me that I didn't have the gzip native library built or didn't have correct settings. There is no build folder in HADOOP_HOME and no native folder under lib in HADOOP_HOME. I checked that I have gzip and zlib installed. So, next is to build the hadoop native library on top of these. How do I do that? Is it a simple matter of pointing some variable to the gzip or zlib libs, or should I use build.xml in hadoop to build some target, and what target should I build? Thanks, Michael
Re: cluster involvement trigger
Hi, You mentioned you pass the files packed together using the -archives option. This will uncompress the archive on the compute node itself, so the namenode won't be hampered in this case. However, cleaning up the distributed cache is a tricky scenario ( the user doesn't have explicit control over this ), you may search this list for many discussions pertaining to this. And while on the topic of archives: it may not be practical for you as of now, but Hadoop Archives (har) provide similar functionality. Hope this helps. Amogh On 2/27/10 12:53 AM, Michael Kintzer michael.kint...@zerk.com wrote: Amogh, Thank you for the detailed information. Our initial prototyping seems to agree with your statements below, i.e. a single large input file is performing better than an index file + an archive of small files. I will take a look at the CombineFileInputFormat as you suggested. One question. Since the many small input files are all in a single jar archive managed by the name node, does that still hamper name node performance? I was under the impression these archives are only unpacked into the temporary map reduce file space (and I'm assuming cleaned up after map-reduce completes). Does the name node need to store the metadata of each individual file during the unpacking for this case? -Michael On Feb 25, 2010, at 10:31 PM, Amogh Vasekar wrote: Hi, The number of mappers initialized depends largely on your input format (the getSplits of your input format); (almost all) input formats available in hadoop derive from fileinputformat, hence the 1 mapper per file block notion (this actually is 1 mapper per split). You say that you have too many small files. In general each of these small files ( < 64 MB ) will be executed by a single mapper. However, I would suggest looking at CombineFileInputFormat which does the job of packaging many small files together depending on data locality for better performance ( initialization time is a significant factor in hadoop's performance ). On the other side, many small files will hamper your namenode performance since file metadata is stored in memory, and limit its overall capacity wrt number of files. Amogh On 2/25/10 11:15 PM, Michael Kintzer michael.kint...@zerk.com wrote: Hi, We are using the streaming API. We are trying to understand what hadoop uses as a threshold or trigger to involve more TaskTracker nodes in a given Map-Reduce execution. With default settings (64MB chunk size in HDFS), if the input file is less than 64MB, will the data processing only occur on a single TaskTracker Node, even if our cluster size is greater than 1? For example, we are trying to figure out if hadoop is more efficient at processing: a) a single input file which is just an index file that refers to a jar archive of 100K or 1M individual small files, where the jar file is passed as the -archives argument, or b) a single input file containing all the raw data represented by the 100K or 1M small files. With (a), our input file is 64MB. With (b) our input file is very large. Thanks for any insight, -Michael
Re: cluster involvement trigger
Hi, The number of mappers initialized depends largely on your input format (the getSplits of your input format); (almost all) input formats available in hadoop derive from fileinputformat, hence the 1 mapper per file block notion (this actually is 1 mapper per split). You say that you have too many small files. In general each of these small files ( < 64 MB ) will be executed by a single mapper. However, I would suggest looking at CombineFileInputFormat which does the job of packaging many small files together depending on data locality for better performance ( initialization time is a significant factor in hadoop's performance ). On the other side, many small files will hamper your namenode performance since file metadata is stored in memory, and limit its overall capacity wrt number of files. Amogh On 2/25/10 11:15 PM, Michael Kintzer michael.kint...@zerk.com wrote: Hi, We are using the streaming API. We are trying to understand what hadoop uses as a threshold or trigger to involve more TaskTracker nodes in a given Map-Reduce execution. With default settings (64MB chunk size in HDFS), if the input file is less than 64MB, will the data processing only occur on a single TaskTracker Node, even if our cluster size is greater than 1? For example, we are trying to figure out if hadoop is more efficient at processing: a) a single input file which is just an index file that refers to a jar archive of 100K or 1M individual small files, where the jar file is passed as the -archives argument, or b) a single input file containing all the raw data represented by the 100K or 1M small files. With (a), our input file is 64MB. With (b) our input file is very large. Thanks for any insight, -Michael
Re: How are intermediate key/value pairs materialized between map and reduce?
Hi, Map spilled records: 1,802,965 (ahhh... now that you ask for it - here there also is a factor of 3 between output and spilled). Exactly what I suspected :) Ed has already provided some pointers as to why this is the case. You should try to minimize this number as much as possible, since this along with the Reduce Shuffle Bytes degrades your job performance by a considerable amount. To understand the internals and what Ed said, I would strongly recommend going through http://www.slideshare.net/gnap/berkeley-performance-tuning by a few fellow Yahoos. There is a detailed explanation of why map side spills occur and how one can minimize that :) Thanks, Amogh On 2/24/10 1:15 PM, Tim Kiefer tim-kie...@gmx.de wrote: Sure, I see: Map input records: 10,000 Map output records: 600,000 Map output bytes: 307,216,800,000 (each record is about 500kb - that fits the application and is to be expected) Map spilled records: 1,802,965 (ahhh... now that you ask for it - here there also is a factor of 3 between output and spilled). So - the question now is: why are three times as many records spilled as are actually produced by the mappers? In my map function, I do not perform any additional file writing besides the context.write() for the intermediate records. Thanks, Tim On 24.02.2010 05:28, Amogh Vasekar wrote: Hi, Can you let us know what the values are for: Map input records Map spilled records Map output bytes Is there any side effect file written? Thanks, Amogh
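(A small illustrative sketch of the map-side spill knobs discussed in this thread; the values are placeholders to be tuned per job and per available heap, not recommendations.)

```java
import org.apache.hadoop.mapred.JobConf;

public class SpillTuning {
  public static void tune(JobConf conf) {
    // Larger sort buffer: fewer spills if a task's map output fits (needs more heap).
    conf.set("io.sort.mb", "200");
    // Fraction of the buffer that may fill before a background spill starts.
    conf.set("io.sort.spill.percent", "0.90");
    // How many spill segments are merged at once.
    conf.set("io.sort.factor", "50");
  }
}
```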
Re: How are intermediate key/value pairs materialized between map and reduce?
Hi, Can you let us know what the values are for: Map input records Map spilled records Map output bytes Is there any side effect file written? Thanks, Amogh On 2/23/10 8:57 PM, Tim Kiefer tim-kie...@gmx.de wrote: No... 900GB is in the map column. Reduce adds another ~70GB of FILE_BYTES_WRITTEN and the total column consequently shows ~970GB. On 23.02.2010 16:11, Ed Mazur wrote: Hi Tim, I'm guessing a lot of these writes are happening on the reduce side. On the JT web interface, there are three columns: map, reduce, overall. Is the 900GB figure from the overall column? The value in the map column will probably be closer to what you were expecting. There are writes on the reduce side too during the shuffle and multi-pass merge. Ed 2010/2/23 Tim Kiefer tim-kie...@gmx.de: Hi Gang, thanks for your reply. To clarify: I look at the statistics through the job tracker. In the webinterface for my job I have columns for map, reduce and total. What I was referring to is map - i.e. I see FILE_BYTES_WRITTEN = 3 * Map Output Bytes in the map column. About the replication factor: I would expect the exact same thing - changing to 6 has no influence on FILE_BYTES_WRITTEN. About the sorting: I have io.sort.mb = 100 and io.sort.factor = 10. Furthermore, I have 40 mappers and map output data is ~300GB. I can't see how that ends up in a factor 3? - tim On 23.02.2010 14:39, Gang Luo wrote: Hi Tim, the intermediate data is materialized to the local file system. Before it is available for reducers, mappers will sort it. If the buffer (io.sort.mb) is too small for the intermediate data, multi-phase sorting happens, which means you read and write the same bits more than once. Besides, are you looking at the statistics per mapper through the job tracker, or just the information output when a job finishes? If you look at the information given out at the end of the job, note that these are overall statistics which include sorting on the reduce side. It also includes the amount of data written to HDFS (I am not 100% sure). And, FILE_BYTES_WRITTEN has nothing to do with the replication factor. I think if you change the factor to 6, FILE_BYTES_WRITTEN is still the same. -Gang Hi there, can anybody help me out on a (most likely) simple unclarity. I am wondering how intermediate key/value pairs are materialized. I have a job where the map phase produces 600,000 records and map output bytes is ~300GB. What I thought (up to now) is that these 600,000 records, i.e., 300GB, are materialized locally by the mappers and that later on reducers pull these records (based on the key). What I see (and cannot explain) is that the FILE_BYTES_WRITTEN counter is as high as ~900GB. So - where does the factor 3 come from between Map output bytes and FILE_BYTES_WRITTEN??? I thought about the replication factor of 3 in the file system - but that should be HDFS only?! Thanks - tim
Re: java.io.IOException: Spill failed when using w/ GzipCodec for Map output
Hi, Can you please let us know what platform you are running on your hadoop machines? For gzip and lzo to work, you need supported hadoop native libraries ( I remember reading on this somewhere in the hadoop wiki :) ) Amogh On 2/23/10 8:16 AM, jiang licht licht_ji...@yahoo.com wrote: I have a pig script. If I don't set any codec for Map output for the hadoop cluster, no problem. Now I made the following compression settings, the job failed and the error message is shown below. I guess there are some other settings that should be correctly set together with using the compression. I'm using 0.20.1. Any thoughts? Thanks for your help! mapred-site.xml <property> <name>mapred.compress.map.output</name> <value>true</value> </property> <property> <name>mapred.map.output.compression.codec</name> <value>org.apache.hadoop.io.compress.GzipCodec</value> </property> error message of failed map task--- java.io.IOException: Spill failed at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:822) at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:466) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:108) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:251) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:240) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:93) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) Caused by: java.lang.NullPointerException at org.apache.hadoop.mapred.IFile$Writer.<init>(IFile.java:102) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1198) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135) Thanks, Michael
Re: java.io.IOException: Spill failed when using w/ GzipCodec for Map output
Hi, Certainly this might not cause the issue. But, the Hadoop native library is supported on *nix platforms only. Unfortunately it is known not to work on Cygwin and Mac OS X and has mainly been used on the GNU/Linux platform. http://hadoop.apache.org/common/docs/current/native_libraries.html#Supported+Platforms The mapper log would throw more light on this Amogh On 2/23/10 11:41 AM, jiang licht licht_ji...@yahoo.com wrote: Thanks Amogh. The platform where I got this error is Mac OS X and hadoop 0.20.1. All native libraries are installed except lzo (which will report that the codec is not found). But I didn't see this error when I ran the same thing w/o compression specified; in addition I also ran something with the same compression setting on Fedora 8 and 0.19.1 without any problem. So, I think it might depend on some other settings (wrt what spill is about). Thanks, Michael --- On Mon, 2/22/10, Amogh Vasekar am...@yahoo-inc.com wrote: From: Amogh Vasekar am...@yahoo-inc.com Subject: Re: java.io.IOException: Spill failed when using w/ GzipCodec for Map output To: common-user@hadoop.apache.org common-user@hadoop.apache.org Date: Monday, February 22, 2010, 11:27 PM Hi, Can you please let us know what platform you are running on your hadoop machines? For gzip and lzo to work, you need supported hadoop native libraries ( I remember reading on this somewhere in the hadoop wiki :) ) Amogh On 2/23/10 8:16 AM, jiang licht licht_ji...@yahoo.com wrote: I have a pig script. If I don't set any codec for Map output for the hadoop cluster, no problem. Now I made the following compression settings, the job failed and the error message is shown below. I guess there are some other settings that should be correctly set together with using the compression. I'm using 0.20.1. Any thoughts? Thanks for your help! mapred-site.xml <property> <name>mapred.compress.map.output</name> <value>true</value> </property> <property> <name>mapred.map.output.compression.codec</name> <value>org.apache.hadoop.io.compress.GzipCodec</value> </property> error message of failed map task--- java.io.IOException: Spill failed at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:822) at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:466) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:108) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:251) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:240) at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.map(PigMapReduce.java:93) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) at org.apache.hadoop.mapred.Child.main(Child.java:170) Caused by: java.lang.NullPointerException at org.apache.hadoop.mapred.IFile$Writer.<init>(IFile.java:102) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1198) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:648) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1135) Thanks, Michael
Re: Unexpected empty result problem (zero-sized part-### files)?
So, considering this situation of loading mixed good and corrupted .gz files, how to still get expected results? Try manipulating the value mapred.max.map.failures.percent to a % of files you expect to be corrupted / acceptable data skip percent. Amogh On 2/21/10 7:17 AM, jiang licht licht_ji...@yahoo.com wrote: I think I found what caused the problem. Actually, the folder to load to 'a' contains all .gz files. Somehow, some .gz files are corrupted. Thus, java.io.EOFException: Unexpected end of ZLIB input stream were thrown. I did the following test: I truncated a .gz file and name it corrupted.gz. Then load only this file it to 'a' and execute the same remaining scripts. This cause the exact same error message dumped as given in the 1st post. The same thing happens if loading both this file and other good gz files. My guess is that such corrupted files will not be loaded (since the above exception will be thrown). But data from good .gz files still got loaded. Then why empty result is generated (0-sized part-)? So, considering this situation of loading mixed good and corrupted .gz files, how to still get expected results? Thanks! Michael --- On Sat, 2/20/10, Ashutosh Chauhan ashutosh.chau...@gmail.com wrote: From: Ashutosh Chauhan ashutosh.chau...@gmail.com Subject: Re: Unexpected empty result problem (zero-sized part-### files)? To: common-user@hadoop.apache.org Date: Saturday, February 20, 2010, 7:29 PM A log file with a name like pig_1234567890.log must be sitting in the directory from where you launched your pig script. Can you send its content ? Ashutosh On Sat, Feb 20, 2010 at 16:41, jiang licht licht_ji...@yahoo.com wrote: I have a pig script as follows (see far below). It loads 2 data sets, perform some filtering, then join the two sets. Lastly count occurrences of a combination of fields and writes results to hdfs. --load raw data a = LOAD 'foldera/*'; b = LOAD 'somefile'; --choose rows and columns a_filtered = FILTER a BY somecondition; a_filtered_shortened = FOREACH a_filtered GENERATE somefields; a_filtered_shortened_unique = DISTINCT a_filtered_short PARALLEL #; --join a b and count occurrences of a combination of fields ab = JOIN a_filtered_short_unique BY somefield, b by somefield PARALLEL #; ab_shortened = FOREACH ab GENERATE somefileds; ab_shortened_grouped = GROUP ab_shortened BY ($0, $1) PARALLEL #; --c will contain: fields, counts c = FOREACH ab_shortened_grouped GENERATE FLATTEN($0), COUNT(ab_shortened); --save results STORE c INTO 'MYRESULTS' USING PigStorage(); PROBLEM is that empty sets (empty part- files) were generated. But a non-empty result is expected. For example, if I chose to load one file (instead of loading all files in a folder) to 'a', quite a number of tuples are created (non-empty part-### files). It seems to me the logic in the script is good and it generates correct result for randomly selected file anyway. So, I am wondering what could cause this empty result problem? FYI, I ran the same script multiple time and all gave me empty part-### files. Though in the output, I did see repeatedly error message similar to the following ones that show one result file is failed to produce (these are last lines from job output). Could this be the problem? How to locate the cause? Thanks! ... 
2010-02-20 16:21:37,737 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 86% complete 2010-02-20 16:21:38,239 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 87% complete 2010-02-20 16:21:39,265 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 88% complete 2010-02-20 16:21:44,286 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 93% complete 2010-02-20 16:21:46,931 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 95% complete 2010-02-20 16:21:47,432 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 99% complete 2010-02-20 16:21:54,005 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete 2010-02-20 16:21:54,005 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map reduce job(s) failed! 2010-02-20 16:21:54,008 [main] ERROR org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed to produce result in: hdfs://hostA:50001/tmp/temp829697187/tmp-531977953 2010-02-20 16:21:54,008 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Successfully stored result in: hdfs://hostA:50001/tmp/temp829697187/tmp504533728 2010-02-20 16:21:54,023 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher -
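(A minimal sketch of the setting suggested above when driving MapReduce directly from Java; the 5 is an arbitrary example percentage. From Pig, the same property would have to reach the job configuration, e.g. via a -D option or the set command, depending on the Pig version.)

```java
import org.apache.hadoop.mapred.JobConf;

public class TolerateCorruptInputs {
  public static void configure(JobConf conf) {
    // Allow up to 5% of map tasks to fail (e.g. on truncated .gz parts)
    // without failing the whole job; the remaining good splits still
    // contribute to the output.
    conf.setMaxMapTaskFailuresPercent(5);
    // Equivalent property form: mapred.max.map.failures.percent = 5
  }
}
```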
Re: basic hadoop job help
Hi, The hadoop meet last year had some very interesting business solutions discussed: http://www.cloudera.com/company/press-center/hadoop-world-nyc/ Most of the companies in there have shared their methodology on their blogs / on slideshare. One I have handy is: http://www.slideshare.net/hadoop/practical-problem-solving-with-apache-hadoop-pig Shows how Y! Search assist is implemented. Amogh On 2/19/10 12:48 AM, C Berg icey...@yahoo.com wrote: Hi Eric, Thanks for the advice, that is very much appreciated. With your help I was able to get past the mechanical part to something a bit more substantive, which is, wrapping my head around doing an actual business calculation in a mapreduce way. Any recommendations on some tutorials that cover some real-world examples other than word counting and the like? Thanks again, Cory --- On Thu, 2/18/10, Eric Arenas eare...@rocketmail.com wrote: From: Eric Arenas eare...@rocketmail.com Subject: Re: basic hadoop job help To: common-user@hadoop.apache.org Date: Thursday, February 18, 2010, 10:52 AM Hi Cory, regarding the part that you are not sure about: String inputdir = args[0]; String outputdir= args[1]; int numberReducers = Integer.parseInt(args[2]); //it is better to at least pass the number of reducers as a parameter, or read it from the XML job config file, if you want //setting the number of reducers to 1 , as you had in your code *might* potentially make it slower to process and generate the output //if you are trying to sell the idea of Hadoop as a new ETL tool, you want it to be as fast as you can ... job2.setNumReduceTasks(1); FileInputFormat.setInputPaths(job, inputdir); FileOutputFormat.setOutputPath(job, new Path(outputdir)); return job.waitForCompletion(true) ? 0 : 1; } //end of run method Unless you copy/paste your code, I do not see why you need to set setWorkingDirectory in your M/R job. Give this a try and let me know, regards, Eric Arenas - Original Message From: Cory Berg icey...@yahoo.com To: common-user@hadoop.apache.org Sent: Thu, February 18, 2010 9:07:54 AM Subject: basic hadoop job help Hey all, I'm trying to get Hadoop up and running as a proof of concept to make an argument for moving away from a big RDBMS. I'm having some challenges just getting a really simple demo mapreduce to run. The examples I have seen on the web tend to make use of classes that are now deprecated in the latest hadoop (0.20.1). It is not clear what the equivalent newer classes are in some cases. Anyway, I am stuck at this exception - here it is start to finish: --- $ ./bin/hadoop jar ./testdata/RetailTest.jar RetailTest testdata outputdata 10/02/18 09:24:55 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 10/02/18 09:24:55 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/02/18 09:24:55 INFO input.FileInputFormat: Total input paths to process : 5 10/02/18 09:24:56 INFO input.FileInputFormat: Total input paths to process : 5 Exception in thread Thread-13 java.lang.IllegalStateException: Shutdown in progress at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:39) at java.lang.Runtime.addShutdownHook(Runtime.java:192) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1387) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:191) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:180) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175) at org.apache.hadoop.mapred.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:61) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:245) Now here is the code that actually starts things up (not including the actual mapreduce code). I initially suspected this code because I was guessing at the correct non-deprecated classes to use: public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job2 = new Job(conf); job2.setJobName("RetailTest"); job2.setJarByClass(RetailTest.class); job2.setMapperClass(RetailMapper.class); job2.setReducerClass(RetailReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); job2.setNumReduceTasks(1); // this was a guess on my part as I could not find out the recommended way job2.setWorkingDirectory(new Path(args[0])); FileInputFormat.setInputPaths(job2, new Path(args[0])); FileOutputFormat.setOutputPath(job2, new Path(args[1])); job2.submit(); return 0; } /** * @param args */ public static void main(String[] args) throws Exception {
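(Putting Eric's points together, a sketch of the run() method with the blocking call; the likely reason for the Shutdown in progress trace is that submit() returns immediately and main() exits while the LocalJobRunner is still working. Imports are assumed to be the same new-API classes as in Cory's original code, and the class names are his.)

```java
public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job2 = new Job(conf);
  job2.setJobName("RetailTest");
  job2.setJarByClass(RetailTest.class);
  job2.setMapperClass(RetailMapper.class);
  job2.setReducerClass(RetailReducer.class);
  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);
  job2.setNumReduceTasks(1); // or parse the count from args, as Eric suggests
  FileInputFormat.setInputPaths(job2, new Path(args[0]));
  FileOutputFormat.setOutputPath(job2, new Path(args[1]));
  // Block until the job finishes, rather than submit(), so the JVM is not
  // torn down while the (local) job runner is still doing work.
  return job2.waitForCompletion(true) ? 0 : 1;
}
```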
Re: Pass the TaskId from map to Reduce
Hi Ankit, however the issue that I am facing is that I was expecting all the maps to finish before any reduce starts This is exactly how it happens: reducers poll map tasks for data and begin user code only after all maps complete. when is the close() function called, after every map or after all the maps? Once for every map task, after all the input data for that task is consumed. Also, can you let us know where you are writing your data, i.e. in the cwd of the task or directly on hdfs? Thanks, Amogh On 2/18/10 8:58 AM, ANKITBHATNAGAR abhatna...@vantage.com wrote: Hi Don, Thanks for your reply. I already tried this approach, however the issue that I am facing is that I was expecting all the maps to finish before any reduce starts. This is not happening for me. It looks like as soon as one map finishes, reduce starts. That's why I called close(). Could you tell me when is the close() function called, after every map or after all the maps? Am I doing something wrong? Thanks Ankit -- View this message in context: http://old.nabble.com/Pass-the-TaskId-from-map-to-Reduce-tp27575531p27634001.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
Re: configuration file
Hi, A shot in the dark: is the conf file in your classpath? If yes, are the parameters you are trying to override marked final? Amogh On 2/4/10 3:18 AM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi, I am writing a script to run a whole bunch of jobs automatically. But the configuration file doesn't seem to be working. I think there is something wrong in my command. The command in my script is like: bin/hadoop jar myJarFile myClass -conf myConfigurationFile.xml arg1 arg2 I use conf.get() to show the value of some parameters. But the values are not what I define in that xml file. Is there something wrong? Thanks. -Gang
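(One more thing worth checking, sketched here rather than taken from the thread: generic options such as -conf are only parsed when the driver goes through ToolRunner/GenericOptionsParser. The class name and property below are hypothetical.)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJobDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already reflects -conf/-D overrides; ToolRunner has
    // stripped them off, so args holds only arg1, arg2, ...
    Configuration conf = getConf();
    System.out.println("my.param = " + conf.get("my.param"));
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MyJobDriver(), args));
  }
}
```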
Re: Input file format doubt
Hi, For global line numbers, you would need to know the ordering within each split generated from the input file. The standard input formats provide offsets in splits, so if the records are of equal length you can compute some kind of numbering. I remember someone had implemented sequential numbering using the partition id for each map task (mapred.task.partition) and posted this on his blog. I don't have it handy with me right now, but will send you off the list if I find it. Amogh On 1/28/10 3:29 PM, Udaya Lakshmi udaya...@gmail.com wrote: Hi all.. I have searched the documentation but could not find a input file format which will give line number as the key and line as the value. Did I miss something? Can someone give me a clue of how to implement one such input file format. Thanks, Udaya.
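(A rough sketch of the partition-id approach mentioned above, not the blog author's exact code: each map task prefixes a task-local counter with its split number, giving ids that are unique across the job and ordered within a split; true global line numbers would additionally need the record counts of all preceding splits.)

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitLocalLineNumberMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
  private long partition;       // which split / map task this is
  private long lineInSplit = 0; // counter local to this task

  @Override
  protected void setup(Context context) {
    partition = context.getConfiguration().getInt("mapred.task.partition", 0);
  }

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Pack (partition, local counter) into one long id.
    long id = (partition << 40) | lineInSplit++;
    context.write(new LongWritable(id), line);
  }
}
```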
Re: Input file format doubt
Hi, Here's the relevant thread with Gordon, the author of the solution: I am in the process of learning Hadoop (and I think I've made a lot of progress). I have described the specific problem and solution on my blog http://www.data-miners.com/blog/2009/11/hadoop-and-mapreduce-parallel-program.html. Your particular solution won't work, because I need to do additional processing between the two passes. --gordon On Wed, Nov 25, 2009 at 1:50 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Amogh On 1/28/10 4:03 PM, Ravi ravindra.babu.rav...@gmail.com wrote: Thank you Amogh. On Thu, Jan 28, 2010 at 3:44 PM, Amogh Vasekar am...@yahoo-inc.com wrote: Hi, For global line numbers, you would need to know the ordering within each split generated from the input file. The standard input formats provide offsets in splits, so if the records are of equal length you can compute some kind of numbering. I remember someone had implemented sequential numbering using the partition id for each map task (mapred.task.partition) and posted this on his blog. I don't have it handy with me right now, but will send you off the list if I find it. Amogh On 1/28/10 3:29 PM, Udaya Lakshmi udaya...@gmail.com wrote: Hi all.. I have searched the documentation but could not find a input file format which will give line number as the key and line as the value. Did I miss something? Can someone give me a clue of how to implement one such input file format. Thanks, Udaya.
Re: fine granularity operation on HDFS
Hi Gang, Yes PathFilters work only on file paths. I meant you can include such type of logic at the split level. The input format's getSplits() method is responsible for computing and adding splits to a list container, for which the JT initializes mapper tasks. You can override the getSplits() method to add only a few, say, based on the location or offset, to the list. Here's the reference:

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
  splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkLocations.length-1].getHosts()));
}

Before splits.add you can use your logic for discarding. However, you need to ensure your record reader takes care of incomplete records at boundaries. To get the block locations to load separately, the FileSystem class APIs expose a few methods like getBlockLocations etc. Hope this helps. Amogh On 1/28/10 7:26 PM, Gang Luo lgpub...@yahoo.com.cn wrote: Thanks Amogh. For the second part of my question, I actually mean loading blocks separately from HDFS. I don't know whether it is realistic. Anyway, since my goal is to process different divisions of a file separately, to do that at the split level is OK. But even if I can get the splits from the inputformat, how do I add only the few splits I need to the mapper and discard the others? (pathfilters only work for files, not blocks, I think). Thanks. -Gang
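A rough sketch of the override itself, assuming the old-API TextInputFormat as the base and a made-up predicate on the split's starting offset:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class FilteringInputFormat extends TextInputFormat {

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    InputSplit[] all = super.getSplits(job, numSplits);
    List<InputSplit> kept = new ArrayList<InputSplit>();
    for (InputSplit split : all) {
      FileSplit fs = (FileSplit) split;
      // made-up predicate: keep only splits starting in the first 128MB of each file
      if (fs.getStart() < 128L * 1024 * 1024) {
        kept.add(fs);
      }
    }
    return kept.toArray(new InputSplit[kept.size()]);
  }
}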
Re: File split query
Hi, In general, the file split may break the records, its the responsibility of the record reader to present the record as a whole. If you use standard available InputFormats, the framework will make sure complete records are presented in key,value. Amogh On 1/29/10 9:04 AM, Udaya Lakshmi udaya...@gmail.com wrote: Hi, When framework splits a file, will it happen that some part of a line falls in one split and the other part in some other split? Or is the framework going to take care that it always splits at the end of the line? Thanks, Udaya.
Re: When exactly is combiner invoked?
Hi, To elaborate a little on Gang's point, the buffer threshold is limited by io.sort.spill.percent, during which spills are created. If the number of spills is more than min.num.spills.for.combine, combiner gets invoked on the spills created before writing to disk. I'm not sure what exactly you intend to say by finish processing an input record. Typically, the processing (map) ends with a output.collect. Amogh
Re: Debugging Partitioner problems
Can I tell hadoop to save the map outputs per reducer to be able to inspect what's in them Setting keep.tasks.files.pattern will save the mapper output; set this regex to match your job/task as need be. But this will eat up a lot of local disk space. The problem most likely is your data (or more specifically, your map output data) being skewed, so most keys hash to the same partition id, and hence go to one reducer. Are you implementing a join? If not, writing a custom partitioner would help. Amogh On 1/20/10 5:33 PM, Erik Forsberg forsb...@opera.com wrote: Hi! I have a problem with one of my reducers getting 3 times as much data as the other 15 reducers, causing longer total runtime per job. What would be the best way to debug this? I'm guessing I'm outputting keys that somehow fool the partitioner. Can I tell hadoop to save the map outputs per reducer to be able to inspect what's in them? Thanks, \EF -- Erik Forsberg forsb...@opera.com Developer, Opera Software - http://www.opera.com/
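For the custom partitioner route, a bare-bones old-API skeleton (the key layout in the comment is made up); the point is simply that getPartition() is the place where you decide how your particular keys should spread:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class SkewAwarePartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // pull any tuning knobs out of the job conf here if needed
  }

  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // made-up example: keys look like "host\tpath"; hashing the full key
    // spreads a very busy host across all reducers, whereas hashing only
    // the host part would send all of its records to a single reducer
    return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

It is plugged in from the driver with conf.setPartitionerClass(SkewAwarePartitioner.class); whether spreading a heavy key is safe depends on whether the reduce logic needs all of that key's records in one place.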
Re: rmr: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot delete /op. Name node is in safe mode.
Hi, Glad to know it helped. If you need to get your cluster up and running quickly, you can manipulate the parameter dfs.namenode.threshold.percent. If you set it to 0, NN will not enter safe mode. Amogh On 1/19/10 12:39 PM, prasenjit mukherjee pmukher...@quattrowireless.com wrote: That was exactly the reason. Thanks a bunch. On Tue, Jan 19, 2010 at 12:24 PM, Mafish Liu maf...@gmail.com wrote: 2010/1/19 prasenjit mukherjee pmukher...@quattrowireless.com: I run hadoop fs -rmr .. immediately after start-all.shDoes the namenode always start in safemode and after sometime switches to normal mode ? If that is the problem then your suggestion of waiting might work. Lemme check. This is the point. Namenode will enter safemode on starting to gather metadata information of files, and then switch to normal mode. The time spent in safemode depends one the data scale in your HDFS. -Thanks for the pointer. Prasen On Tue, Jan 19, 2010 at 10:47 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Hi, When NN is in safe mode, you get a read-only view of the hadoop file system. ( since NN is reconstructing its image of FS ) Use hadoop dfsadmin -safemode get to check if in safe mode. hadoop dfsadmin -safemode leave to leave safe mode forcefully. Or use hadoop dfsadmin -safemode wait to block till NN leaves by itself. Amogh On 1/19/10 10:31 AM, prasenjit mukherjee prasen@gmail.com wrote: Hmmm. I am actually running it from a batch file. Is hadoop fs -rmr not that stable compared to pig's rm OR hadoop's FileSystem ? Let me try your suggestion by writing a cleanup script in pig. -Thanks, Prasen On Tue, Jan 19, 2010 at 10:25 AM, Rekha Joshi rekha...@yahoo-inc.com wrote: Can you try with dfs/ without quotes?If using pig to run jobs you can use rmf within your script(again w/o quotes) to force remove and avoid error if file/dir not present.Or if doing this inside hadoop job, you can use FileSystem/FileStatus to delete directories.HTH. Cheers, /R On 1/19/10 10:15 AM, prasenjit mukherjee prasen@gmail.com wrote: hadoop fs -rmr /op That command always fails. I am trying to run sequential hadoop jobs. After the first run all subsequent runs fail while cleaning up ( aka removing the hadoop dir created by previous run ). What can I do to avoid this ? here is my hadoop version : # hadoop version Hadoop 0.20.0 Subversion https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20 -r 763504 Compiled by ndaley on Thu Apr 9 05:18:40 UTC 2009 Any help is greatly appreciated. -Prasen -- maf...@gmail.com
Re: Is it always called part-00000?
Hi, Do your steps qualify as separate MR jobs? Then using JobClient APIs should be more than sufficient for such dependencies. You can add the whole output directory as input to another one to read all files, and provide PathFilter to ignore any files you don't want to be processed, like side effect files etc. However, to add recursively, you need to list the FileStatus and add to InputPath as required ( probably not needed in your case since its an output of a MR job ). Thanks, Amogh On 1/18/10 6:41 AM, Mark Kerzner markkerz...@gmail.com wrote: Hi, I am writing a second step to run after my first Hadoop job step finished. It is to pick up the results of the previous step and to do further processing on it. Therefore, I have two questions please. 1. Is the output file always called part-0? 2. Am I perhaps better off reading all files in the output directory and how do I do it? Thank you, Mark PS. Thank you guys for answering my questions - that's a tremendous help and a great resource. Mark
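A small sketch of the PathFilter idea for consuming a previous job's output directory: accept only the part-* files and skip _logs or any side-effect files. The class name is made up; setInputPathFilter is available on FileInputFormat in recent releases, otherwise list the directory yourself with FileSystem.listStatus and add paths individually.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class PartFilesOnly implements PathFilter {
  public boolean accept(Path path) {
    // keep part-00000, part-00001, ...; drop _logs and side-effect files
    return path.getName().startsWith("part-");
  }
}

// in the driver:
//   FileInputFormat.addInputPath(conf, new Path(prevJobOutputDir));
//   FileInputFormat.setInputPathFilter(conf, PartFilesOnly.class);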
Re: rmr: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot delete /op. Name node is in safe mode.
Hi, When NN is in safe mode, you get a read-only view of the hadoop file system. ( since NN is reconstructing its image of FS ) Use hadoop dfsadmin -safemode get to check if in safe mode. hadoop dfsadmin -safemode leave to leave safe mode forcefully. Or use hadoop dfsadmin -safemode wait to block till NN leaves by itself. Amogh On 1/19/10 10:31 AM, prasenjit mukherjee prasen@gmail.com wrote: Hmmm. I am actually running it from a batch file. Is hadoop fs -rmr not that stable compared to pig's rm OR hadoop's FileSystem ? Let me try your suggestion by writing a cleanup script in pig. -Thanks, Prasen On Tue, Jan 19, 2010 at 10:25 AM, Rekha Joshi rekha...@yahoo-inc.com wrote: Can you try with dfs/ without quotes?If using pig to run jobs you can use rmf within your script(again w/o quotes) to force remove and avoid error if file/dir not present.Or if doing this inside hadoop job, you can use FileSystem/FileStatus to delete directories.HTH. Cheers, /R On 1/19/10 10:15 AM, prasenjit mukherjee prasen@gmail.com wrote: hadoop fs -rmr /op That command always fails. I am trying to run sequential hadoop jobs. After the first run all subsequent runs fail while cleaning up ( aka removing the hadoop dir created by previous run ). What can I do to avoid this ? here is my hadoop version : # hadoop version Hadoop 0.20.0 Subversion https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20 -r 763504 Compiled by ndaley on Thu Apr 9 05:18:40 UTC 2009 Any help is greatly appreciated. -Prasen
Re: Is it possible to share a key across maps?
+1 for the documentation change in mapred-tutorial. Can we do that and publish using a normal apache account? Thanks, Amogh On 1/13/10 2:29 AM, Raymond Jennings III raymondj...@yahoo.com wrote: Amogh, You bet it helps! Thanks! Sometimes it's very difficult to map between the old and the new APIs. I was digging for that answer for awhile. Thanks. --- On Tue, 1/12/10, Amogh Vasekar am...@yahoo-inc.com wrote: From: Amogh Vasekar am...@yahoo-inc.com Subject: Re: Is it possible to share a key across maps? To: common-user@hadoop.apache.org common-user@hadoop.apache.org, raymondj...@yahoo.com raymondj...@yahoo.com, core-u...@hadoop.apache.org core-u...@hadoop.apache.org Date: Tuesday, January 12, 2010, 3:32 PM Re: Is it possible to share a key across maps? (Sorry for the spam if any, mails are bouncing back for me) Hi, In setup() use this, FileSplit split = (FileSplit)context.getInputSplit(); split.getPath() will return you the Path. Hope this helps. Amogh On 1/13/10 1:25 AM, Raymond Jennings III raymondj...@yahoo.com wrote: Hi Gang, I was able to use this on an older version that uses the JobClient class to run the job but not on the newer api with the Job class. The Job class appears to use a setup() method instead of a configure() method but the map.input.file attribute does not appear to be available via the conf class the setup() method. Have you tried to do what you described using the newer api? Thank you. --- On Fri, 1/8/10, Gang Luo lgpub...@yahoo.com.cn wrote:
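Spelled out a little more, the 0.20 (new API) version of the snippet above might look like this (the mapper class itself is made up); it assumes the input format hands out file splits, e.g. TextInputFormat:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InputAwareMapper extends Mapper<LongWritable, Text, Text, Text> {

  private Path inputFile;

  @Override
  protected void setup(Context context) {
    // valid when the split is a file split (e.g. TextInputFormat)
    FileSplit split = (FileSplit) context.getInputSplit();
    inputFile = split.getPath();
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(new Text(inputFile.getName()), value);
  }
}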
Re: What can cause: Map output copy failure
Hi, Can you please let us know your system configuration running hadoop? The error you see is when the reducer is copying its respective map output into memory. The parameter mapred.job.shuffle.input.buffer.percent can be manipulated for this (a bunch of others will also help you optimize the sort later), but I would say 200M is far too little memory allocated for hadoop application jvms :) Amogh On 1/8/10 2:46 AM, Mayuran Yogarajah mayuran.yogara...@casalemedia.com wrote: I'm seeing this error when a job runs: Shuffling 35338524 bytes (35338524 raw bytes) into RAM from attempt_201001051549_0036_m_03_0 Map output copy failure: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1198) I originally had mapred.child.java.opts set to 200M. If I boost this up to 512M the error goes away. I'm trying to understand what's going on though. Can anyone explain? Also are there any other parameters that I should be tweaking to help with this? thank you very much, M
Re: File _partition.lst does not exist.
Hi, I believe you need to add the partition file to distributed cache so that all tasks have it. The terasort code uses this sampler, you can refer to that if needed. Amogh On 12/15/09 5:06 PM, afarsek adji...@gmail.com wrote: Hi, I'm using the InputSampler.RandomSampler to perform a partition sampling. It should create a file called _partition.lst that should be used later on by the partitioner class. For some reason it doesn't work and I get a java.io.FileNotFoundException: File _partition.lst does not exist. Below the code: it consists of a mapper only job, taking as input a file in a SequenceFileInputFormat that was generated by a previous job. Thanks a lot in advance for any insights.

public class WordCountSorted {
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, IntWritable, Text> {
    // private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
        OutputCollector<IntWritable, Text> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      String[] tokens = line.split("\t");
      int nbOccurences = Integer.parseInt(tokens[1]);
      word.set(tokens[0]);
      output.collect(new IntWritable(nbOccurences), word);
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountSorted.class);
    conf.setJobName("wordcountsorted");
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(Map.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setNumReduceTasks(2);
    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 100, 10);
    InputSampler.writePartitionFile(conf, sampler);
    conf.setPartitionerClass(TotalOrderPartitioner.class);
    JobClient.runJob(conf);
  }
}

-- View this message in context: http://old.nabble.com/File-_partition.lst-does-not-exist.-tp26793409p26793409.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
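A rough sketch of the fix Amogh suggests, modeled loosely on what terasort does (the HDFS path below is made up): pin the partition file to an explicit location before sampling, then ship it through the DistributedCache with a symlink so every task can find it under the default name.

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class PartitionFileSetup {
  // call this from the driver before JobClient.runJob(conf)
  public static void setupPartitioner(JobConf conf)
      throws IOException, URISyntaxException {
    // write the sampled partition list to an explicit HDFS path (made up here)
    Path partitionFile = new Path("/tmp/wordcountsorted/_partition.lst");
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 100, 10);
    InputSampler.writePartitionFile(conf, sampler);

    // ship the file to every task and expose it under its default name
    DistributedCache.addCacheFile(
        new URI(partitionFile.toString() + "#_partition.lst"), conf);
    DistributedCache.createSymlink(conf);

    conf.setPartitionerClass(TotalOrderPartitioner.class);
  }
}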
Re: Re: Re: Re: Re: map output not equal to reduce input
how do you define 'consumed by reducer' Trivially, as long as you have your values iterator go to the end, you should be just fine. Sorry, haven't worked with decision support per se, probably someone else can shed some light on its quirks :) Amogh On 12/11/09 7:38 PM, Gang Luo lgpub...@yahoo.com.cn wrote: Thanks, Amogh. I am not sure whether all the records the mapper generates are consumed by the reducer. But how do you define 'consumed by reducer'? I can set a counter to see how many lines go to my map function, but this is likely the same as the reduce input # which is less than the map output #. I didn't use the SkipBadRecords class. I think by default the feature is disabled. So, it should have nothing to do with this. I do my test using tables of TPC-DS. If I run my job on some 'toy tables' I make, the statistics is correct. -Gang ----- Original Message ----- From: Amogh Vasekar am...@yahoo-inc.com To: common-user@hadoop.apache.org Date: 2009/12/11 (Fri) 2:55:12 AM Subject: Re: Re: Re: Re: map output not equal to reduce input Hi, The counters are updated as the records are *consumed*, for both mapper and reducer. Can you confirm if all the values returned by your iterators are consumed on the reduce side? Also, do you have the skipping-bad-records feature switched on? Amogh On 12/11/09 4:32 AM, Gang Luo lgpub...@yahoo.com.cn wrote: In the mapper of this job, I get something I am interested in for each line and then output all of them. So the number of map input records is equal to the map output records. Actually, I am doing a semi join in this job. There is no failure during execution. -Gang ----- Original Message ----- From: Todd Lipcon t...@cloudera.com To: common-user@hadoop.apache.org Date: 2009/12/10 4:43:52 Subject: Re: Re: Re: map output not equal to reduce input On Thu, Dec 10, 2009 at 1:15 PM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi Todd, I didn't change the partitioner, just use the default one. Will the default partitioner cause the loss of records? -Gang Do the maps output data nondeterministically? Did you experience any task failures in the run of the job? -Todd
Re: Re: Re: Re: map output not equal to reduce input
Hi, The counters are updated as the records are *consumed*, for both mapper and reducer. Can you confirm if all the values returned by your iterators are consumed on the reduce side? Also, do you have the skipping-bad-records feature switched on? Amogh On 12/11/09 4:32 AM, Gang Luo lgpub...@yahoo.com.cn wrote: In the mapper of this job, I get something I am interested in for each line and then output all of them. So the number of map input records is equal to the map output records. Actually, I am doing a semi join in this job. There is no failure during execution. -Gang ----- Original Message ----- From: Todd Lipcon t...@cloudera.com To: common-user@hadoop.apache.org Date: 2009/12/10 4:43:52 Subject: Re: Re: Re: map output not equal to reduce input On Thu, Dec 10, 2009 at 1:15 PM, Gang Luo lgpub...@yahoo.com.cn wrote: Hi Todd, I didn't change the partitioner, just use the default one. Will the default partitioner cause the loss of records? -Gang Do the maps output data nondeterministically? Did you experience any task failures in the run of the job? -Todd
Re: Re: return in map
Hi, If the file doesn't exist, java will error out. For partial skips, the o.a.h.mapreduce.Mapper class provides a method run(), which determines if the end of the split is reached and if not, calls map() on your k,v pair. You may override this method to include flag checks too, and if the check fails, the remaining split may be skipped. Hope this helps. Amogh On 12/7/09 6:38 AM, Edmund Kohlwey ekohl...@gmail.com wrote: As far as I know (someone please correct me if I'm wrong), mapreduce doesn't provide a facility to signal to stop processing. You will simply have to add a field to your mapper class that you set to signal an error condition, then in map check if you've set the error condition and return on each call if it's been set. On 12/6/09 6:55 PM, Gang Luo wrote: Thanks for the response. It seems there is something wrong in my logic. I kind of solved it now. What I am still unsure of is how to return or exit in a mapreduce program. If I want to skip one line (because it doesn't satisfy some constraints, for example), using return to quit the map function is enough. But what if I want to quit a map task (due to some error I detect, for example, the file I want to read doesn't exist)? If I use System.exit(), hadoop will try to run it again. Similarly, if I catch an exception and I want to quit the current task, what should I do? -Gang ----- Original Message ----- From: Edmund Kohlwey ekohl...@gmail.com To: common-user@hadoop.apache.org Date: 2009/12/6 10:52:40 Subject: Re: return in map Let me see if I understand: The mapper is reading lines in a text file. You want to see if a single line meets a given criteria, and emit all the lines whose index is greater than or equal to the single matching line's index. I'll assume that if more than one line meets the criteria, you have a different condition which you will handle appropriately. First some discussion of your input - is this a single file that should be considered as a whole? In that case, you probably only want one mapper, which, depending on your reduce task, may totally invalidate the use case for MapReduce. You may just want to read the file directly from HDFS and write to HDFS in whatever application is using the data. Anyways, here's how I'd do it. In setup, open a temporary file (it can be directly on the node, or on HDFS, although directly on the node is preferable). Use map to perform your test, and keep a counter of how many lines match. After the first line matches, begin saving lines. If a second line matches, log the error condition or whatever. In cleanup, if only one line matched, open your temp file and begin emitting the lines you saved from earlier. There's a few considerations in your implementation: 1. File size. If the temporary file exceeds the available space on a mapper, you can make a temp file in HDFS but this is far from ideal. 2. As noted above, if there's a single mapper and no need to sort or reduce the output, you probably want to just implement this as a program that happens to be using HDFS as a data store, and not bother with MapReduce at all. On 12/6/09 10:03 AM, Sonal Goyal wrote: Hi, Maybe you could post your code/logic for doing this. One way would be to set a flag once your criteria is met and emit keys based on the flag. Thanks and Regards, Sonal 2009/12/5 Gang Luo lgpub...@yahoo.com.cn Hi all, I got a tricky problem. I input a small file manually to do some filtering work on each line in the map function. I check if the line satisfies the constraint, then I output it; otherwise I return, without doing any other work below.
Since the map function will be called on each line, I think the logic is correct. But it doesn't work like this. If there are 5 lines for a map task, and only the 2nd line satisfies the constraint, then the output will be lines 2, 3, 4, and 5. If the 3rd line satisfies it, then the output will be lines 3, 4, 5. It seems that once a map task meets the first satisfying line, the filter doesn't work for the following lines. It is an interesting problem. I am checking it now. I also hope someone could give me some ideas on this. Thanks. -Gang
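A minimal sketch of the run() override Amogh mentions, for the 0.20+ org.apache.hadoop.mapreduce API (the flag and the fatal-condition check are made up):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EarlyExitMapper extends Mapper<LongWritable, Text, Text, Text> {

  private boolean giveUp = false;   // set inside map() on a fatal condition

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (/* made-up fatal check */ value.getLength() == 0) {
      giveUp = true;
      return;
    }
    context.write(new Text("ok"), value);
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // same loop as the default run(), plus the bail-out flag
    while (!giveUp && context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}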
Re: Hadoop with Multiple Inpus and Outputs
Hi, Please try removing the combiner and running. I know that if you use multiple outputs from within a mapper, those k,v pairs are not a part of sort and shuffle phase. Your combiner is same as reducer which uses mos, and might be an issue on map side. If I'm to take a guess, mos writes to a different file from default map output, and the default key format is LongWritable. If nothing is written, maybe this isnt modified? Just a thought. For checking input file being consumed in current map task, you can use map.input.file from job conf, instead of figuring it out from split name. Amogh On 12/3/09 12:17 PM, James R. Leek le...@llnl.gov wrote: I've been trying to figure out how to do a set difference in hadoop. I would like to take 2 file, and remove the values they have in common between them. Let's say I have two bags, 'students' and 'employees'. I want to find which students are just students, and which employees are just employees. So, an example: Students: (Jane) (John) (Dave) Employees: (Dave) (Sue) (Anne) If I were to join these, I would get the students who are also employees, or: (Dave). However, what I want is the distinct values: Only_Student: (Jane) (John) Only_Employee: (Sue) (Anne) I was able to do this in pig, but I think I should be able to do it in one MapReduce pass. (With hadoop 20.1) I read from two files, and attached the file names as the values. (Students and Employees, in this case. My actually problem is on DNA, bacteria and viruses in this case.) Then I output from the reducer if I only get one value for a given key. However, I've had some real trouble figuring out MultipleOutput and the multiple inputs. I've attached my code. I'm getting this error, which is a total mystery to me: 09/12/02 22:33:52 INFO mapred.JobClient: Task Id : attempt_200911301448_0019_m_00_2, Status : FAILED java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:807) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:504) at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305) at org.apache.hadoop.mapred.Child.main(Child.java:170) Thanks, Jim
Re: How can I change the mapreduce output coder?
Hi, What are your intermediate output K,V class formats? “Text” format is inherently UTF-8 encoded. If you want end-to-end processing to be via gbk encoding, you may have to write a custom writable type. Amogh On 11/30/09 7:09 PM, 郭鹏 gpcus...@gmail.com wrote: I know the default output coder is utf-8, but how can we change it into a other coder? like gbk? thx. -- Regards Peng Guo
Re: Problem with mapred.job.reuse.jvm.num.tasks
Hi, Task slots reuse JVM over the course of entire job right? Specifically, would like to point to : http://issues.apache.org/jira/browse/MAPREDUCE-453?focusedCommentId=12619492page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12619492 Thanks, Amogh On 11/30/09 5:44 PM, Chandraprakash Bhagtani cpbhagt...@gmail.com wrote: Hi, In one of my hadoop jobs I had set mapred.job.reuse.jvm.num.tasks = -1. The job was such that each map/reduce task occupied around 2GB RAM. So by setting mapred.job.reuse.jvm.num.tasks = -1, even when all the maps had finished, JVMs were still alive. Though the JVMs were not using CPU, but occupied RAM. Thus reducing overall available RAM for reducer JVMs. All the JVMs were killed when the Job completed. Is this a bug in hadoop? If yes, should I open a JIRA for that? Please enlighten, Thanks in advance. -- Thanks Regards, Chandra Prakash Bhagtani
Re: The name of the current input file during a map
-mapred.input.file +map.input.file Should work Amogh On 11/26/09 12:57 PM, Saptarshi Guha saptarshi.g...@gmail.com wrote: Hello again, I'm using Hadoop 0.21 and its context object e.g public void setup(Context context) { Configuration cfg = context.getConfiguration(); System.out.println(mapred.input.file=+cfg.get(mapred.input.file)); displays null, so maybe this fell out by mistake in the api change? Regards Saptarshi On Thu, Nov 26, 2009 at 2:13 AM, Saptarshi Guha saptarshi.g...@gmail.com wrote: Thank you. Regards Saptarshi On Thu, Nov 26, 2009 at 2:10 AM, Amogh Vasekar am...@yahoo-inc.com wrote: Conf.get(map.input.file) is what you need. Amogh On 11/26/09 12:35 PM, Saptarshi Guha saptarshi.g...@gmail.com wrote: Hello, I have a set of input files part-r-* which I will pass through another map(no reduce). the part-r-* files consist of key, values, keys being small, values fairly large(MB's) I would like to index these, i.e run a map, whose output is key and /filename/ i.e to which part-r-* file the particular key belongs, so that if i need them again I can just access that file. Q: In the map stage,how do I retrieve the name of the file being processed? I'd rather not use the MapFileOutputFormat. Hadoop 0.21 Regards Saptarshi
Re: Saving Intermediate Results from the Mapper
Hi, I'm not sure if this will apply to your case since i'm not aware of the common part of job2:mapper and job3:mapper but would like to give it a shot. The whole process can be combined into a single mapred job. The mapper will read a record and process till the saved data part , then for each record will output 2 records , one each for the job2 and job3 mappers. The keys of records will be tagged ( tag,key ) depending on what reducer processing you want to do. In reduce() you can use this tag to make processing decision. A custom partitioner might be needed depending on the key types to ensure unique sets for reducer. Ignore if this doesn't fit your bill :) Amogh On 11/25/09 9:35 AM, Gordon Linoff glin...@gmail.com wrote: Does anyone have a pointer to code that allows the map to save data in intermediate files, for use in a later map/reduce job? I have been looking for an example and cannot find one. I have investigated MultipleOutputFormat and MultipleOutputs. Because I am using version 0.18.3, I don't have MultipleOutputs. The problem with MultipleOutputFormat is that the data I want to save is a different format from the data I want to pass to the Reducer. I have also tried opening a sequence file directly from the mapper, but I am concerned that this is not fault tolerant. The process currently is: Job1: Mapper: reads complicated data, saves out data structure. Job2: Mapper: reads saved data, processes and sends data to Reducer 2. Job3: Mapper: reads saved data, processes and sends data to Reducer 3. I would like to combine the first two steps, so the process is: Job1: Mapper: reads complicated data, saves out data structure, and passes processed data to Reducer 2. Job2: Mapper: reads saved data, processes and sends to Reducer 3. --gordon On Sun, Nov 22, 2009 at 9:27 PM, Jason Venner jason.had...@gmail.comwrote: You can manually write the map output to a new file, there are a number of examples of opening a sequence file and writing to it on the web or in the example code for various hadoop books. You can also disable the removal of intermediate data, which will result in potentially large amounts of data being left in the mapred.local.dir. On Sun, Nov 22, 2009 at 3:56 PM, Gordon Linoff glin...@gmail.com wrote: I am starting to learn Hadoop, using the Yahoo virtual machine with version 0.18. My question is rather simple. I would like to execute a map/reduce job. In addition to getting the results from the reduce, I would also like to save the intermediate results from the map in another HDFS file. Is this possible? --gordon -- Pro Hadoop, a book to guide you from beginner to hadoop mastery, http://www.amazon.com/dp/1430219424?tag=jewlerymall www.prohadoopbook.com a community for Hadoop Professionals
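A rough sketch of the key-tagging idea (old API; the tag strings and the parsing step are made up): the single mapper emits one record per downstream branch, and the reducer looks at the tag prefix to decide which processing to apply.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TaggingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String processed = value.toString().trim();   // stand-in for the real parsing
    // one record for each downstream branch ("reducer 2" / "reducer 3" style)
    output.collect(new Text("A|" + key.get()), new Text(processed));
    output.collect(new Text("B|" + key.get()), new Text(processed));
  }
}

Depending on what each branch needs grouped together, a custom partitioner on the tag or on the untagged part of the key may be needed, as noted above.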
Re: Hadoop Performance
Hi, For near real time performance you may try Hbase. I had read about Streamy doing this, and their hadoop-world-nyc ppt is available on their blog: http://devblog.streamy.com/2009/07/24/streamy-hadoop-summit-hbase-goes-realtime/ Amogh On 11/25/09 1:31 AM, onur ascigil onurasci...@hotmail.com wrote: Thanks for your reply! I am running 0.20.1 in pseudo-distributed mode in Ubuntu. I want to run interactive jobs with Hadoop and trying to see if Hadoop is suitable for that purpose or not. I wonder if there is anybody using Hadoop for interactive jobs where given a query, output is returned within an acceptable amount of time. Is Hadoop meant to be used only for batch processing? From: rekha...@yahoo-inc.com To: common-user@hadoop.apache.org Date: Mon, 23 Nov 2009 22:30:43 -0800 Subject: Re: Hadoop Performance Hi, Not sure about your hadoop version, and havent done much on single m/c setup myself. However there is a IPC improvement bug filed @ https://issues.apache.org/jira/browse/HADOOP-2864.Thanks! On 11/24/09 11:22 AM, onur ascigil onurasci...@hotmail.com wrote: I am running Hadoop on a single machine and have some questions about its performance. I have a simple java program that runs breadth first search on a graph with 5 nodes. It involves several map-reduce iterations. I observed that, Hadoop takes too long to produce results on such a simple job. So I attached a java profiler to my mapreduce job (runJar) to see what is going on. The java profiler reported several IPC connections to ports 54310 and 54311. Each of these IPCs to Jobtracker and HDFS takes around 10 seconds! First of all why are these IPCs take this long? And I am wondering if there is anyway to improve the performance of these IPC calls. Does Hadoop have such a large fixed-cost ? I would really appreciate any comments or suggestions. Thanks in advance, Onur _ Windows Live Hotmail: Your friends can get your Facebook updates, right from Hotmail®. http://www.microsoft.com/middleeast/windows/windowslive/see-it-in-action/social-network-basics.aspx?ocid=PID23461::T:WLMTAGL:ON:WL:en-xm:SI_SB_4:092009 _ Windows Live Hotmail: Your friends can get your Facebook updates, right from Hotmail®. http://www.microsoft.com/middleeast/windows/windowslive/see-it-in-action/social-network-basics.aspx?ocid=PID23461::T:WLMTAGL:ON:WL:en-xm:SI_SB_4:092009
Re: Saving Intermediate Results from the Mapper
Hi, keep.tasks.files.pattern is what you need, as the name suggests its a pattern match on intermediate outputs generated. Wrt to copying map data to hdfs, your mappers close() method should help you achieve this, but might slow up your tasks. Amogh On 11/23/09 8:08 AM, Jeff Zhang zjf...@gmail.com wrote: Hi Jason, which option is for setting disable the removal of intermediate data ? Thank you Jeff Zhang On Mon, Nov 23, 2009 at 10:27 AM, Jason Venner jason.had...@gmail.comwrote: You can manually write the map output to a new file, there are a number of examples of opening a sequence file and writing to it on the web or in the example code for various hadoop books. You can also disable the removal of intermediate data, which will result in potentially large amounts of data being left in the mapred.local.dir. On Sun, Nov 22, 2009 at 3:56 PM, Gordon Linoff glin...@gmail.com wrote: I am starting to learn Hadoop, using the Yahoo virtual machine with version 0.18. My question is rather simple. I would like to execute a map/reduce job. In addition to getting the results from the reduce, I would also like to save the intermediate results from the map in another HDFS file. Is this possible? --gordon -- Pro Hadoop, a book to guide you from beginner to hadoop mastery, http://www.amazon.com/dp/1430219424?tag=jewlerymall www.prohadoopbook.com a community for Hadoop Professionals
Re: How to handle imbalanced data in hadoop ?
Hi, This is the time for all three phases of reducer right? I think its due to the constant spilling for a single key to disk since the map partitions couldn't be held in-mem due to buffer limit. Did the other reducer have numerous keys with low number of values ( ie smaller partitions? ) Thanks, Amogh On 11/18/09 3:37 AM, Todd Lipcon t...@cloudera.com wrote: On Tue, Nov 17, 2009 at 1:54 PM, Pankil Doshi forpan...@gmail.com wrote: With respect to Imbalanced data, Can anyone guide me how sorting takes place in Hadoop after Map phase. I did some experiments and found that if there are two reducers which have same number of keys to sort and one reducer has all the keys same and other have different keys then time taken by by the reducer having all keys same is terribly large then other one. Hi Pankil, This is an interesting experiment you've done with results that I wouldn't quite expect. Do you have the java source available that you used to run this experiment? Also I found that length on my Key doesnt matter in the time taken to sort it. With small keys on CPU-bound workload this is probably the case since the sort would be dominated by comparison. If you were to benchmark keys that are 10 bytes vs keys that are 1000 bytes, I'm sure you'd see a difference. I wanted some hints how sorting is done .. MapTask.java, ReduceTask.java, and Merger.java are the key places to look. The actual sort is a relatively basic quicksort, but there is plenty of complexity in the spill/shuffle/merge logic. -Todd Pankil On Sun, Nov 15, 2009 at 7:25 PM, Jeff Hammerbacher ham...@cloudera.com wrote: Hey Jeff, You may be interested in the Skewed Design specification from the Pig team: http://wiki.apache.org/pig/PigSkewedJoinSpec. Regards, Jeff On Sun, Nov 15, 2009 at 2:00 PM, brien colwell xcolw...@gmail.com wrote: My first thought is that it depends on the reduce logic. If you could do the reduction in two passes then you could do an initial arbitrary partition for the majority key and bring the partitions together in a second reduction (or a map-side join). I would use a round robin strategy to assign the arbitrary partitions. On Sat, Nov 14, 2009 at 11:03 PM, Jeff Zhang zjf...@gmail.com wrote: Hi all, Today there's a problem about imbalanced data come out of mind . I'd like to know how hadoop handle this kind of data. e.g. one key dominates the map output, say 99%. So 99% data set will go to one reducer, and this reducer will become the bottleneck. Does hadoop have any other better ways to handle such imbalanced data set ? Jeff Zhang
Re: new MR API:MutilOutputFormat
MultipleOutputFormat and MOS are to be merged : http://issues.apache.org/jira/browse/MAPREDUCE-370 Amogh On 11/18/09 12:03 PM, Y G gymi...@gmail.com wrote: in the old MR API ,there is MutilOutputFormat class which i can use to custom the reduce output file name. it's very useful for me. but i can't find it in the new API. can anybody tell how to do the same func using the new MR API? - 天天开心 身体健康 Sent from Nanjing, 32, China Samuel Goldwynhttp://www.brainyquote.com/quotes/authors/s/samuel_goldwyn.html - I'm willing to admit that I may not always be right, but I am never wrong.
Re: architecture help
I would like the connection management to live separately from the mapper instances per node. The JVM reuse option in Hadoop might be helpful for you in this case. Amogh On 11/16/09 6:22 AM, yz5od2 woods5242-outdo...@yahoo.com wrote: Hi, a) I have a Mapper ONLY job, the job reads in records, then parses them apart. No reduce phase b) I would like this mapper job to save the record into a shared mysql database on the network. c) I am running a 4 node cluster, and obviously running out of connections very quickly, that is something I can work on the db server side. What I am trying to understand, is that for each mapper task instance that is processing an input split... does that run in its own classloader? I guess I am trying to figure out how to manage a connection pool on each processing node, so that all mapper instances would use that to get access to the database. Right now it appears that each node is creating thousands of mapper instance each with their own connection management, hence this is blowing up quite quickly. I would like the connection management to live separately from the mapper instances per node. I hope I am explaining what I want to do ok, please let me know if anyone has any thoughts, tips, best practices, features I should look at etc. thanks
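To make the idea concrete, a sketch (old API; the JDBC properties are made up, and a suitable JDBC driver is assumed to be on the task classpath) of keeping one connection per task JVM in a static field. Combined with mapred.job.reuse.jvm.num.tasks, successive map tasks on a node then reuse the same connection instead of each opening its own.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DbWritingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, NullWritable, NullWritable> {

  // one connection per task JVM; with JVM reuse enabled, consecutive tasks
  // on the same node share it instead of reconnecting every time
  private static Connection connection;

  private static synchronized Connection getConnection(JobConf job)
      throws SQLException {
    if (connection == null) {
      // made-up property names, read from the job conf
      connection = DriverManager.getConnection(
          job.get("mydb.jdbc.url"), job.get("mydb.user"), job.get("mydb.password"));
    }
    return connection;
  }

  private JobConf job;

  public void configure(JobConf job) {
    this.job = job;
  }

  public void map(LongWritable key, Text record,
      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
      throws IOException {
    try {
      Connection c = getConnection(job);
      // ... parse the record and insert it via a PreparedStatement on c ...
    } catch (SQLException e) {
      throw new IOException(e.toString());
    }
  }
}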
Re: Multiple Input Paths
Hi Mark, A future release of Hadoop will have a MultipleInputs class, akin to MultipleOutputs. This would allow you to have a different inputformat, mapper depending on the path you are getting the split from. It uses special Delegating[mapper/input] classes to resolve this. I understand backporting this is more or less out of question, but the ideas there might provide pointers to help you solve your current problem. Just a thought :) Amogh On 11/3/09 8:44 PM, Mark Vigeant mark.vige...@riskmetrics.com wrote: Hey Vipul No I haven't concatenated my files yet, and I was just thinking over how to approach the issue of multiple input paths. I actually did what Amandeep hinted at which was we wrote our own XMLInputFormat and XMLRecordReader. When configuring the job in my driver I set job.setInputFormatClass(XMLFileInputFormat.class) and what it does is send chunks of XML to the mapper as opposed to lines of text or whole files. So I specified the Line Delimiter in the XMLRecordReader (ie startTag) and everything in between the tags startTag and /startTag are sent to the mapper. Inside the map function is where to parse the data and write it to the table. What I have to do now is just figure out how to set the Line Delimiter to be something common in both XML files I'm reading. Currently I have 2 mapper classes and thus 2 submitted jobs which is really inefficient and time consuming. Make sense at all? Sorry if it doesn't, feel free to ask more questions Mark -Original Message- From: Vipul Sharma [mailto:sharmavi...@gmail.com] Sent: Monday, November 02, 2009 7:48 PM To: common-user@hadoop.apache.org Subject: RE: Multiple Input Paths Mark, were you able to concatenate both the xml files together. What did you do to keep the resulting xml well forned? Regards, Vipul Sharma, Cell: 281-217-0761
Re: too many 100% mapper does not complete / finish / commit
Hi, Quick questions... Are you creating too many small files? Are there any task side files being created? Is the heap for NN having enough space to list metadata? Any details on its general health will probably be helpful to people on the list. Amogh On 11/2/09 2:02 PM, Zhang Bingjun (Eddy) eddym...@gmail.com wrote: Dear hadoop fellows, We have been using Hadoop-0.20.1 MapReduce to crawl some web data. In this case, we only have mappers to crawl data and save data into HDFS in a distributed way. No reducers is specified in the job conf. The problem is that for every job we have about one third mappers stuck with 100% progress but never complete. If we look at the the tasktracker log of those mappers, the last log was the key input INFO log line and no others logs were output after that. From the stdout log of a specific attempt of one of those mappers, we can see that the map function of the mapper has been finished completely and the control of the execution should be somewhere in the MapReduce framework part. Does anyone have any clue about this problem? Is it because we didn't use any reducers? Since two thirds of the mappers could complete successfully and commit their output data into HDFS, I suspect the stuck mappers has something to do with the MapReduce framework code? Any input will be appreciated. Thanks a lot! Best regards, Zhang Bingjun (Eddy) E-mail: eddym...@gmail.com, bing...@nus.edu.sg, bing...@comp.nus.edu.sg Tel No: +65-96188110 (M)
Re: Multiple Input Paths
Mark, Set-up for a mapred job consumes a considerable amount of time and resources and so, if possible a single job is preferred. You can add multiple paths to your job, and if you need different processing logic depending upon the input being consumed, you can use parameter map.input.file in your mapper to decide. Amogh On 11/2/09 8:53 PM, Mark Vigeant mark.vige...@riskmetrics.com wrote: Hey, quick question: I'm writing a program that parses data from 2 different files and puts the data into a table. Currently I have 2 different map functions and so I submit 2 separate jobs to the job client. Would it be more efficient to add both paths to the same mapper and only submit one job? Thanks a lot! Mark Vigeant RiskMetrics Group, Inc.
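A small sketch of the map.input.file approach (old API; the file-name test and output tags are made up):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TwoSourceMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private boolean isFileA;   // which of the two inputs this task is reading

  public void configure(JobConf job) {
    String inputFile = job.get("map.input.file");
    // made-up test: pick the parsing logic based on the input file name
    isFileA = inputFile != null && inputFile.contains("fileA");
  }

  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    if (isFileA) {
      output.collect(value, new Text("A"));
    } else {
      output.collect(value, new Text("B"));
    }
  }
}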
Re: Distribution of data in nodes with different storage capacity
Hi, Rebalancer should help you : http://issues.apache.org/jira/browse/HADOOP-1652 Amogh On 10/28/09 2:54 PM, Vibhooti Verma verma.vibho...@gmail.com wrote: Hi All, We are facing the issue with distribution of data in a cluster where nodes have differnt storage capacity. We have 4 nodes with 100G capacity and 1 node with 2TB capacity. The storage of the high storage capacity is not being utilized where as all low storage capccity nodes are being full. Any help/suggestion in the regard will be helpful. -- cheers, Vibhooti
Re: Problem to create sequence file for
Hi Bhushan, If splitting input files is an option, why don't you let hadoop do this for you? If need be you may use a custom input format and sequencefile*outputformat. Amogh On 10/27/09 7:55 PM, bhushan_mahale bhushan_mah...@persistent.co.in wrote: Hi Jason, Thanks for the reply. The string is the entire content of the input text file. It could as long as ~300MB. I tried increasing jvm heap but unfortunately it was giving same error. Other option I am thinking is to split input files first. - Bhushan -Original Message- From: Jason Venner [mailto:jason.had...@gmail.com] Sent: Tuesday, October 27, 2009 7:19 PM To: common-user@hadoop.apache.org Subject: Re: Problem to create sequence file for How large is the string that is being written? Does it contain the entire contents of your file? You may simple need to increase the heap size with your jvm. On Tue, Oct 27, 2009 at 3:43 AM, bhushan_mahale bhushan_mah...@persistent.co.in wrote: Hi, I have written a code to create sequence files for given text files. The program takes following input parameters: 1. Local source directory - contains all the input text files 2. Destination HDFS URI - location on hdfs where sequence file will be copied The key for a sequence-record is the file-name. The value for a sequence-record is the content of the text file. The program runs fine for large number input text files. But if the size of a single input text file is 100 MB then it throws following exception: Exception in thread main java.lang.OutOfMemoryError: Java heap space at java.lang.String.toCharArray(String.java:2726) at org.apache.hadoop.io.Text.encode(Text.java:388) at org.apache.hadoop.io.Text.set(Text.java:178) at org.apache.hadoop.io.Text.init(Text.java:81) at SequenceFileCreator.create(SequenceFileCreator.java:106) at SequenceFileCreator.processFile(SequenceFileCreator.java:168) I am using org.apache.hadoop.io.SequenceFile.Writer for creating the sequence file. The Text class is used for keyclass and valclass. I tried increasing the max memory for the program but it throws same error. Can you provide your suggestions? Thanks, - Bhushan DISCLAIMER == This e-mail may contain privileged and confidential information which is the property of Persistent Systems Ltd. It is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, you are not authorized to read, retain, copy, print, distribute or use this message. If you have received this communication in error, please notify the sender and delete all copies of this message. Persistent Systems Ltd. does not accept any liability for virus infected mails. -- Pro Hadoop, a book to guide you from beginner to hadoop mastery, http://www.amazon.com/dp/1430219424?tag=jewlerymall www.prohadoopbook.com a community for Hadoop Professionals DISCLAIMER == This e-mail may contain privileged and confidential information which is the property of Persistent Systems Ltd. It is intended only for the use of the individual or entity to which it is addressed. If you are not the intended recipient, you are not authorized to read, retain, copy, print, distribute or use this message. If you have received this communication in error, please notify the sender and delete all copies of this message. Persistent Systems Ltd. does not accept any liability for virus infected mails.
Re: How To Pass Parameters To Mapper Through Main Method
Hi, Many options available here. You can use jobconf (0.18 ) / context.conf (0.20) to pass these lines across all tasks ( assuming the size isnt relatively large ) and use configure / setup to retrieve these.. Or use distributed cache to read a file containing these lines ( possibly with jvm reuse if you want that extra bit as well. ) Thanks, Amogh On 10/26/09 6:17 AM, Boyu Zhang boyuzhan...@gmail.com wrote: Dear All, I am implementing a clustering algorithm in which I need to compare each line to two specific lines (they all have the same format ) and output two scores denoting the similarity between each line to the two specific lines. Can I define two global variables (the 2 specific lines) in the main[] method and pass those two variables to the mapper class? Or can I store the two lines in a separate file (say Centric )and have mapper class read the file and compare each lines (from other files, say Data in which the data need to be processed) with the two from the separate file Centric? Thanks a lot for reading my email, really appreciate any help! Boyu Zhang(Emma) University of Delaware
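For the jobconf route, a minimal old-API sketch (the property names and the similarity placeholder are made up):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SimilarityMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private String centerOne;
  private String centerTwo;

  // In the driver (made-up property names):
  //   conf.set("cluster.center.one", lineOne);
  //   conf.set("cluster.center.two", lineTwo);
  public void configure(JobConf job) {
    centerOne = job.get("cluster.center.one");
    centerTwo = job.get("cluster.center.two");
  }

  public void map(LongWritable key, Text line,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // compare 'line' against both centers and emit the two scores
    output.collect(new Text(line),
        new Text(score(line, centerOne) + "\t" + score(line, centerTwo)));
  }

  private double score(Text line, String center) {
    // placeholder similarity; the real metric belongs here
    return line.toString().equals(center) ? 1.0 : 0.0;
  }
}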
Re: How to skip fail map to done the job
For skipping failed tasks try : mapred.max.map.failures.percent Amogh On 10/21/09 8:58 AM, 梁景明 futur...@gmail.com wrote: hi, I use hadoop 0.20 and 8 nodes. There is a job that has 130 maps to run, and it completed 128 maps; only 2 maps fail, and their failure in my case is acceptable, but the job fails, the 128 completed maps also fail, and I can't get any result. I found that SkipBadRecords maybe works for my case, but I used it and it doesn't work; maybe something is wrong. Here is my code. How do I skip the bad failed maps? Thanks for any help.

JobConf conf = new JobConf(ProductMR.class);
conf.setJobName("ProductMR");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Product.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setMapOutputCompressorClass(DefaultCodec.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
String objpath = "abc1";
SequenceFileInputFormat.addInputPath(conf, new Path(objpath));
SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
SkipBadRecords.setAttemptsToStartSkipping(conf, 0);
SkipBadRecords.setSkipOutputPath(conf, new Path("data/product/skip/"));
String output = "abc";
SequenceFileOutputFormat.setOutputPath(conf, new Path(output));
JobClient.runJob(conf);
Re: proper way to configure classes required by mapper job
Hi, Check the distributed cache APIs, it provides various functionalities to distribute and add jars to classpath on compute machines. Amogh On 10/19/09 3:38 AM, yz5od2 woods5242-outdo...@yahoo.com wrote: Hi, What is the preferred method to distribute the classes (in various Jars) to my Hadoop instances, that are required by my Mapper class? thanks!
Re: Hadoop dfs can't allocate memory with enough hard disk space when data gets huge
Hi, It would be more helpful if you provide the exact error here. Also, hadoop uses the local FS to store intermediate data, along with HDFS for final output. If your job is memory intensive, try limiting the number of tasks you are running in parallel on a machine. Amogh On 10/19/09 8:27 AM, Kunsheng Chen ke...@yahoo.com wrote: I and running a hadoop program to perform MapReduce work on files inside a folder. My program is basically doing Map and Reduce work, each line of any file is a pair of string, and the result is a string associate with occurence inside all files. The program works fine until the number of files grow to about 80,000,then the 'cannot allocate memory' error occur for some reason. Each of the file contains around 50 lines, but the total size of all files is no more than 1.5 GB. There are 3 datanodes performing calculation,each of them have more than 10GB hd left. I am wondering if that is normal for Hadoop because the data is too large ? Or it might be my programs problem ? It is really not supposed to be since Hadoop was developed for processing large data sets. Any idea is well appreciated
Re: How to get IP address of the machine where map task runs
InetAddress.getLocalHost() should give you that. If you are planning to make some decisions based on this, please do account for conditions arising from speculative executions ( they caused me some amount of trouble when I was designing my app ) Thanks, Amogh On 10/15/09 8:15 AM, Long Van Nguyen Dinh munt...@gmail.com wrote: Thanks Amogh. For my application, I want each map task reports to me where it's running. However, I have no idea how to use Java Inetaddress APIs to get that info. Could you explain more? Van On Wed, Oct 14, 2009 at 2:16 PM, Amogh Vasekar am...@yahoo-inc.com wrote: For starters look at any monitoring tool like vaidya, hadoop UI ( ganglia too, haven't read much on it though ). Not sure if you need this for debugging purposes or for some other real-time app.. You should be able to get info on localhost of each of your map tasks in a pretty straightforward way using Java Inetaddress APIs( and use that info for search?) Thanks, Amogh On 10/15/09 12:11 AM, Long Van Nguyen Dinh munt...@gmail.com wrote: Hello again, Could you give me any hint to start with? I have no idea how to get that information. Many thanks, Van On Tue, Oct 13, 2009 at 9:22 PM, Long Van Nguyen Dinh munt...@gmail.com wrote: Hi all, Given a map task, I need to know the IP address of the machine where that task is running. Is there any existing method to get that information? Thank you, Van
RE: Easiest way to pass dynamic variable to Map Class
Hi, I guess configure is now setup(), and using toolrunner can create a configuration / context to mimic the required behavior. Thanks, Amogh -Original Message- From: Amandeep Khurana [mailto:ama...@gmail.com] Sent: Tuesday, October 06, 2009 5:43 AM To: common-user@hadoop.apache.org Subject: Re: Easiest way to pass dynamic variable to Map Class How do we do it in the 0.20 api? I've used this in 0.19 but not sure of 0.20... On 10/5/09, Aaron Kimball aa...@cloudera.com wrote: You can set these in the JobConf when you're creating the MapReduce job, and then read them back in the configure() method of the Mapper class. - Aaron On Mon, Oct 5, 2009 at 4:50 PM, Pankil Doshi forpan...@gmail.com wrote: Hello everyone, What will be easiest way to pass Dynamic value to map class?? Dynamic value are arguments given at run time. Pankil -- Amandeep Khurana Computer Science Graduate Student University of California, Santa Cruz
RE: How can I assign the same mapper class with different data?
Hi Huang, Haven't worked with Hbase but in general, If you want to have control over what data split to go as a whole to mapper, easiest way is to compress that split in single file; making as many split files as needed. If you need to know what file is currently being processed, you can use map.input.file ( corresponds to HBase table?? )from configuration, and do file specific operations as needed. Hope this helps Amogh -Original Message- From: Huang Qian [mailto:skysw...@gmail.com] Sent: Tuesday, October 06, 2009 7:15 AM To: common-user@hadoop.apache.org Subject: Re: How can I assign the same mapper class with different data? The real problem is I want to use different mapper to deal with different hbase data. For example the data is storing in different HTable, So I should use different mapper to connect to different Htable and get the data.How can I made it? 2009/10/5 Huang Qian skysw...@gmail.com I am a beginner at hadoop. I want to ask a question , how can I configurate a job with two map task with the same mapper class and different dataset? For example, I want to sort the num from 1 to 100, then use one task to deal with 1 to 50, and the other with 51 to 100, I want to control the dataset I send to mapper. How can I make it? Can anyone help me ?
RE: Best Idea to deal with following situation
Along with partitioner, try to plug in a combiner. It would provide significant performance gains. Not sure about the algo you use, but might have to tweak that a little to facilitate a combiner. Thanks, Amogh -Original Message- From: Chandraprakash Bhagtani [mailto:cpbhagt...@gmail.com] Sent: Tuesday, September 29, 2009 12:25 PM To: common-user@hadoop.apache.org Cc: core-u...@hadoop.apache.org Subject: Re: Best Idea to deal with following situation you can write your custom partitioner instead of hash partitioner On Sat, Sep 26, 2009 at 6:18 AM, Pankil Doshi forpan...@gmail.com wrote: Hello everyone, I have job whose result has only 5 keys but but each key has long list of values like in 10's . What should be best way to deal with it. I feel few of my reducers get over loaded as two or more keys go to same reduce and hence they have lots of work to do. So what should be best way out with this situation? Pankil -- Thanks Regards, Chandra Prakash Bhagtani,
RE: Distributed cache - are files unique per job?
I believe framework checks timestamps on HDFS for marking an already available copy of the file valid or invalid, since the archived files are not cleaned up till a certain du limit is reached, and no apis for cleanup available. There was a thread on this some time back on the list. Amogh -Original Message- From: Allen Wittenauer [mailto:awittena...@linkedin.com] Sent: Tuesday, September 29, 2009 10:41 PM To: common-user@hadoop.apache.org Subject: Re: Distributed cache - are files unique per job? On 9/29/09 2:55 AM, Erik Forsberg forsb...@opera.com wrote: If I distribute files using the Distributed Cache (-archives option), are they guaranteed to be unique per job, or is there a risk that if I distribute a file named A with job 1, job 2 which also distributes a file named A will read job 1's file? From my understanding, at one point in time there was a 'shortcut' in the system that did exactly what you fear. If the same cache file name was specified by multiple jobs, they'd get the same file as it was assumed they were the same file. I *think* this has been fixed though. [Needless to say, for automated jobs that push security keys through a cache file, this is bad.]
RE: Program crashed when volume of data getting large
Hi, Please check the namenode heap usage. Your cluster may be having too many files to handle / too little free space. It is generally available in the UI. This is one of the causes I have seen for the Timeout. Amogh -----Original Message----- From: Kunsheng Chen [mailto:ke...@yahoo.com] Sent: Wednesday, September 23, 2009 6:21 PM To: common-user@hadoop.apache.org Subject: Program crashed when volume of data getting large Hi everyone, I am running two map-reduce programs. They were working well, but when the data grows to around 900MB (5+ files), weird things happen to remind me as below: 'Communication problem with server: java.net.SocketTimeoutException: timed out waiting for rpc response' Also there is some other reminder about failing to allocate memory. What is strange is that the program keeps running and shows mapping and reduce percentages after those errors; it seems it is still progressing at a slow pace. Does anyone have some idea? Thanks, -Kun
JVM reuse
Hi All, Regarding the JVM reuse feature incorporated, it says reuse is generally recommended for streaming and pipes jobs. I'm a little unclear on this and any pointers will be appreciated. Also, in what scenarios will this feature be helpful for java mapred jobs? Thanks, Amogh
RE: about hadoop jvm allocation in job excution
Hi, Funny enough was looking at it just yesterday. http://hadoop.apache.org/common/docs/r0.20.0/mapred_tutorial.html#Task+JVM+Reuse Thanks, Amogh -Original Message- From: Zhimin [mailto:wan...@cs.umb.edu] Sent: Tuesday, September 15, 2009 10:53 PM To: core-u...@hadoop.apache.org Subject: about hadoop jvm allocation in job excution We have a project which needs to support similarity queries against items from a huge amount of data. One approach we have tried is to use Hbase as the data repository and Hadoop as the query execution engine. We adopted Hadoop because Map-Reduce is a very good model of our underlying task and the programming was straightforward. However, we found that Hadoop will always allocate a new JVM for each individual task on a node. This is inefficient for us because in our case the whole Hadoop platform is dedicated to some relatively stable parametrized querries, and security and strict isolation of different tasks is not our main concern. To save the task setup time, I wonder if there are some existing mechanism in Hadoop or some extension of Hadoop in other open source projects that can let us reside our classes in a JVM on the job node, with task nodes waiting for requests. -- View this message in context: http://www.nabble.com/about-hadoop-jvm-allocation-in-job-excution-tp25458201p25458201.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
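For reference, a minimal sketch of switching reuse on for a job; the underlying property is mapred.job.reuse.jvm.num.tasks, where -1 means unlimited reuse within the job and 1 (the default) means one JVM per task.

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // -1 = reuse the same JVM for any number of this job's tasks on a tasktracker.
    conf.setNumTasksToExecutePerJvm(-1);
    System.out.println("mapred.job.reuse.jvm.num.tasks = "
        + conf.get("mapred.job.reuse.jvm.num.tasks"));
  }
}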
RE: DistributedCache purgeCache()
My bad, it's not exposed. If nothing else is available, maybe mimic its behaviour in your code? Thanks, Amogh -Original Message- From: #YONG YONG CHENG# [mailto:aarnc...@pmail.ntu.edu.sg] Sent: Monday, September 07, 2009 10:27 AM To: common-user@hadoop.apache.org Subject: RE: DistributedCache purgeCache() Thanks for your swift response. But where can I find deletecache()? Thanks. -Original Message- From: Amogh Vasekar [mailto:am...@yahoo-inc.com] Sent: Thu 9/3/2009 2:44 PM To: common-user@hadoop.apache.org Subject: RE: DistributedCache purgeCache() AFAIK, releaseCache only cleans the reference to your file. Try using deletecache in a synchronized manner. Thanks, Amogh -Original Message- From: #YONG YONG CHENG# [mailto:aarnc...@pmail.ntu.edu.sg] Sent: Thursday, September 03, 2009 8:50 AM To: common-user@hadoop.apache.org Subject: DistributedCache purgeCache() Good Day, I have a question on the DistributedCache as follows. I have used DistributedCache to move my executable(.exe) around the (onto the local filesystems of) nodes in Hadoop and run the .exe (via addCacheArchive() and getLocalCacheArchives()). But I discovered that after my job, the .exe still resides on the nodes. I have tried using releaseCache() and/or purgeCache(), but the .exe is still on the nodes. Is there a way to remove the .exe from the nodes? Thanks.
RE: multi core nodes
Before setting the task limits, do take into account the memory considerations ( many archive posts on this can be found ). Also, your tasktracker and datanode daemons will run on that machine as well, so you might want to set aside some processing power for that. Cheers! Amogh -Original Message- From: Erik Forsberg [mailto:forsb...@opera.com] Sent: Friday, September 04, 2009 11:55 AM To: common-user@hadoop.apache.org Subject: Re: multi core nodes On Thu, 3 Sep 2009 13:20:16 -0700 (PDT) ll_oz_ll himanshu_cool...@yahoo.com wrote: Hi, Is hadoop able to take into account multi core nodes, so that nodes which have multiple cores run multiple concurrent jobs ? Or does that need to be configured manually and if so can that be configured individually for each node ? Yes, it has to be configured manually. You set the following two configuration variables in hadoop-site.xml on each node depending on the number of cores on the node: mapred.tasktracker.map.tasks.maximum mapred.tasktracker.reduce.tasks.maximum According to the book Hadoop - the definitive guide, a good rule of thumb is to have between 1 and 2 tasks per processor, counting both map and reduce tasks. So, for example, if a machine has 8 cores, setting mapred.tasktracker.map.tasks.maximum = 8 and mapred.tasktracker.reduce.tasks.maximum = 8 probably makes sense, but this also depends a bit on your load. Cheers, \EF -- Erik Forsberg forsb...@opera.com Developer, Opera Software - http://www.opera.com/
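As a quick sanity check of what a node's configuration actually resolves to, here is a small sketch that loads the job/site config from the classpath and prints the two slot limits (2 is the shipped default for both):

import org.apache.hadoop.mapred.JobConf;

public class PrintSlotConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf();   // picks up the site configuration from the classpath
    System.out.println("map slots    = "
        + conf.getInt("mapred.tasktracker.map.tasks.maximum", 2));
    System.out.println("reduce slots = "
        + conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2));
  }
}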
RE: Some issues!
Have a look at jobclient, it should suffice. Cheers! Amogh -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, September 04, 2009 9:15 PM To: common-user@hadoop.apache.org Subject: Re: Some issues! Hey , I have one more doubt , Suppose I have some cascading mapred jobs and suppose some data which was collected in MRjob1 is to be used in MRjob2 m is there any way? Thanks On Fri, Sep 4, 2009 at 1:54 PM, Amandeep Khurana ama...@gmail.com wrote: Or you can output the data in the keys and NullWritable as the value. That ways you'll get only unique data... On 9/4/09, zhang jianfeng zjf...@gmail.com wrote: Hi Sugandha , If you only want to the value, you need to set the key as NullWritable in reduce. e.g. output.collect(NullWritable.get(), value); On Fri, Sep 4, 2009 at 12:46 AM, Sugandha Naolekar sugandha@gmail.comwrote: Hello! Running a simple MR job, and setting a replication factor of 2. Now, after its execution, the output is split in files named as part-0 and so on. I want to ask is, can't we avoid these keys or key values to get printed in output files? I mean, I am getting the output in the files in key-value pair. I want just the data and not the keys(integers) in it. -- Regards! Sugandha -- Amandeep Khurana Computer Science Graduate Student University of California, Santa Cruz
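A rough sketch of the NullWritable suggestion in reducer form (old mapred API; the mapper is assumed to emit each record as the key with NullWritable.get() as the value). TextOutputFormat then writes only the key, so the part-* files contain just the data, and duplicate records collapse to a single line.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DataOnlyReducer extends MapReduceBase
    implements Reducer<Text, NullWritable, Text, NullWritable> {

  public void reduce(Text key, Iterator<NullWritable> values,
                     OutputCollector<Text, NullWritable> out, Reporter reporter)
      throws IOException {
    // TextOutputFormat skips a NullWritable value, printing only the record itself.
    out.collect(key, NullWritable.get());
  }
}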
RE: difference between mapper and map runnable
Hi, A Mapper is used to process the K,V pairs passed to it; MapRunnable is an interface which, when implemented, is responsible for generating conforming K,V pairs (by driving the record reader) and passing them to the Mapper. Cheers! Amogh -Original Message- From: Rakhi Khatwani [mailto:rkhatw...@gmail.com] Sent: Thursday, August 27, 2009 5:56 PM To: common-user@hadoop.apache.org Subject: difference between mapper and map runnable Hi, What's the difference between a mapper and a map runnable, and their usage? Regards Raakhi
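A bare-bones illustration of the difference (old mapred API): the default MapRunner simply loops over the RecordReader and calls map() once per record, whereas a custom MapRunnable owns that loop and can emit whatever pairs it likes — here, one summary record per split. It would be plugged in via conf.setMapRunnerClass(LineCountRunner.class); the class name is made up.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class LineCountRunner implements MapRunnable<LongWritable, Text, Text, LongWritable> {
  public void configure(JobConf job) {}

  public void run(RecordReader<LongWritable, Text> input,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    LongWritable key = input.createKey();
    Text value = input.createValue();
    long lines = 0;
    while (input.next(key, value)) {   // the runnable owns the read loop...
      lines++;
      reporter.progress();
    }
    // ...and decides what pairs to emit; here a single summary record per split.
    output.collect(new Text("lines-in-split"), new LongWritable(lines));
  }
}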
RE: Hadoop streaming: How is data distributed from mappers to reducers?
Hadoop will make sure that every k,v pair with the same key lands in the same reducer and is consumed in a single reduce instance. -Original Message- From: Nipun Saggar [mailto:nipun.sag...@gmail.com] Sent: Tuesday, August 25, 2009 10:41 AM To: common-user@hadoop.apache.org Subject: Re: Hadoop streaming: How is data distributed from mappers to reducers? Does that mean that, if the same key is emitted more than once from a mapper, it is not necessary that the key value pairs (for that same key) will go to the same reducer? -Nipun On Tue, Aug 25, 2009 at 6:13 AM, Aaron Kimball aa...@cloudera.com wrote: Yes. It works just like Java-based MapReduce in that regard. - Aaron On Sun, Aug 23, 2009 at 5:09 AM, Nipun Saggar nipun.sag...@gmail.com wrote: Hi all, I have recently started using Hadoop streaming. From the documentation, I understand that by default, each line output from a mapper up to the first tab becomes the key and the rest of the line is the value. I wanted to know whether there is a shuffling (sorting) phase between the mapper and reducer. More specifically, would it be correct to assume that output from all mappers with the same key will go to the same reducer? Thanks, Nipun
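For reference, a small sketch of the default behaviour in code: the reducer for a key is chosen purely from the key's hash (the same formula HashPartitioner uses), so every occurrence of a given key goes to the same reducer.

import org.apache.hadoop.io.Text;

public class DefaultPartitionDemo {
  static int partitionFor(Text key, int numReduceTasks) {
    // Same formula as Hadoop's HashPartitioner.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    System.out.println(partitionFor(new Text("apple"), 4));
    System.out.println(partitionFor(new Text("apple"), 4)); // always the same partition
  }
}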
RE: MR job scheduler
I'm not sure that is the case with Hadoop. I think its assigning reduce task to an available tasktracker at any instant; Since a reducer polls JT for completed maps. And if it were the case as you said, a reducer wont be initialized until all maps have completed , after which copy phase would start. Thanks, Amogh -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 9:50 AM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler OK i'll be a bit more specific , Suppose map outputs 100 different keys . Consider a key K whose correspoding values may be on N diff datanodes. Consider a datanode D which have maximum number of values . So instead of moving the values on D to other systems , it is useful to bring in the values from other datanodes to D to minimize the data movement and also the delay. Similar is the case with All the other keys . How does the scheduler take care of this ? 2009/8/21 zjffdu zjf...@gmail.com Add some detials: 1. #map is determined by the block size and InputFormat (whether you can want to split or not split) 2. The default scheduler for Hadoop is FIFO, and the Fair Scheduler and Capacity Scheduler are other two options as I know. JobTracker has the scheduler. 3. Once the map task is done, it will tell its own tasktracker, and the tasktracker will tell jobtracker, so jobtracker manage all the tasks, and it will decide how to and when to start the reduce task -Original Message- From: Arun C Murthy [mailto:a...@yahoo-inc.com] Sent: 2009年8月20日 11:41 To: common-user@hadoop.apache.org Subject: Re: MR job scheduler On Aug 20, 2009, at 9:00 AM, bharath vissapragada wrote: Hi all, Can anyone tell me how the MR scheduler schedule the MR jobs? How does it decide where t create MAP tasks and how many to create. Once the MAP tasks are over how does it decide to move the keys to the reducer efficiently(minimizing the data movement across the network). Is there any doc available which describes this scheduling process quite efficiently The #maps is decided by the application. The scheduler decides where to execute them. Once the map is done, the reduce tasks connect to the tasktracker (on the node where the map-task executed) and copies the entire output over http. Arun
RE: MR job scheduler
Yes, but the copy phase starts with the initialization for a reducer, after which it would keep polling for completed map tasks to fetch the respective outputs. -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 12:00 PM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler Amogh i think Reduce phase starts only when all the map phases are completed . Because it needs all the values corresponding to a particular key! 2009/8/21 Amogh Vasekar am...@yahoo-inc.com I'm not sure that is the case with Hadoop. I think its assigning reduce task to an available tasktracker at any instant; Since a reducer polls JT for completed maps. And if it were the case as you said, a reducer wont be initialized until all maps have completed , after which copy phase would start. Thanks, Amogh -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 9:50 AM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler OK i'll be a bit more specific , Suppose map outputs 100 different keys . Consider a key K whose correspoding values may be on N diff datanodes. Consider a datanode D which have maximum number of values . So instead of moving the values on D to other systems , it is useful to bring in the values from other datanodes to D to minimize the data movement and also the delay. Similar is the case with All the other keys . How does the scheduler take care of this ? 2009/8/21 zjffdu zjf...@gmail.com Add some detials: 1. #map is determined by the block size and InputFormat (whether you can want to split or not split) 2. The default scheduler for Hadoop is FIFO, and the Fair Scheduler and Capacity Scheduler are other two options as I know. JobTracker has the scheduler. 3. Once the map task is done, it will tell its own tasktracker, and the tasktracker will tell jobtracker, so jobtracker manage all the tasks, and it will decide how to and when to start the reduce task -Original Message- From: Arun C Murthy [mailto:a...@yahoo-inc.com] Sent: 2009年8月20日 11:41 To: common-user@hadoop.apache.org Subject: Re: MR job scheduler On Aug 20, 2009, at 9:00 AM, bharath vissapragada wrote: Hi all, Can anyone tell me how the MR scheduler schedule the MR jobs? How does it decide where t create MAP tasks and how many to create. Once the MAP tasks are over how does it decide to move the keys to the reducer efficiently(minimizing the data movement across the network). Is there any doc available which describes this scheduling process quite efficiently The #maps is decided by the application. The scheduler decides where to execute them. Once the map is done, the reduce tasks connect to the tasktracker (on the node where the map-task executed) and copies the entire output over http. Arun
RE: MR job scheduler
Let me rephrase, 1. Copy phase starts after reducer initialization, which happens before all maps have completed. 2. Which mapper has maximum values for a particular key wont be known until all mappers have completed ( to be more precise, until a particular percentage of running mappers is completed as we have the current maximum value mapper). Also, there is no rule which says one record can go to only one reducer. Thanks, Amogh -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 12:12 PM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler Yes , My doubt is that how is the location of the reducer selected . Is it selected arbitrarily or is selected on a particular machine which has already the more values (corresponding to the key of that reducer) which reduces the cost of transferring data across the network(because already many values to that key are on that machine where the map phase completed).. 2009/8/21 Amogh Vasekar am...@yahoo-inc.com Yes, but the copy phase starts with the initialization for a reducer, after which it would keep polling for completed map tasks to fetch the respective outputs. -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 12:00 PM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler Amogh i think Reduce phase starts only when all the map phases are completed . Because it needs all the values corresponding to a particular key! 2009/8/21 Amogh Vasekar am...@yahoo-inc.com I'm not sure that is the case with Hadoop. I think its assigning reduce task to an available tasktracker at any instant; Since a reducer polls JT for completed maps. And if it were the case as you said, a reducer wont be initialized until all maps have completed , after which copy phase would start. Thanks, Amogh -Original Message- From: bharath vissapragada [mailto:bharathvissapragada1...@gmail.com] Sent: Friday, August 21, 2009 9:50 AM To: common-user@hadoop.apache.org Subject: Re: MR job scheduler OK i'll be a bit more specific , Suppose map outputs 100 different keys . Consider a key K whose correspoding values may be on N diff datanodes. Consider a datanode D which have maximum number of values . So instead of moving the values on D to other systems , it is useful to bring in the values from other datanodes to D to minimize the data movement and also the delay. Similar is the case with All the other keys . How does the scheduler take care of this ? 2009/8/21 zjffdu zjf...@gmail.com Add some detials: 1. #map is determined by the block size and InputFormat (whether you can want to split or not split) 2. The default scheduler for Hadoop is FIFO, and the Fair Scheduler and Capacity Scheduler are other two options as I know. JobTracker has the scheduler. 3. Once the map task is done, it will tell its own tasktracker, and the tasktracker will tell jobtracker, so jobtracker manage all the tasks, and it will decide how to and when to start the reduce task -Original Message- From: Arun C Murthy [mailto:a...@yahoo-inc.com] Sent: 2009年8月20日 11:41 To: common-user@hadoop.apache.org Subject: Re: MR job scheduler On Aug 20, 2009, at 9:00 AM, bharath vissapragada wrote: Hi all, Can anyone tell me how the MR scheduler schedule the MR jobs? How does it decide where t create MAP tasks and how many to create. Once the MAP tasks are over how does it decide to move the keys to the reducer efficiently(minimizing the data movement across the network). 
Is there any doc available which describes this scheduling process quite efficiently The #maps is decided by the application. The scheduler decides where to execute them. Once the map is done, the reduce tasks connect to the tasktracker (on the node where the map-task executed) and copies the entire output over http. Arun
RE: passing job arguments as an xml file
Hi, GenericOptionsParser is customized only for Hadoop-specific params; from its javadoc: "GenericOptionsParser recognizes several standard command line arguments, enabling applications to easily specify a namenode, a jobtracker, additional configuration resources etc." Ideally, all params should be passed via the Tool interface. In my application, I have custom serializer/deserializer classes to parse any xml file I might be supplying, which I use to generate some metadata. This can be part of the above interface as well. Hope this helps. Thanks, Amogh -Original Message- From: ishwar ramani [mailto:rvmish...@gmail.com] Sent: Friday, August 21, 2009 1:38 AM To: common-user Subject: passing job arguments as an xml file Hi, I am looking for an easy way to pass the job arguments through a config file. The genericoptionsparser seems to parse only the hadoop options. Normally I use jsap but that would not co-exist with genericoptionsparser. thanks ishwar
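A minimal Tool skeleton along those lines: ToolRunner/GenericOptionsParser strip the Hadoop options (-conf my.xml, -D key=value, -fs, -jt) into the Configuration, and whatever remains in args can be parsed with jsap or anything else. The property name my.app.param is a made-up example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJobDriver extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();   // already has -conf / -D values merged in
    String customParam = conf.get("my.app.param", "default");   // hypothetical property
    System.out.println("my.app.param = " + customParam + ", remaining args = " + args.length);
    // set up and submit the JobConf here
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MyJobDriver(), args));
  }
}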
RE: utilizing all cores on single-node hadoop
While setting mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum, please consider the memory usage your application might have since all tasks will be competing for the same and might reduce overall performance. Thanks, Amogh -Original Message- From: Harish Mallipeddi [mailto:harish.mallipe...@gmail.com] Sent: Tuesday, August 18, 2009 10:37 AM To: common-user@hadoop.apache.org Subject: Re: utilizing all cores on single-node hadoop Hi Vasilis, Here's some info that I know: mapred.map.tasks - this is a job-specific setting. This is just a hint to InputFormat as to how many InputSplits (and hence MapTasks) you want for your job. The default InputFormat classes usually keep each split size to the HDFS block size (64MB default). So if your input data is less than 64 MB, it will just result in only 1 split and hence 1 MapTask only. mapred.reduce.tasks - this is also a job-specific setting. mapred.tasktracker.map.tasks.maximum mapred.tasktracker.reduce.tasks.maximum The above 2 are tasktracker-specific config options and determine how many simultaneous MapTasks and ReduceTasks run on each TT. Ideally on a 8-core box, you would want to set map.tasks.maximum to something like 6 and reduce.tasks.maximum to 4 to utilize all the 8 cores to the maximum (there's a little bit of over-subscription to account for tasks idling while doing I/O). In the web admin console, how many map-tasks and reduce-tasks are reported to have been launched for your job? Cheers, Harish On Tue, Aug 18, 2009 at 5:47 AM, Vasilis Liaskovitis vlias...@gmail.comwrote: Hi, I am a beginner trying to setup a few simple hadoop tests on a single node before moving on to a cluster. I am just using the simple wordcount example for now. My question is what's the best way to guarantee utilization of all cores on a single-node? So assuming a single node with 16-cores what are the suggested values for: mapred.map.tasks mapred.reduce.tasks mapred.tasktracker.map.tasks.maximum mapred.tasktracker.map.tasks.maxium I found an old similar thread http://www.mail-archive.com/hadoop-u...@lucene.apache.org/msg00152.html and I have followed similar settings for my 16-core system (e.g. map.tasks=reduce.tasks=90 and map.tasks.maximum=100), however I always see only 3-4 cores utilized using top. - The description for mapred.map.tasks says Ignored when mapred.job.tracker is local , and in my case mapred.job.tracker=hdfs://localhost:54311 is it possible that the map.tasks and reduce.tasks I am setting are being ignored? How can I verify this? Is there a way to enforce my values even on a localhost scenario like this? - Are there other config options/values that I need to set besides the 4 I mentioned above? - Also is it possible that for short tasks, I won't see full utilization of all cores anyway? Something along those lines is mentioned in an issue a year ago: http://issues.apache.org/jira/browse/HADOOP-3136 If the individual tasks are very short i.e. run for less than the heartbeat interval the TaskTracker serially runs one task at a time I am using hadoop-0.19.2 thanks for any guidance, - Vasilis -- Harish Mallipeddi http://blog.poundbang.in
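To make the distinction concrete, a small sketch of the job-level knobs discussed above (these are per-job settings, while the tasktracker *.tasks.maximum values are per-node caps read when the tasktracker starts):

import org.apache.hadoop.mapred.JobConf;

public class TaskCountHints {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setNumMapTasks(16);    // hint only: mapred.map.tasks is advice to the InputFormat
    conf.setNumReduceTasks(8);  // honoured as given: mapred.reduce.tasks
    System.out.println(conf.get("mapred.map.tasks") + " / " + conf.get("mapred.reduce.tasks"));
  }
}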
RE: Some tasks fail to report status between the end of the map and the beginning of the merge
10 mins reminds me of parameter mapred.task.timeout . This is configurable. Or alternatively you might just do a sysout to let tracker know of its existence ( not an ideal solution though ) Thanks, Amogh -Original Message- From: Mathias De Maré [mailto:mathias.dem...@gmail.com] Sent: Wednesday, August 05, 2009 12:33 PM To: common-user@hadoop.apache.org Subject: Some tasks fail to report status between the end of the map and the beginning of the merge Hi, I'm having some problems (Hadoop 0.20.0) where map tasks fail to report status for 10 minutes and get killed eventually. All of the tasks output around the same amount of data, some only take a few seconds before starting the 'merge' on the segments, but some seem to fail by just stopping to work for about 10 minutes. Several of these failed tasks eventually do succeed, on their 2nd or 3rd task attempt. It's nice to see them succeed eventually, but each of those tasks crawls a few thousand websites, and it seems like a terrible waste to let them retry a few times, in the meantime downloading all of those websites again, after 10 minutes of doing nothing. Even more annoyingly, eventually, one of the tasks fails completely, which then kills the entire job. I could probably increase the amount of task attempts and simply hope the tasks will succeed eventually, but that doesn't solve the huge slowdowns and the recrawling required. Here's an example of a successful task attempt (this is attempt 3 of a specific task -- note that it takes around 8 seconds between spill 133 and spill 134): 2009-08-04 18:38:48,059 INFO org.apache.hadoop.mapred.MapTask: Finished spill 129 2009-08-04 18:39:00,626 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true 2009-08-04 18:39:00,626 INFO org.apache.hadoop.mapred.MapTask: bufstart = 2492163; bufend = 628913; bufvoid = 2988446 2009-08-04 18:39:00,626 INFO org.apache.hadoop.mapred.MapTask: kvstart = 9727; kvend = 7760; length = 9830 2009-08-04 18:39:01,467 INFO org.apache.hadoop.mapred.MapTask: Finished spill 130 2009-08-04 18:39:08,136 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true 2009-08-04 18:39:08,136 INFO org.apache.hadoop.mapred.MapTask: bufstart = 628913; bufend = 1880222; bufvoid = 2988448 2009-08-04 18:39:08,136 INFO org.apache.hadoop.mapred.MapTask: kvstart = 7760; kvend = 5793; length = 9830 2009-08-04 18:39:08,463 INFO org.apache.hadoop.mapred.MapTask: Finished spill 131 2009-08-04 18:39:12,456 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true 2009-08-04 18:39:12,459 INFO org.apache.hadoop.mapred.MapTask: bufstart = 1880222; bufend = 136018; bufvoid = 2988448 2009-08-04 18:39:12,459 INFO org.apache.hadoop.mapred.MapTask: kvstart = 5793; kvend = 3826; length = 9830 2009-08-04 18:39:12,697 INFO org.apache.hadoop.mapred.MapTask: Finished spill 132 2009-08-04 18:39:23,138 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true 2009-08-04 18:39:23,138 INFO org.apache.hadoop.mapred.MapTask: bufstart = 136018; bufend = 1347353; bufvoid = 2988448 2009-08-04 18:39:23,138 INFO org.apache.hadoop.mapred.MapTask: kvstart = 3826; kvend = 1859; length = 9830 2009-08-04 18:39:25,747 INFO org.apache.hadoop.mapred.MapTask: Finished spill 133 2009-08-04 18:47:49,823 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output 2009-08-04 18:47:50,132 INFO org.apache.hadoop.mapred.MapTask: Finished spill 134 2009-08-04 18:47:50,525 INFO org.apache.hadoop.mapred.Merger: Merging 135 sorted segments 2009-08-04 
18:47:50,528 INFO org.apache.hadoop.mapred.Merger: Merging 9 intermediate segments out of a total of 135 2009-08-04 18:47:52,224 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 127 2009-08-04 18:47:53,837 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 118 2009-08-04 18:47:55,417 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 109 2009-08-04 18:47:56,990 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 100 2009-08-04 18:47:58,492 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 91 2009-08-04 18:48:00,191 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 82 2009-08-04 18:48:02,315 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 73 2009-08-04 18:48:04,184 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 64 2009-08-04 18:48:06,162 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 55 2009-08-04 18:48:08,149 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 46 2009-08-04 18:48:09,888 INFO org.apache.hadoop.mapred.Merger: Merging 10 intermediate segments out of a total of 37 2009-08-04 18:48:11,744
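To make the two options concrete, a sketch (old mapred API; the class and fetch() method are hypothetical placeholders) of a mapper that keeps the tasktracker informed during slow work; the alternative of raising mapred.task.timeout would be set on the JobConf before submission.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LongRunningMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  // Alternative: conf.setLong("mapred.task.timeout", 20 * 60 * 1000L) on the JobConf
  // before submission gives slow tasks 20 minutes instead of the default 10.

  public void map(LongWritable key, Text url,
                  OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
    reporter.setStatus("fetching " + url);
    String page = fetch(url.toString());   // hypothetical slow operation (e.g. a crawl)
    reporter.progress();                   // lets the tasktracker know the task is alive
    out.collect(url, new Text(Integer.toString(page.length())));
  }

  private String fetch(String url) { return ""; }   // placeholder for the real work
}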
RE: :!
Maybe I'm missing the point, but in terms of execution performance benefit, what does copying to dfs and then compressing to be fed to a map/reduce job provide? Isn't it better to compress offline / outside latency window and make available on dfs? Also, your mapreduce program will launch one map task per compressed file, so make sure you design your compression accordingly. Thanks, Amogh -Original Message- From: Sugandha Naolekar [mailto:sugandha@gmail.com] Sent: Monday, August 03, 2009 12:32 PM To: common-user@hadoop.apache.org Subject: Re: :! dats fine. But, if I place the data in HDFS and then run map reduce code to provide compression, then the data will get compressed in sequence files but, even the original data will reside in the memory;thereby leading or causing a kind of redundancy of data... Can u pls suggest me a way out?/ On Mon, Aug 3, 2009 at 12:07 PM, prashant ullegaddi prashullega...@gmail.com wrote: I don't think you will be able to compress some data unless it's on HDFS. What you can do is 1. Manually compress the data on the machine where the data resides. Then, copy the same to HDFS. or 2. Copy the data without compressing to HDFS, then run a job which just emits the data as it reads in key/value pair. You can set FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class) so that output gets gzipped. Does that solve your problem? btw you didn't exactly specify your data size (how many TBs). On Mon, Aug 3, 2009 at 11:02 AM, Sugandha Naolekar sugandha@gmail.comwrote: Yes, You are right. Here goes the details related:: - I have a Hadoop cluster of 7 nodes. Now there is this 8th machine, which is not a part of the hadoop cluster. - I want to place the data of that machine into the HDFS. Thus, before placing it in HDFS, I want to compress it, and then dump in the HDFS. - I have 4 datanodes in my cluster. also, data might get extended upto tera bytes. - Also, i have set thr replication factor as 2. - I guess, for compression, I will have to run map reduce...? right..please tel me the complete approach that is needed to be followed. On Mon, Aug 3, 2009 at 10:48 AM, prashant ullegaddi prashullega...@gmail.com wrote: By I want to compress the data first and then place it in HDFS, do you mean you want to compress the data locally and then copy to DFS? What's the size of your data? What's the capacity of HDFS? On Mon, Aug 3, 2009 at 10:45 AM, Sugandha Naolekar sugandha@gmail.comwrote: I want to compress the data first and then place it in HDFS. Again, while retrieving the same, I want to uncompress it and place on the desired destination. Can this be possible. How to get started? Also, I want to get started with actual coding part of compression and MAP reduce. PLease suggest me aptly...! -- Regards! Sugandha -- Regards! Sugandha -- Regards! Sugandha
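For completeness, a sketch of the second option from the thread (paths are placeholders): a map-only pass over the uncompressed data already in HDFS that writes it back out gzipped. Note that with the default TextInputFormat and identity mapper the byte offset becomes the output key, so a one-line custom mapper emitting NullWritable keys may be preferable; the original uncompressed copy still has to be deleted by hand afterwards.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class CompressCopyJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(CompressCopyJob.class);
    conf.setJobName("compress-copy");
    FileInputFormat.setInputPaths(conf, new Path("/user/foo/raw"));          // placeholder
    FileOutputFormat.setOutputPath(conf, new Path("/user/foo/compressed"));  // placeholder
    FileOutputFormat.setCompressOutput(conf, true);
    FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    conf.setNumReduceTasks(0);   // map-only pass-through (default identity mapper)
    JobClient.runJob(conf);
  }
}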
RE: Counting no. of keys.
Have you had a look at the reporter.counter hadoop provides? I think it might be helpful in your case, where in you can locally aggregate for each map task and then push it to global counter. -Original Message- From: Zhong Wang [mailto:wangzhong@gmail.com] Sent: Monday, August 03, 2009 6:31 PM To: common-user@hadoop.apache.org Subject: Re: Counting no. of keys. I have the same question, but i want to use map records number in reduce phase exactly after the map. This is very useful in solving problems like TF-IDF. In reduce (IDF calculating) phase, you must know the total number of all documents. Is there any method to solve the problem without running two Map-Reduce jobs? On Sun, Aug 2, 2009 at 2:08 PM, Ted Dunningted.dunn...@gmail.com wrote: Sure. Write a word count map-reduce program. The mapper outputs the key from the sequence file as the output key and includes a count. Then you do the normal combiner and reducer from a normal word count program. On Sat, Aug 1, 2009 at 9:53 PM, prashant ullegaddi prashullega...@gmail.com wrote: Hi, I've say 800 sequence files written using SequenceFileOutputFormat. Is there any way to know no. of unique keys in those sequence files? Thanks, Prashant. -- Ted Dunning, CTO DeepDyve -- Zhong Wang
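A sketch of that counter idea (old mapred API; the enum name is made up). Note this gives you the global total after the job finishes — reading it from within the same job's reduce phase is a different problem.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  public enum Docs { TOTAL }   // hypothetical custom counter

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> out, Reporter reporter)
      throws IOException {
    reporter.incrCounter(Docs.TOTAL, 1);   // framework aggregates across all map tasks
    out.collect(value, new LongWritable(1));
  }
}
// After JobClient.runJob(conf) returns a RunningJob, the total can be read back via
// runningJob.getCounters().getCounter(CountingMapper.Docs.TOTAL).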
RE: map side join
This is particularly useful if your input is the output of another MR job, else is a killer. You may want to write your own mapper in case one of the files to be joined is small enough to fit in memory / can be handled in splits. Thanks, Amogh -Original Message- From: Jason Venner [mailto:jason.had...@gmail.com] Sent: Thursday, July 30, 2009 8:20 PM To: common-user@hadoop.apache.org Subject: Re: map side join The mapside join code builds multiple map tasks, each map task will receive as input one partition from each of your input sources. In your case, your job would have 3 map tasks, and each map task would be receive data from 1 partition in each source file. The mapside join code maintains a reader open for each input file in the input split and produces key value sets via a stream merge sort of the these input data files. The merge is essentially done key by key before the key, value set is presented to the map. Implicit in the mapside join is that the input files are already sorted, so the join code only has to figure out which key is next out of the set of input files in the task. On Wed, Jul 29, 2009 at 8:48 AM, bonito bonito.pe...@gmail.com wrote: Hello, I would like to ask a question regarding the map side join. I am trying to understand the implementation of it and I would be grateful if you could tell me whether there is any I/O cost included. In detail, if we have 2 source files of 3 splits each (so as to ensure the constraints that is, sorted, partitioned etc.) then during map side join these 2 files are merged before the map function takes place. I am trying to comprehend how this merge is done. If I am not mistaken, each pair of corresponding splits is merged at a time. That is, first the splits(1) of both sources are taken into account. How? Is this done in a 'on the fly' fashion (in-memory buffer)? Is there any file locally created? I read the relevant details about the iterators but I wonder about the memory requirements... If each split need to be in-memory stored so as to have an iterator over it, then there should be a requirement of memory space. Thank you! -- View this message in context: http://www.nabble.com/map-side-join-tp24722077p24722077.html Sent from the Hadoop core-user mailing list archive at Nabble.com. -- Pro Hadoop, a book to guide you from beginner to hadoop mastery, http://www.amazon.com/dp/1430219424?tag=jewlerymall www.prohadoopbook.com a community for Hadoop Professionals
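A rough sketch of the wiring (org.apache.hadoop.mapred.join; paths are placeholders); both inputs must already be sorted by key and partitioned identically, as Jason describes:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

public class MapSideJoinConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf(MapSideJoinConfig.class);
    conf.setInputFormat(CompositeInputFormat.class);
    // "inner" join over two sorted, identically partitioned inputs; the map then
    // receives (key, TupleWritable) with one slot per joined source.
    conf.set("mapred.join.expr", CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class,
        new Path("/data/source1"), new Path("/data/source2")));
  }
}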
RE: Running 145K maps, zero reduces- does Hadoop scale?
What is the use case for this? Especially since you have 0 reducers. Thanks, Amogh -Original Message- From: Saptarshi Guha [mailto:saptarshi.g...@gmail.com] Sent: Friday, July 31, 2009 12:08 PM To: core-u...@hadoop.apache.org Subject: Re: Running 145K maps, zero reduces- does Hadoop scale? In this particular example, the record reader emits a single number per split as both key and value. Regards S On Fri, Jul 31, 2009 at 1:55 AM, Saptarshi Guha saptarshi.g...@gmail.com wrote: Hello, Does Hadoop scale well for 100K+ input splits? I have not tried with sequence files. My custom inputformat generates 145K splits. The record reader emits about 15 bytes as key and 8 bytes as value. It doesn't do anything else, in fact it doesn't read from disk (basically it emits splitbeginning ... splitend for every split,) So essentially, my inputformat is creating 145K InputSplit objects (see below). However I got this 09/07/31 01:41:41 INFO mapred.JobClient: Running job: job_200907251335_0005 09/07/31 01:41:42 INFO mapred.JobClient: map 0% reduce 0% 09/07/31 01:43:06 INFO mapred.JobClient: Job complete: job_200907251335_0005 And the job does not end! Hangs here. Very strange. The jobtracker does not respond to web requests. This is on Hadoop 0.20 though I am using the 0.19.1 API. The master is 64 bit with 4 cores and 16GB ram and not running any tasktrackers. Any pointers would be appreciated Regards Saptarshi //Basically FileInputSplit reworded public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { long n = the_length_of_something ; //==145K long chunkSize = n / (numSplits == 0 ? 1 : numSplits); InputSplit[] splits = new InputSplit[numSplits]; for (int i = 0; i < numSplits; i++) { MySplit split; if ((i + 1) == numSplits) split = new MySplit(i * chunkSize, n); else split = new MySplit(i * chunkSize, (i * chunkSize) + chunkSize); splits[i] = split; } return splits; }
RE: Why is single reducer called twice?
the reducer is called a second time to do nothing, before all is done Can you elaborate please? Amogh -Original Message- From: Mark Kerzner [mailto:markkerz...@gmail.com] Sent: Monday, July 27, 2009 8:51 PM To: core-u...@hadoop.apache.org Subject: Why is single reducer called twice? Hi, In my code, I set the number of reducers to 1, conf.setNumReduceTasks(1); and it all works as intended, but I notice that the reducer is called a second time to do nothing, before all is done. Can I use this side effect to close the static resources (like a zip output stream) that I open on the first call? Thank you, Mark
RE: best way to set memory
I haven't played a lot with it, but you may want to check if setting HADOOP_NAMENODE_OPTS, HADOOP_TASKTRACKER_OPTS help. Let me know if you find a way to do this :) Cheers! Amogh -Original Message- From: Fernando Padilla [mailto:f...@alum.mit.edu] Sent: Wednesday, July 22, 2009 9:47 AM To: common-user@hadoop.apache.org Subject: Re: best way to set memory I was thinking not for M/R, but for the actual daemons: When I go and start up a daemon (like below). They all use the same hadoop-env.sh. Which allows you to only set the HADOOP_HEAPSIZE once.. not differently for each daemon-type.. bin/hadoop-daemon.sh start namenode bin/hadoop-daemon.sh start datanode bin/hadoop-daemon.sh start secondarynamenode bin/hadoop-daemon.sh start jobtracker bin/hadoop-daemon.sh start tasktracker Amogh Vasekar wrote: If you need to set the java_options for mem., you can do this via configure in your MR job. -Original Message- From: Fernando Padilla [mailto:f...@alum.mit.edu] Sent: Wednesday, July 22, 2009 9:11 AM To: common-user@hadoop.apache.org Subject: best way to set memory So.. I want to have different memory profiles for NameNode/DataNode/JobTracker/TaskTracker. But it looks like I only have one environment variable to modify, HADOOP_HEAPSIZE, but I might be running more than one on a single box/deployment/conf directory. Is there a proper way to set the memory for each kind of server? Or has an issue been created to document this bug/deficiency??
RE: best way to set memory
If you need to set the java_options for mem., you can do this via configure in your MR job. -Original Message- From: Fernando Padilla [mailto:f...@alum.mit.edu] Sent: Wednesday, July 22, 2009 9:11 AM To: common-user@hadoop.apache.org Subject: best way to set memory So.. I want to have different memory profiles for NameNode/DataNode/JobTracker/TaskTracker. But it looks like I only have one environment variable to modify, HADOOP_HEAPSIZE, but I might be running more than one on a single box/deployment/conf directory. Is there a proper way to set the memory for each kind of server? Or has an issue been created to document this bug/deficiency??
RE: Question about job distribution
Confused. What do you mean by query be distributed over all datanodes or just 1 node . If your data is small enough so that it fits in just one block ( and replicated by hadoop ), then just one task will be run ( assuming default input split). If the data is spread across multiple blocks, you can make it run on just one compute node by setting your input split to be large enough ( yes there are use cases for this when whole data is to be fed to a single mapper ). Else, the job will be scheduled on numerous nodes with each getting a block / chunk ( input split size set ) of your actual data. The nodes picked for running your job depends on data-locality to reduce network latency. Thanks, Amogh -Original Message- From: Divij Durve [mailto:divij.t...@gmail.com] Sent: Wednesday, July 15, 2009 2:32 AM To: common-user@hadoop.apache.org; core-u...@hadoop.apache.org Subject: Question about job distribution If i have a query that i would normally fire on a database and i want o fire that using the data loaded into multiple nodes on hadoop. Will the query be distributed over all the datanodes so it returns results faster or will it just send it to 1 node? If so is there a way to get it to distribute the query instead of sending it to 1 node? Thanks Divij
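As a sketch of the "single mapper" case mentioned above: pushing the minimum split size above the file size makes FileInputFormat hand each (splittable) file to one map task instead of one per block.

import org.apache.hadoop.mapred.JobConf;

public class SingleSplitConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // One split (and hence one mapper) per input file, regardless of block count.
    conf.setLong("mapred.min.split.size", Long.MAX_VALUE);
    System.out.println(conf.get("mapred.min.split.size"));
  }
}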
RE: Question regarding Map side Join
Yes it is. However, I assume file 2 is small enough to be distributed across all computing nodes without much delay, else the whole point of map side join is defeated. If keys in file 2 are unique, it is a simple lookup you need to implement. Else iterate over them to implement the join. -Original Message- From: Pankil Doshi [mailto:forpan...@gmail.com] Sent: Tuesday, July 14, 2009 4:49 AM To: core-u...@hadoop.apache.org Subject: Question regarding Map side Join I have a question regarding Mapside Join. Finally I got a copy of your book. I tried implementing it, and I have a few questions on it.
File 1:
31 Rafferty
33 Jones
33 Steinberg
34 Robinson
34 Smith
null Jasper
File 2:
31 sales
33 Engg
34 Clerical
35 Marketing
Results I got using mapside join. File1 inner join with File2:
31 Rafferty 31 sales
33 Jones 33 Engg
33 Steinberg 33 Engg
File2 inner join with File1:
31 sales 31 Rafferty
33 Engg 33 Jones
33 Engg 33 Steinberg
34 Clerical 34 Robinson
34 Clerical 34 Smith
But I am looking for a result like below:
31 sales Rafferty
33 Engg Jones
33 Engg Steinberg
34 Clerical Robinson
34 Clerical Smith
Is it possible using map-side join only?? I am looking for a simple join over the key values present in both files. Pankil
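Assuming the join is set up with CompositeInputFormat as in the earlier map-side join thread, a rough sketch of a mapper that flattens the TupleWritable into the single combined line Pankil is after (field order follows the order in which the inputs were composed):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class JoinFlattenMapper extends MapReduceBase
    implements Mapper<Text, TupleWritable, Text, Text> {

  public void map(Text key, TupleWritable value,
                  OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
    StringBuilder joined = new StringBuilder();
    for (Writable field : value) {               // one entry per joined source
      if (joined.length() > 0) joined.append('\t');
      joined.append(field.toString());
    }
    // e.g. key=31, joined="sales<TAB>Rafferty" (depending on compose order)
    out.collect(key, new Text(joined.toString()));
  }
}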