Recovering the namenode from failure
I'm on CDH4, trying to recover both the namenode and the Cloudera Manager VMs from HDFS after losing the namenode. All of our backup VMs are on HDFS, so for the moment I just want to hack something together, copy the backup VMs off HDFS, and get on with properly reconfiguring via CDH Manager. So I've installed a plain ol' namenode on one of my cluster nodes and started it with -importCheckpoint (using the data from the secondary NN). This seems to have worked: I have a namenode web UI up which expects to find 32178 blocks. But my plain namenode (on the same hostname and IP as the old namenode) says that there are no datanodes in the cluster. What do I need to do to configure the datanodes to report their blocks to this new namenode (same IP & hostname)? Thanks, David
RE: About configuring cluster setup
We have a box that's a bit overpowered for just running our namenode and jobtracker on a 10-node cluster, and we also wanted to make use of the storage and processor resources of that node, like you. What we did is use LXC containers to segregate the different processes. LXC is a very lightweight pseudo-virtualization platform for Linux (near-zero overhead). The key benefit to LXC, in this case, is that we can use Linux cgroups (standard, simple config in LXC) to specify that the container/VM running the namenode/jobtracker should have 10x the CPU and IO resources of the container that runs a tasktracker/datanode (though since LXC containers all run under the same kernel, any "unused" resources are assigned to runnable processes). We run Cloudera Hadoop and deployed a slightly modified tasktracker configuration on the shared box (fewer task slots so as not to over-utilize memory). That tasktracker doesn't do as much work as the other dedicated nodes, but it does a fair share, and the cgroup configurations (cpu.shares & blkio.weight for the curious) ensure that the bulk processing doesn't interfere with the critical namenode & jobtracker systems. From: Robert Dyer [mailto:psyb...@gmail.com] Sent: Tuesday, May 14, 2013 11:23 PM To: user@hadoop.apache.org Subject: Re: About configuring cluster setup You can, however note that unless you also run a TaskTracker on that node (bad idea) then any blocks that are replicated to this node won't be available as input to MapReduces and you are lowering the odds of having data locality on those blocks. On Tue, May 14, 2013 at 2:01 AM, Ramya S wrote: Hi, Can we configure 1 node as both Name node and Data node ?
RE: JobClient: Error reading task output - after instituting a DNS server
So simple I was hoping to avoid admitting to it. ;-) I had set the task Java options to -Xmx1.5g; that needed to be -Xmx1500m. The telltale output of a mistake like that is rather tricky to find: I had to dig into the task tracker UI/logs, as it doesn't show up in the jobtracker's normal logs. The timing perfectly coincided with a DNS change, and Google's first hit, on the error that I *could* see in the jobtracker logs, suggested DNS, so I went down that rabbit hole for quite a while. Dave
From: Shahab Yunus [mailto:shahab.yu...@gmail.com] Sent: Tuesday, May 14, 2013 6:56 PM To: user@hadoop.apache.org Subject: Re: JobClient: Error reading task output - after instituting a DNS server
Hi David. Can you explain in a bit more detail what was the issue? Thanks. Shahab
On Tue, May 14, 2013 at 2:29 AM, David Parks wrote: I just hate it when I figure out a problem right after asking for help. Finding the task logs via the task tracker website identified the problem, which didn't show up elsewhere. A simple misconfiguration which I did concurrently with the DNS update that threw me off track. Dave
From: David Parks [mailto:davidpark...@yahoo.com] Sent: Tuesday, May 14, 2013 1:20 PM To: user@hadoop.apache.org Subject: JobClient: Error reading task output - after instituting a DNS server
So we just configured a local DNS server for hostname resolution and stopped using a hosts file, and now jobs fail on us. But I can't figure out why. You can see the error below, but if I run curl on any of those URLs they come back with "Failed to retrieve stdout log", which doesn't look much like a DNS issue. I can ping and do nslookup from any host to any other host. This is a CDH4 cluster and the host inspector is happy as could be; Cloudera Manager also indicates all is well. When I open the task tracker website I see the first task attempt show up on the site for maybe 10 seconds or so before it fails. Any idea what I need to look at here?
Job:
13/05/14 05:13:40 INFO input.FileInputFormat: Total input paths to process : 131
13/05/14 05:13:41 INFO input.FileInputFormat: Total input paths to process : 1
13/05/14 05:13:42 INFO mapred.JobClient: Running job: job_201305131758_0003
13/05/14 05:13:43 INFO mapred.JobClient: map 0% reduce 0%
13/05/14 05:13:47 INFO mapred.JobClient: Task Id : attempt_201305131758_0003_m_000353_0, Status : FAILED
java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:250)
Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:237)
13/05/14 05:13:47 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stdout
13/05/14 05:13:47 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stderr
13/05/14 05:13:50 INFO mapred.JobClient: Task Id : attempt_201305131758_0003_r_000521_0, Status : FAILED
java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:250)
Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:237)
13/05/14 05:13:50 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_r_000521_0&filter=stdout
13/05/14 05:13:50 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_r_000521_0&filter=stderr
curl of above URL:
davidparks21@hadoop-meta1:~$ curl 'http://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stdout'
Error 410 Failed to retrieve stdout log for task: attempt_201305131758_0003_m_000353_0
HTTP ERROR 410
Problem accessing /tasklog. Reason: Failed to retrieve stdout log for task: attempt_201305131758_0003_m_000353_0
Powered by Jetty://
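For reference, a rough sketch of the setting in question, using the MR1 property name mapred.child.java.opts (the value shown is illustrative): the task JVM accepts -Xmx1500m but rejects -Xmx1.5g, and the resulting child-launch failure only shows up in the TaskTracker's task logs, which is why it was so hard to spot from the JobTracker side.

import org.apache.hadoop.conf.Configuration;

public class TaskHeapSetting {
    // Apply the per-task JVM options on the job's Configuration before submission.
    public static void apply(Configuration conf) {
        conf.set("mapred.child.java.opts", "-Xmx1500m");   // valid; "-Xmx1.5g" is rejected by the JVM
    }
}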
RE: JobClient: Error reading task output - after instituting a DNS server
I just hate it when I figure out a problem right after asking for help. Finding the task logs via the task tracker website identified the problem, which didn't show up elsewhere. A simple misconfiguration which I did concurrently with the DNS update that threw me off track. Dave
From: David Parks [mailto:davidpark...@yahoo.com] Sent: Tuesday, May 14, 2013 1:20 PM To: user@hadoop.apache.org Subject: JobClient: Error reading task output - after instituting a DNS server
So we just configured a local DNS server for hostname resolution and stopped using a hosts file, and now jobs fail on us. But I can't figure out why. You can see the error below, but if I run curl on any of those URLs they come back with "Failed to retrieve stdout log", which doesn't look much like a DNS issue. I can ping and do nslookup from any host to any other host. This is a CDH4 cluster and the host inspector is happy as could be; Cloudera Manager also indicates all is well. When I open the task tracker website I see the first task attempt show up on the site for maybe 10 seconds or so before it fails. Any idea what I need to look at here?
Job:
13/05/14 05:13:40 INFO input.FileInputFormat: Total input paths to process : 131
13/05/14 05:13:41 INFO input.FileInputFormat: Total input paths to process : 1
13/05/14 05:13:42 INFO mapred.JobClient: Running job: job_201305131758_0003
13/05/14 05:13:43 INFO mapred.JobClient: map 0% reduce 0%
13/05/14 05:13:47 INFO mapred.JobClient: Task Id : attempt_201305131758_0003_m_000353_0, Status : FAILED
java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:250)
Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:237)
13/05/14 05:13:47 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stdout
13/05/14 05:13:47 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stderr
13/05/14 05:13:50 INFO mapred.JobClient: Task Id : attempt_201305131758_0003_r_000521_0, Status : FAILED
java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:250)
Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:237)
13/05/14 05:13:50 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_r_000521_0&filter=stdout
13/05/14 05:13:50 WARN mapred.JobClient: Error reading task outputhttp://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_r_000521_0&filter=stderr
curl of above URL:
davidparks21@hadoop-meta1:~$ curl 'http://hadoop-fullslot2:50060/tasklog?plaintext=true&attemptid=attempt_201305131758_0003_m_000353_0&filter=stdout'
Error 410 Failed to retrieve stdout log for task: attempt_201305131758_0003_m_000353_0
HTTP ERROR 410
Problem accessing /tasklog. Reason: Failed to retrieve stdout log for task: attempt_201305131758_0003_m_000353_0
Powered by Jetty://
RE: Access HDFS from OpenCL
Hadoop just runs as a standard Java process, so you should find something that bridges between OpenCL and Java; a quick Google search yields: http://www.jocl.org/ I expect that you'll find everything you need there to accomplish the handoff from your mapreduce code to OpenCL. As for HDFS, Hadoop will generally handle marshaling data in and out of HDFS. Remember that you're thinking of your problem in terms of KEY,VALUE pairs: you're going to implement a map(.) and reduce(.) method, and in those methods you'll pass the data to/from OpenCL via the OpenCL Java bindings. It's quite common to need to run multiple map/reduce steps to accomplish an end goal. David From: rohit sarewar [mailto:rohitsare...@gmail.com] Sent: Monday, May 13, 2013 8:35 PM To: user@hadoop.apache.org Subject: Access HDFS from OpenCL Hi All My data set resides in HDFS. I need to compute 5 metrics, among which 2 are compute intensive. So I want to compute those 2 metrics on GPU using OpenCL and the rest 3 metrics using Java map reduce code on Hadoop. How can I pass data from HDFS to GPU ? Or how can my OpenCL code access data from HDFS ? How can I trigger OpenCL codes from my Java map reduce codes ? It would be great if someone could share a sample code. Thanks & Regards Rohit Sarewar
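To make the hand-off concrete, here is a minimal sketch of a mapper along the lines described above. It assumes the heavy numeric work is hidden behind an ordinary Java method (which internally would use the JOCL/OpenCL bindings); GpuMetricMapper and computeOnGpu(...) are hypothetical names, and the GPU call is stubbed out with a plain CPU loop.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GpuMetricMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Hadoop has already pulled this record out of HDFS for us.
        double[] samples = parse(line.toString());

        // Hand the raw numbers off; this is where the JOCL/OpenCL call would live.
        double result = computeOnGpu(samples);

        context.write(new Text("metric-1"), new DoubleWritable(result));
    }

    private double[] parse(String line) {
        String[] parts = line.split(",");
        double[] out = new double[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = Double.parseDouble(parts[i]);
        }
        return out;
    }

    private double computeOnGpu(double[] samples) {
        // Placeholder: a real implementation would build an OpenCL kernel via the Java
        // bindings and copy `samples` to device memory. Here we just sum on the CPU.
        double sum = 0;
        for (double s : samples) {
            sum += s;
        }
        return sum;
    }
}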
Using FairScheduler to limit # of tasks
Can I use the FairScheduler to limit the number of map/reduce tasks directly from the job configuration? E.g. I have one job that I know should run a more limited # of map/reduce tasks than the default. I want to configure a queue with a limited # of map/reduce tasks, but only apply it to that job; I don't want to deploy this queue configuration to the cluster. Assuming the above answer is 'yes', if I were to limit the # of map tasks to 10 in a cluster of 10 nodes, would the fair scheduler tend to distribute those 10 map tasks evenly across the nodes (assuming a cluster that's otherwise unused at the moment), or would it be prone to overloading a single node just because those are the first open slots it sees? David
600s timeout during copy phase of job
I have a job that's getting 600s task timeouts during the copy phase of the reduce step. I see a lot of copy tasks all moving at about 2.5MB/sec, and it's taking longer than 10 min to do that copy. The process starts copying when the reduce step is 80% complete. This is a very IO-bound task, as I'm just joining 1.5TB of data via 2 map/reduce steps on 6 nodes (each node has 1x 4TB disk and 24GB of RAM). What should I be thinking in terms of fixing this? (See the sketch after this list.)
- Increase the timeout? (seems odd that it would time out on the internal copy)
- Reduce the # of tasks? (I've got 8 reducers, 1 per core, 25 io.sort.factor & 256 io.sort.mb)
  - Can I do that per job?
- Increase copy threads?
- Don't start the reducers until the mappers are 100% complete?
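As a rough sketch of where those knobs live (MR1 property names, which should also apply to CDH-style MRv1; the values shown are illustrative only), all of these can be set per job on the job's Configuration rather than cluster-wide:

import org.apache.hadoop.conf.Configuration;

public class CopyPhaseTuning {
    // Per-job settings corresponding to the options listed above.
    public static void tune(Configuration conf) {
        // Raise the task timeout (in ms) so slow shuffle copies aren't killed at 600s.
        conf.setLong("mapred.task.timeout", 1800000L);
        // Run fewer reducers for this job only.
        conf.setInt("mapred.reduce.tasks", 6);
        // Give each reducer more parallel copier threads for the shuffle.
        conf.setInt("mapred.reduce.parallel.copies", 10);
        // Don't start reducers until the mappers are (nearly) done.
        conf.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
    }
}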
What's the best disk configuration for hadoop? SSD's Raid levels, etc?
We've got a cluster of 10x 8-core/24GB nodes, currently with one 4TB disk each (3 disk slots max). They chug away OK currently, only slightly IO-bound on average. I'm going to upgrade the disk configuration at some point (we do need more space on HDFS) and I'm thinking about what's best hardware-wise:
- Would it be wise to use one of the three disk slots for a 1TB SSD? I wouldn't use it for HDFS, but for map output and sorting it might make a big difference, no?
- If I put in either 1 or 2 more 4TB disks for HDFS, should I RAID-0 them for speed, or will HDFS balance well across multiple partitions on its own?
- Would anyone suggest 3x 4TB disks in a RAID-5 configuration, to guard against disk replacements, over the above options?
Dave
RE: Uploading file to HDFS
I just realized another trick you might try. The Hadoop dfs client can read input from STDIN, so you could use netcat to pipe the stuff across to HDFS without hitting the hard drive. I haven't tried it, but here's what I would think might work. On the Hadoop box, open a listening port and feed it to the HDFS command:
nc -l 2342 | hdfs dfs -copyFromLocal - /tmp/x.txt
On the remote server:
cat my_big_2tb_file | nc 10.1.1.1 2342
I haven't tried it yet, but in theory this would work. I just happened to test out the hdfs dfs command reading from stdin. You might have to correct the above syntax, I just wrote it off the top of my head. Dave From: 超级塞亚人 [mailto:shel...@gmail.com] Sent: Friday, April 19, 2013 11:35 AM To: user@hadoop.apache.org Subject: Uploading file to HDFS I have a problem. Our cluster has 32 nodes. Each disk is 1TB. I wanna upload a 2TB file to HDFS. How can I put the file to the namenode and upload it to HDFS?
RE: Uploading file to HDFS
I think the problem here is that he doesn't have Hadoop installed on this other location, so there's no Hadoop DFS client to do the put directly into HDFS; he would normally copy the file to one of the nodes in the cluster where the client files are installed. I've had the same problem recently. I've tried setting up dfs-hdfs-proxy, though I must say that it's been crashing when I try to put modest to large files through it (but I've got a thread going with the developer on that issue). That, or one of the other remote mount options, might work well: http://wiki.apache.org/hadoop/MountableHDFS You could also install Hadoop on the box that has the 2TB file (I realize that you might not control it or want to do that depending on the configuration). A remote NFS mount that you can access from one of the Hadoop boxes...? Split up the file into smaller pieces? There are some ideas. I'd love to hear your final solution, as I've also been having fits getting into HDFS from outside the Hadoop environment. I wish it natively supported NFS mounts or some lightweight, easy-to-install remote DFS tools. Dave -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Friday, April 19, 2013 1:40 PM To: Subject: Re: Uploading file to HDFS Can you not simply do a fs -put from the location where the 2 TB file currently resides? HDFS should be able to consume it just fine, as the client chunks them into fixed size blocks. On Fri, Apr 19, 2013 at 10:05 AM, 超级塞亚人 wrote: > I have a problem. Our cluster has 32 nodes. Each disk is 1TB. I wanna > upload 2TB file to HDFS.How can I put the file to the namenode and upload to HDFS? -- Harsh J
Mapreduce jobs to download job input from across the internet
For a set of jobs to run I need to download about 100GB of data from the internet (~1000 files of varying sizes from ~10 different domains). Currently I do this in a simple Linux script, as it's easy to script FTP, curl, and the like. But it's a mess to maintain a separate server for that process. I'd rather it run in mapreduce: just give it a bill of materials and let it go about downloading it, retrying as necessary to deal with iffy network conditions. I wrote one such job to crawl images we need to acquire, and it was the royalist of royal pains. I wonder if there are any good approaches to this kind of data acquisition task in Hadoop. It would certainly be nicer to just schedule a data-acquisition job ahead of the processing jobs in Oozie rather than try to maintain synchronization between the download processes and the jobs. Ideas?
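For what it's worth, here is a minimal sketch of the "bill of materials" approach, assuming the job input is a plain text file with one URL per line (e.g. split across mappers with NLineInputFormat). The class name, retry count, and the /downloads output layout are made up for illustration, and a real version would still need the politeness/rate limits discussed elsewhere in this archive.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UrlFetchMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final int MAX_RETRIES = 3;

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String url = line.toString().trim();
        if (url.isEmpty()) {
            return;
        }
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path target = new Path("/downloads/" + url.replaceAll("[^A-Za-z0-9._-]", "_"));

        IOException last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                InputStream in = new URL(url).openStream();
                OutputStream out = fs.create(target, true);
                IOUtils.copyBytes(in, out, context.getConfiguration(), true); // closes both streams
                context.write(new Text(url), new Text("OK"));
                return;
            } catch (IOException e) {
                last = e;                // iffy network: try again
                context.progress();      // keep the task from being timed out while retrying
            }
        }
        // Record the failure rather than killing the whole job.
        context.write(new Text(url), new Text("FAILED: " + last.getMessage()));
    }
}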
RE: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput
4-20MB/sec are common transfer rates from S3 to *1* local AWS box. This was, of course, a cluster, and s3distcp is specifically designed to take advantage of the cluster, so it was a 45-minute job to transfer the 1.5 TB to the full cluster of, I forget how many servers I had at the time, maybe 15-30 m1.xlarge. The numbers are rough; I could be mistaken and it was 1½ hours to do the transfer (but I recall 45 min). In either case the s3distcp job ran longer than the task timeout period, which was the real point I was focusing on. I seem to recall needing to re-package their jar as well, but for different reasons: they package in some other open source utilities and I had version conflicts, so you might want to watch for that. I've never seen this ProgressableResettableBufferedFileInputStream, so I can't offer much more advice on that one. Good luck! Let us know how it turns out. Dave From: Himanish Kushary [mailto:himan...@gmail.com] Sent: Friday, March 29, 2013 9:57 PM To: user@hadoop.apache.org Subject: Re: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput Yes you are right CDH4 is the 2.x line, but I even checked in the javadocs for 1.0.4 branch (could not find 1.0.3 API's so used http://hadoop.apache.org/docs/r1.0.4/api/index.html) but did not find the "ProgressableResettableBufferedFileInputStream" class. Not sure how it is present in the hadoop-core.jar in Amazon EMR. In the meantime I have come out with a dirty workaround by extracting the class from the Amazon jar and packaging it into its own separate jar. I am actually able to run the s3distcp now on local CDH4 using amazon's jar and transfer from my local hadoop to Amazon S3. But the real issue is the throughput. You mentioned that you had transferred 1.5 TB in 45 mins which comes to around 583 MB/s. I am barely getting 4 MB/s upload speed !! How did you get 100x times speed compared to me ? Could you please share any settings/tweaks that you may have done to achieve this. Were you on some very specific high bandwidth network ? Was is between HDFS on EC2 and amazon S3 ? Looking forward to hear from you. Thanks Himanish On Fri, Mar 29, 2013 at 10:34 AM, David Parks wrote: CDH4 can be either 1.x or 2.x hadoop, are you using the 2.x line? I've used it primarily with 1.0.3, which is what AWS uses, so I presume that's what it's tested on. Himanish Kushary wrote: Thanks Dave. I had already tried using the s3distcp jar. But got stuck on the below error, which made me think that this is something specific to Amazon hadoop distribution. Exception in thread "Thread-28" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/s3native/ProgressableResettableBufferedFileInputStream Also, I noticed that the Amazon EMR hadoop-core.jar has this class but it is not present on the CDH4 (my local env) hadoop jars. Could you suggest how I could get around this issue. One option could be using the amazon specific jars but then probably I would need to get all the jars ( else it could cause version mismatch errors for HDFS - NoSuchMethodError etc etc ) Appreciate your help regarding this. - Himanish On Fri, Mar 29, 2013 at 1:41 AM, David Parks wrote: None of that complexity, they distribute the jar publicly (not the source, but the jar). You can just add this to your libjars: s3n://region.elasticmapreduce/libs/s3distcp/latest/s3distcp.jar No VPN or anything, if you can access the internet you can get to S3.
Follow their docs here: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html Doesn't matter where your Hadoop instance is running. Here's an example of the code/parameters I used to run it from within another Tool; it's a Tool, so it's actually designed to run from the Hadoop command line normally.
ToolRunner.run(getConf(), new S3DistCp(), new String[] {
    "--src", "/frugg/image-cache-stage2/",
    "--srcPattern", ".*part.*",
    "--dest", "s3n://fruggmapreduce/results-"+env+"/" + JobUtils.isoDate + "/output/itemtable/",
    "--s3Endpoint", "s3.amazonaws.com" });
Watch the srcPattern: make sure you have that leading `.*`, that one threw me for a loop once. Dave From: Himanish Kushary [mailto:himan...@gmail.com] Sent: Thursday, March 28, 2013 5:51 PM To: user@hadoop.apache.org Subject: Re: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput Hi Dave, Thanks for your reply. Our hadoop instance is inside our corporate LAN. Could you please provide some details on how i could use the s3distcp from amazon to transfer data from our on-premises hadoop to amazon s3. Wouldn't some kind of VPN be needed between the Amazon EMR instance and our on-premises hadoop
Re: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput
CDH4 can be either 1.x or2.x hadoop, are you using the 2.x line? I've used it primarily with 1.0.3, which is what AWS uses, so I presume that's what it's tested on. Himanish Kushary wrote: >Thanks Dave. > > >I had already tried using the s3distcp jar. But got stuck on the below >error,which made me think that this is something specific to Amazon hadoop >distribution. > > >Exception in thread "Thread-28" java.lang.NoClassDefFoundError: >org/apache/hadoop/fs/s3native/ProgressableResettableBufferedFileInputStream > > >Also, I noticed that the Amazon EMR hadoop-core.jar has this class but it is >not present on the CDH4 (my local env) hadoop jars. > > >Could you suggest how I could get around this issue. One option could be using >the amazon specific jars but then probably I would need to get all the jars ( >else it could cause version mismatch errors for HDFS - NoSuchMethodError etc >etc ) > > >Appreciate your help regarding this. > > >- Himanish > > > > >On Fri, Mar 29, 2013 at 1:41 AM, David Parks wrote: > >None of that complexity, they distribute the jar publicly (not the source, but >the jar). You can just add this to your libjars: >s3n://region.elasticmapreduce/libs/s3distcp/latest/s3distcp.jar > > > >No VPN or anything, if you can access the internet you can get to S3. > > > >Follow their docs here: >http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html > > > >Doesn’t matter where you’re Hadoop instance is running. > > > >Here’s an example of code/parameters I used to run it from within another >Tool, it’s a Tool, so it’s actually designed to run from the Hadoop command >line normally. > > > > ToolRunner.run(getConf(), new S3DistCp(), new String[] { > > "--src", "/frugg/image-cache-stage2/", > > "--srcPattern", ".*part.*", > > "--dest", "s3n://fruggmapreduce/results-"+env+"/" + >JobUtils.isoDate + "/output/itemtable/", > > "--s3Endpoint", "s3.amazonaws.com" }); > > > >Watch the “srcPattern”, make sure you have that leading `.*`, that one threw >me for a loop once. > > > >Dave > > > > > >From: Himanish Kushary [mailto:himan...@gmail.com] >Sent: Thursday, March 28, 2013 5:51 PM >To: user@hadoop.apache.org >Subject: Re: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput > > > >Hi Dave, > > > >Thanks for your reply. Our hadoop instance is inside our corporate LAN.Could >you please provide some details on how i could use the s3distcp from amazon to >transfer data from our on-premises hadoop to amazon s3. Wouldn't some kind of >VPN be needed between the Amazon EMR instance and our on-premises hadoop >instance ? Did you mean use the jar from amazon on our local server ? > > > >Thanks > >On Thu, Mar 28, 2013 at 3:56 AM, David Parks wrote: > >Have you tried using s3distcp from amazon? I used it many times to transfer >1.5TB between S3 and Hadoop instances. The process took 45 min, well over the >10min timeout period you’re running into a problem on. > > > >Dave > > > > > >From: Himanish Kushary [mailto:himan...@gmail.com] >Sent: Thursday, March 28, 2013 10:54 AM >To: user@hadoop.apache.org >Subject: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput > > > >Hello, > > > >I am trying to transfer around 70 GB of files from HDFS to Amazon S3 using the >distcp utility.There are aaround 2200 files distributed over 15 >directories.The max individual file size is approx 50 MB. > > > >The distcp mapreduce job keeps on failing with this error > > > >"Task attempt_201303211242_0260_m_05_0 failed to report status for 600 >seconds. Killing!" 
> > > >and in the task attempt logs I can see lot of INFO messages like > > > >"INFO org.apache.commons.httpclient.HttpMethodDirector: I/O exception >(java.io.IOException) caught when processing request: Resetting to invalid >mark" > > > >I am thinking either transferring individual folders instead of the entire 70 >GB folders as a workaround or as another option increasing the >"mapred.task.timeout" parameter to something like 6-7 hour ( as the avg rate >of transfer to S3 seems to be 5 MB/s).Is there any other better option to >increase the throughput for transferring bulk data from HDFS to S3 ? Looking >forward for suggestions. > > > > > >-- >Thanks & Regards >Himanish > > > > > >-- >Thanks & Regards >Himanish > > > > >-- >Thanks & Regards >Himanish >
RE: Which hadoop installation should I use on ubuntu server?
I've never used the Cloudera distributions, but you can't not hear about them. Is it really much easier to manage the whole platform using Cloudera's manager? 50 nodes free is generous enough that I'd feel comfortable committing to them as a platform (and thus the future potential cost), I think. My only real experience comes from AWS's environment, which, other than having a dedicated DFS and launching jobs via their steps process, seems like a pretty straightforward Hadoop configuration. Dave From: Håvard Wahl Kongsgård [mailto:haavard.kongsga...@gmail.com] Sent: Friday, March 29, 2013 3:21 PM To: user Subject: Re: Which hadoop installation should I use on ubuntu server? I recommend cloudera's CDH4 on ubuntu 12.04 LTS On Thu, Mar 28, 2013 at 7:07 AM, David Parks wrote: I'm moving off AWS MapReduce to our own cluster; I'm installing Hadoop on Ubuntu Server 12.10. I see a .deb installer and installed that, but it seems like files are all over the place: `/usr/share/Hadoop`, `/etc/hadoop`, `/usr/bin/hadoop`. And the documentation is a bit harder to follow: http://hadoop.apache.org/docs/r1.1.2/cluster_setup.html So I just wonder if this installer is the best approach, or if it'll be easier/better to just install the basic build in /opt/hadoop and perhaps the docs become easier to follow. Thoughts? Thanks, Dave -- Håvard Wahl Kongsgård Data Scientist Faculty of Medicine & Department of Mathematical Sciences NTNU http://havard.dbkeeping.com/
RE: Which hadoop installation should I use on ubuntu server?
Hmm, seems intriguing. I'm still not totally clear on Bigtop here. It seems like they're creating and maintaining basically an installer for Hadoop? I tried following their docs for Ubuntu, but just get a 404 error on the first step, so it makes me wonder how reliable that project is: https://cwiki.apache.org/confluence/display/BIGTOP/How+to+install+Hadoop+distribution+from+Bigtop Has anyone actually used Bigtop to deploy Hadoop in a production environment? From: Nitin Pawar [mailto:nitinpawar...@gmail.com] Sent: Thursday, March 28, 2013 1:22 PM To: user@hadoop.apache.org Subject: Re: Which hadoop installation should I use on ubuntu server? apache bigtop has builds done for ubuntu you can check them at jenkins mentioned on bigtop.apache.org On Thu, Mar 28, 2013 at 11:37 AM, David Parks wrote: I'm moving off AWS MapReduce to our own cluster, I'm installing Hadoop on Ubuntu Server 12.10. I see a .deb installer and installed that, but it seems like files are all over the place `/usr/share/Hadoop`, `/etc/hadoop`, `/usr/bin/hadoop`. And the documentation is a bit harder to follow: http://hadoop.apache.org/docs/r1.1.2/cluster_setup.html So I just wonder if this installer is the best approach, or if it'll be easier/better to just install the basic build in /opt/hadoop and perhaps the docs become easier to follow. Thoughts? Thanks, Dave -- Nitin Pawar
RE: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput
None of that complexity; they distribute the jar publicly (not the source, but the jar). You can just add this to your libjars: s3n://region.elasticmapreduce/libs/s3distcp/latest/s3distcp.jar No VPN or anything; if you can access the internet you can get to S3. Follow their docs here: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html Doesn't matter where your Hadoop instance is running. Here's an example of the code/parameters I used to run it from within another Tool; it's a Tool, so it's actually designed to run from the Hadoop command line normally.
ToolRunner.run(getConf(), new S3DistCp(), new String[] {
    "--src", "/frugg/image-cache-stage2/",
    "--srcPattern", ".*part.*",
    "--dest", "s3n://fruggmapreduce/results-"+env+"/" + JobUtils.isoDate + "/output/itemtable/",
    "--s3Endpoint", "s3.amazonaws.com" });
Watch the "srcPattern": make sure you have that leading `.*`, that one threw me for a loop once. Dave From: Himanish Kushary [mailto:himan...@gmail.com] Sent: Thursday, March 28, 2013 5:51 PM To: user@hadoop.apache.org Subject: Re: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput Hi Dave, Thanks for your reply. Our hadoop instance is inside our corporate LAN.Could you please provide some details on how i could use the s3distcp from amazon to transfer data from our on-premises hadoop to amazon s3. Wouldn't some kind of VPN be needed between the Amazon EMR instance and our on-premises hadoop instance ? Did you mean use the jar from amazon on our local server ? Thanks On Thu, Mar 28, 2013 at 3:56 AM, David Parks wrote: Have you tried using s3distcp from amazon? I used it many times to transfer 1.5TB between S3 and Hadoop instances. The process took 45 min, well over the 10min timeout period you're running into a problem on. Dave From: Himanish Kushary [mailto:himan...@gmail.com] Sent: Thursday, March 28, 2013 10:54 AM To: user@hadoop.apache.org Subject: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput Hello, I am trying to transfer around 70 GB of files from HDFS to Amazon S3 using the distcp utility.There are aaround 2200 files distributed over 15 directories.The max individual file size is approx 50 MB. The distcp mapreduce job keeps on failing with this error "Task attempt_201303211242_0260_m_05_0 failed to report status for 600 seconds. Killing!" and in the task attempt logs I can see lot of INFO messages like "INFO org.apache.commons.httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark" I am thinking either transferring individual folders instead of the entire 70 GB folders as a workaround or as another option increasing the "mapred.task.timeout" parameter to something like 6-7 hour ( as the avg rate of transfer to S3 seems to be 5 MB/s).Is there any other better option to increase the throughput for transferring bulk data from HDFS to S3 ? Looking forward for suggestions. -- Thanks & Regards Himanish -- Thanks & Regards Himanish
RE: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput
Have you tried using s3distcp from amazon? I used it many times to transfer 1.5TB between S3 and Hadoop instances. The process took 45 min, well over the 10min timeout period you're running into a problem on. Dave From: Himanish Kushary [mailto:himan...@gmail.com] Sent: Thursday, March 28, 2013 10:54 AM To: user@hadoop.apache.org Subject: Hadoop distcp from CDH4 to Amazon S3 - Improve Throughput Hello, I am trying to transfer around 70 GB of files from HDFS to Amazon S3 using the distcp utility.There are aaround 2200 files distributed over 15 directories.The max individual file size is approx 50 MB. The distcp mapreduce job keeps on failing with this error "Task attempt_201303211242_0260_m_05_0 failed to report status for 600 seconds. Killing!" and in the task attempt logs I can see lot of INFO messages like "INFO org.apache.commons.httpclient.HttpMethodDirector: I/O exception (java.io.IOException) caught when processing request: Resetting to invalid mark" I am thinking either transferring individual folders instead of the entire 70 GB folders as a workaround or as another option increasing the "mapred.task.timeout" parameter to something like 6-7 hour ( as the avg rate of transfer to S3 seems to be 5 MB/s).Is there any other better option to increase the throughput for transferring bulk data from HDFS to S3 ? Looking forward for suggestions. -- Thanks & Regards Himanish
Which hadoop installation should I use on ubuntu server?
I'm moving off AWS MapReduce to our own cluster, I'm installing Hadoop on Ubuntu Server 12.10. I see a .deb installer and installed that, but it seems like files are all over the place `/usr/share/Hadoop`, `/etc/hadoop`, `/usr/bin/hadoop`. And the documentation is a bit harder to follow: http://hadoop.apache.org/docs/r1.1.2/cluster_setup.html So I just wonder if this installer is the best approach, or if it'll be easier/better to just install the basic build in /opt/hadoop and perhaps the docs become easier to follow. Thoughts? Thanks, Dave
RE: For a new installation: use the BackupNode or the CheckPointNode?
Thanks for the update, I understand now that I'll be installing a "secondary name node" which performs checkpoints on the primary name node and keeps a working backup copy of the fsimage file. The primary name node should write its fsimage file to at least 2 different physical mediums for improved safety as well (example: locally and an nfs share). One point of query: were the primary name node to be lost, we would be best off re-building it and copying the fsimage files into place, either from the nfs share, or from the secondary name node, as the situation dictates. There's no mechanism to "fail over" to the "secondary name node" per-se. Am I on track here? Thanks! David -Original Message- From: Konstantin Shvachko [mailto:shv.had...@gmail.com] Sent: Wednesday, March 27, 2013 5:07 AM To: user@hadoop.apache.org Cc: davidpark...@yahoo.com Subject: Re: For a new installation: use the BackupNode or the CheckPointNode? There is no BackupNode in Hadoop 1. That was a bug in documentation. Here is the updated link: http://hadoop.apache.org/docs/r1.1.2/hdfs_user_guide.html Thanks, --Konstantin On Sat, Mar 23, 2013 at 12:04 AM, varun kumar wrote: > Hope below link will be useful.. > > http://hadoop.apache.org/docs/stable/hdfs_user_guide.html > > > On Sat, Mar 23, 2013 at 12:29 PM, David Parks > wrote: >> >> For a new installation of the current stable build (1.1.2 ), is there >> any reason to use the CheckPointNode over the BackupNode? >> >> >> >> It seems that we need to choose one or the other, and from the docs >> it seems like the BackupNode is more efficient in its processes. > > > > > -- > Regards, > Varun Kumar.P
RE:
Can I suggest an answer of "Yes, but you probably don't want to"? As a "typical user" of Hadoop you would not do this. Hadoop already chooses the best server to do the work based on the location of the data (a server that is available to do work and also has the data locally will generally be assigned that work). There are a couple of mechanisms by which you can do this, neither of which I'm terribly familiar with, so I'll just provide a brief introduction and you can research more deeply and ask more pointed questions. I believe there is some ability to "suggest" a good location to run a particular task via the InputFormat; if you extended, say, FileInputFormat you could inject some kind of recommendation (a rough sketch follows at the end of this message), but it wouldn't force Hadoop to do one thing or another, it would just be a recommendation. The next place I'd look is at the scheduler, but you're gonna really get your hands dirty by digging in there, and I doubt, from the tone of your email, that you'll have interest in digging to this level. But mostly, I would suggest you explain your use case more thoroughly and I bet you'll just be directed down a more logical path to accomplish your goals. David -Original Message- From: Fan Bai [mailto:fb...@student.gsu.edu] Sent: Monday, March 25, 2013 5:24 AM To: user@hadoop.apache.org Subject: Dear Sir, I have a question about Hadoop, when I use Hadoop and Mapreduce to finish a job (only one job in here), can I control the file to work in which node? For example, I have only one job and this job have 10 files (10 mapper need to run). Also in my servers, I have one head node and four working node. My question is: can I control those 10 files to working in which node? Such as: No.1 file work in node1, No.3 file work in node2, No.5 file work in node3 and No.8 file work in node4. If I can do this, that means I can control the task. Is that means I still can control this file in next around (I have a loop in head node; I can do another mapreduce work). For example, I can set up No.5 file in 1st around worked node3 and I also can set up No.5 file work in node 2 in 2nd around. If I cannot, is that means, for Hadoop, the file will work in which node just like a "black box", the user cannot control the file will work in which node, because you think the user do not need control it, just let HDFS help them to finish the parallel work. Therefore, the Hadoop cannot control the task in one job, but can control the multiple jobs. Thank you so much! Fan Bai PhD Candidate Computer Science Department Georgia State University Atlanta, GA 30303
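Here is a very rough sketch of the InputFormat "recommendation" idea mentioned above (the class name and the round-robin pinning rule are hypothetical): each split can carry preferred hostnames, which the scheduler treats as locality hints, not guarantees.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class PinnedTextInputFormat extends TextInputFormat {

    // Hypothetical mapping: split i -> preferred worker host.
    private static final String[] WORKERS = { "node1", "node2", "node3", "node4" };

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> original = super.getSplits(job);
        List<InputSplit> pinned = new ArrayList<InputSplit>(original.size());
        for (int i = 0; i < original.size(); i++) {
            FileSplit split = (FileSplit) original.get(i);
            String host = WORKERS[i % WORKERS.length];
            // Same file region, but with our own location "hint" attached.
            pinned.add(new FileSplit(split.getPath(), split.getStart(), split.getLength(),
                    new String[] { host }));
        }
        return pinned;
    }
}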
RE: For a new installation: use the BackupNode or the CheckPointNode?
So... the answer is... SecondaryNameNode is what I should be installing here. And the SecondaryNameNode is essentially just an earlier version of the checkpoint node, in terms of functionality. If I understood everything correctly. Can you confirm? Thanks, David -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Saturday, March 23, 2013 5:33 PM To: Cc: davidpark...@yahoo.com Subject: Re: For a new installation: use the BackupNode or the CheckPointNode? Neither CheckpointNode nor BackupNode exists in version 1.x. This was a documentation oversight that should be cleared in the docs now (or by next release I think). And on 2.x, neither has been tested for stability and the SecondaryNameNode continues to exist and is fully supported (not deprecated). On Sat, Mar 23, 2013 at 12:34 PM, varun kumar wrote: > Hope below link will be useful.. > > http://hadoop.apache.org/docs/stable/hdfs_user_guide.html > > > On Sat, Mar 23, 2013 at 12:29 PM, David Parks > wrote: >> >> For a new installation of the current stable build (1.1.2 ), is there >> any reason to use the CheckPointNode over the BackupNode? >> >> >> >> It seems that we need to choose one or the other, and from the docs >> it seems like the BackupNode is more efficient in its processes. > > > > > -- > Regards, > Varun Kumar.P -- Harsh J
For a new installation: use the BackupNode or the CheckPointNode?
For a new installation of the current stable build (1.1.2 ), is there any reason to use the CheckPointNode over the BackupNode? It seems that we need to choose one or the other, and from the docs it seems like the BackupNode is more efficient in its processes.
RE: On a small cluster can we double up namenode/master with tasktrackers?
Good points all. The mapreduce jobs are, well... intensive. We've got a whole variety, but typically I see them use a lot of CPU, a lot of disk, and on occasion a whole bunch of network bandwidth. Duh, right? :-) The master node is mostly CPU intensive, right? We're using LXC to segregate (pseudo-virtualize) our environments for ease of development and management. I'm looking into whether I can use LXC's quota system to guarantee a certain level of CPU resources to the container where the master node is housed. If I can do that I guess we wouldn't have any issue here. Thanks! David From: Jens Scheidtmann [mailto:jens.scheidtm...@gmail.com] Sent: Tuesday, March 19, 2013 3:12 PM To: user@hadoop.apache.org Subject: Re: On a small cluster can we double up namenode/master with tasktrackers? David, You didn't look at how resource intensive your map/reduce jobs are. Best regards, Jens
On a small cluster can we double up namenode/master with tasktrackers?
I want 20 servers, I got 7, so I want to make the most of the 7 I have. Each of the 7 servers has 24GB of RAM, 4TB of disk, and 8 cores. Would it be terribly unwise of me to run such a configuration:
- Server #1: NameNode + Master + TaskTracker (reduced slots)
- Server #2: CheckpointNode (aka Secondary Name Node) + TaskTracker (slightly reduced slots)
- Server #3: TaskTracker
- Server #4: TaskTracker
- Server #5: TaskTracker
- Server #6: TaskTracker
- Server #7: TaskTracker
Did I miss anything? Did I shoot myself in the foot anywhere?
How "Alpha" is "alpha"?
From the release page on Hadoop's website: "This release, like previous releases in hadoop-2.x series is still considered alpha primarily since some of APIs aren't fully-baked and we expect some churn in future." How "alpha" is the 2.x line? We're moving off AWS (1.0.3) onto our own cluster of 10 servers. Naturally we want the latest and greatest, but we don't need to live on the bleeding edge (we just wanna process some data, you know!). So how "Alpha" is "alpha"?
Re: Unexpected Hadoop behavior: map task re-running after reducer has been running
Too many fetch-failures
Too many fetch-failures
From: David Parks To: user@hadoop.apache.org Sent: Monday, March 11, 2013 3:23 PM Subject: Unexpected Hadoop behavior: map task re-running after reducer has been running
I can't explain this behavior, can someone help me here:
Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
map      100.00%      23547       0         1         23546      0        247 / 0
reduce   62.40%       1           3738      30        6232       0        336 / 0
This job has been running for 48 hours and the reducer is quite a ways through its processing. But we've hit a snag (I'm not sure what snag exactly). The map tasks were 100% complete, none running, but now I see 1 map task running. In a few minutes that map task will finish and I'll see "Running map tasks" change to 0, the # of failed map tasks will increment by 1, and the map task will run again a short time thereafter. This seems perpetual, as we had 1 map task failed at the end of the map processing when the reducer started running; now we have 247 failed map tasks and this is the pattern I've been watching for hours now. Anyone want to venture some guesses here? Thanks, David
Unexpected Hadoop behavior: map task re-running after reducer has been running
I can't explain this behavior, can someone help me here:
Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
map      100.00%      23547       0         1         23546      0        247 / 0
reduce   62.40%       1           3738      30        6232       0        336 / 0
This job has been running for 48 hours and the reducer is quite a ways through its processing. But we've hit a snag (I'm not sure what snag exactly). The map tasks were 100% complete, none running, but now I see 1 map task running. In a few minutes that map task will finish and I'll see "Running map tasks" change to 0, the # of failed map tasks will increment by 1, and the map task will run again a short time thereafter. This seems perpetual, as we had 1 map task failed at the end of the map processing when the reducer started running; now we have 247 failed map tasks and this is the pattern I've been watching for hours now. Anyone want to venture some guesses here? Thanks, David
How do _you_ document your hadoop jobs?
We've taken to documenting our Hadoop jobs in a simple visual manner using PPT (attached example). I wonder how others document their jobs? We often add notes to the text section of the PPT slides as well.
RE: How can I limit reducers to one-per-node?
I tried that approach at first, one domain to one reducer, but it failed me because my data set has many domains with just a few thousand images (trivial), but we also have reasonably many massive domains with 10 million+ images. One host downloading 10 or 20 million images, while obeying politeness standards, will take multiple weeks. So I decided to randomly distribute URLs to each host and, per host, follow web politeness standards (a rough sketch of that partitioning idea follows at the end of this message). The domains with 10M+ images should be able to support the load (they're big sites, like iTunes for example), and the smaller ones are (hopefully) randomized across hosts enough to be reasonably safe. From: Ted Dunning [mailto:tdunn...@maprtech.com] Sent: Monday, February 11, 2013 12:55 PM To: user@hadoop.apache.org Subject: Re: How can I limit reducers to one-per-node? For crawler type apps, typically you direct all of the URL's to crawl from a single domain to a single reducer. Typically, you also have many reducers so that you can get decent bandwidth. It is also common to consider the normal web politeness standards with a grain of salt, particularly by taking it as an average rate and doing several requests with a single connection, then waiting a bit longer than would otherwise be done. This helps the target domain and improves your crawler's utilization. Large scale crawlers typically work out of a large data store with a flags column that is pinned into memory. Successive passes of the crawler can scan the flag column very quickly to find domains with work to be done. This work can be done using map-reduce, but it is only vaguely like a map-reduce job. On Sun, Feb 10, 2013 at 10:48 PM, Harsh J wrote: The suggestion to add a combiner is to help reduce the shuffle load (and perhaps, reduce # of reducers needed?), but it doesn't affect scheduling of a set number of reduce tasks nor does a scheduler care currently if you add that step in or not. On Mon, Feb 11, 2013 at 7:59 AM, David Parks wrote: > I guess the FairScheduler is doing multiple assignments per heartbeat, hence > the behavior of multiple reduce tasks per node even when they should > otherwise be full distributed. > > > > Adding a combiner will change this behavior? Could you explain more? > > > > Thanks! > > David > > > > > > From: Michael Segel [mailto:michael_se...@hotmail.com] > Sent: Monday, February 11, 2013 8:30 AM > > > To: user@hadoop.apache.org > Subject: Re: How can I limit reducers to one-per-node? > > > > Adding a combiner step first then reduce? > > > > > > On Feb 8, 2013, at 11:18 PM, Harsh J wrote: > > > > Hey David, > > There's no readily available way to do this today (you may be > interested in MAPREDUCE-199 though) but if your Job scheduler's not > doing multiple-assignments on reduce tasks, then only one is assigned > per TT heartbeat, which gives you almost what you're looking for: 1 > reduce task per node, round-robin'd (roughly). > > On Sat, Feb 9, 2013 at 9:24 AM, David Parks wrote: > > I have a cluster of boxes with 3 reducers per node. I want to limit a > particular job to only run 1 reducer per node. > > > > This job is network IO bound, gathering images from a set of webservers. > > > > My job has certain parameters set to meet "web politeness" standards (e.g. > limit connects and connection frequency). > > > > If this job runs from multiple reducers on the same node, those per-host > limits will be violated. Also, this is a shared environment and I don't > want long running network bound jobs uselessly taking up all reduce slots.
> > > > > -- > Harsh J > > > > Michael Segel | (m) 312.755.9623 > > Segel and Associates > > -- Harsh J
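A minimal sketch of the partitioning idea referenced above: hash the full URL rather than its domain, so a huge domain's URLs spread across many reducers, each of which then applies its own per-domain politeness limits. The class name is illustrative only.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class UrlSpreadPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text url, Text value, int numPartitions) {
        // Hashing the whole URL (not just its host) randomizes placement across reducers.
        return (url.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}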
File does not exist on part-r-00000 file after reducer runs
Are there any rules against writing results to Reducer.Context while in the cleanup() method? I've got a reducer that is downloading a few tens of millions of images from a set of URLs fed to it. To be efficient I run many connections in parallel, but limit connections per domain and the frequency of connections. In order to do that efficiently I read in many URLs from the reduce method and queue them in a processing queue, so at some point we read in all the data and Hadoop calls the cleanup() method, where I block until all threads have finished processing. We may continue processing and writing results (in a synchronized manner) for 20 or 30 minutes after Hadoop reports 100% input records delivered. Then at the end, my code appears to exit normally and I get this exception immediately after:
2013-02-11 05:15:23,606 INFO com.frugg.mapreduce.UrlProcessor (URL Processor Main Loop): Processing complete, shut down normally 1
2013-02-11 05:15:23,653 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-02-11 05:15:23,685 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2013-02-11 05:15:23,685 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2013-02-11 05:15:23,687 ERROR org.apache.hadoop.security.UserGroupInformation (main): PriviledgedActionException as:hadoop cause:org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /frugg/image-cache-stage1/_temporary/_attempt_201302110210_0019_r_02_0/part-r-2 File does not exist. Holder DFSClient_attempt_201302110210_0019_r_02_0 does not have any open files.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1642)
I have a suspicion that there are some subtle rules of Hadoop's I'm violating here.
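For context, here is a simplified sketch of the pattern described above (the class name, thread count, and fetch stub are illustrative): reduce() only queues downloads, worker threads write results through the Context under a lock, and cleanup() blocks until the pool drains while still reporting progress so the task isn't killed for inactivity.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ImageFetchReducer extends Reducer<Text, Text, Text, Text> {

    private ExecutorService pool;

    @Override
    protected void setup(Context context) {
        pool = Executors.newFixedThreadPool(50);   // many parallel connections
    }

    @Override
    protected void reduce(Text url, Iterable<Text> values, final Context context)
            throws IOException, InterruptedException {
        final String target = url.toString();      // copy; Hadoop reuses the Text key object
        pool.submit(new Runnable() {
            public void run() {
                String status = fetch(target);      // per-domain politeness limits would live here
                synchronized (context) {            // the Context is not thread-safe
                    try {
                        context.write(new Text(target), new Text(status));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // All input records have arrived; block until every queued download finishes.
        // Writing results during this phase is the pattern in question; the task must
        // keep reporting progress so it is not killed for inactivity while it drains.
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            context.progress();
        }
    }

    private String fetch(String url) {
        // Placeholder for the real HTTP download + rate limiting.
        return "OK";
    }
}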
RE: Question related to Decompressor interface
In the EncryptedWritableWrapper idea you would create an object that takes any Writable object as its parameter. Your EncryptedWritableWrapper would naturally implement Writable.
- When write(DataOutput out) is called on your object, create your own DataOutputStream which writes data into a byte array that you control (i.e. new DataOutputStream(new myByteArrayOutputStream()), keeping references to the objects of course).
- Now encrypt the bytes and pass them on to the DataOutput object you received in write(DataOutput out).
To decrypt is basically the same, with the readFields(DataInput in) method:
- Read in the bytes and decrypt them (you will probably have needed to write out the length of the bytes previously so you know how much to read in).
- Take the decrypted bytes and pass them to the readFields(.) method of the Writable object you're wrapping.
The rest of Hadoop doesn't know or care if the data is encrypted; your Writable objects are just a bunch of bytes. Your Key and Value classes in this case are now EncryptedWritableWrapper, and you'll need to know which type of Writable to pass it in the code. This would be good for encrypting in Hadoop. If your file comes in encrypted then it necessarily can't be split (you should aim to limit the maximum size of the file on the source side). In the case of an encrypted input you would need your own record reader to decrypt it; your description of the scenario below is correct, extending TextInputFormat would be the way to go. If your input is just a plain text file and your goal is to store it in an encrypted fashion, then the EncryptedWritable idea works and is a simpler implementation. (A rough sketch of the wrapper follows at the end of this message.) From: java8964 java8964 [mailto:java8...@hotmail.com] Sent: Sunday, February 10, 2013 10:13 PM To: user@hadoop.apache.org Subject: RE: Question related to Decompressor interface Hi, Dave: Thanks for you reply. I am not sure how the EncryptedWritable will work, can you share more ideas about it? For example, if I have a text file as my source raw file. Now I need to store it in HDFS. If I use any encryption to encrypt the whole file, then there is no good InputFormat or RecordReader to process it, unless whole file is decrypted first at runtime, then using TextInputFormat to process it, right? What you suggest is when I encrypted the file, store it as a SequenceFile, using anything I want as the key, then encrypt each line (Record), and stores it as the value, put both (key, value) pair into the sequence file, is that right? Then in the runtime, each value can be decrypted from the sequence file, and ready for next step in the by the EncryptedWritable class. Is my understanding correct? In this case, of course I don't need to worry about split any more, as each record is encrypted/decrypted separately. I think it is a valid option, but problem is that the data has to be encrypted by this EncryptedWritable class. What I was thinking about is allow data source to encrypt its data any way they want, as long as it is supported by Java security package, then only provide the private key to the runtime to decrypt it. Yong
From: davidpark...@yahoo.com To: user@hadoop.apache.org Subject: RE: Question related to Decompressor interface Date: Sun, 10 Feb 2013 09:36:40 +0700 I can't answer your question about the Decompressor interface, but I have a query for you. Why not just create an EncryptedWritable object? Encrypt/decrypt the bytes on the read/write method, that should be darn near trivial. Then stick with good ol' SequenceFile, which, as you note, is splittable.
Otherwise you'd have to deal with making the output splittable, and given encrypted data, the only solution that I see is basically rolling your own SequenceFile with encrypted innards. Come to think of it, a simple, standardized EncryptedWritable object out of the box with Hadoop would be great. Or perhaps better yet, an EncryptedWritableWrapper so we can convert any existing Writable into an encrypted form. Dave From: java8964 java8964 [mailto:java8...@hotmail.com] Sent: Sunday, February 10, 2013 3:50 AM To: user@hadoop.apache.org Subject: Question related to Decompressor interface HI, Currently I am researching about options of encrypting the data in the MapReduce, as we plan to use the Amazon EMR or EC2 services for our data. I am thinking that the compression codec is good place to integrate with the encryption logic, and I found out there are some people having the same idea as mine. I google around and found out this code: https://github.com/geisbruch/HadoopCryptoCompressor/ It doesn't seem maintained any more, but it gave me a starting point. I download the source code, and try to do some tests with it. It doesn't work out of box. There are some bugs I have to fix to make it work. I believe it contains 'AES' as an example algorithm. But right now,
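To make the wrapper idea above concrete, here is a minimal sketch. The crypto setup is deliberately simplified (a caller-supplied AES key, the default cipher mode, no IV management), and a real version would also need a no-argument constructor so the framework can instantiate it; treat this as an illustration of the Writable plumbing, not production crypto.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

import org.apache.hadoop.io.Writable;

public class EncryptedWritableWrapper<T extends Writable> implements Writable {

    private final T wrapped;
    private final SecretKeySpec key;

    // A no-arg constructor (creating the wrapped instance reflectively) would also be
    // needed for Hadoop to instantiate this class itself; omitted for brevity.
    public EncryptedWritableWrapper(T wrapped, byte[] rawKey) {
        this.wrapped = wrapped;
        this.key = new SecretKeySpec(rawKey, "AES");
    }

    public T get() {
        return wrapped;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        try {
            // Serialize the wrapped Writable into a byte array we control...
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            wrapped.write(new DataOutputStream(buffer));
            // ...encrypt those bytes...
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            byte[] encrypted = cipher.doFinal(buffer.toByteArray());
            // ...and write length + ciphertext to the real output.
            out.writeInt(encrypted.length);
            out.write(encrypted);
        } catch (java.security.GeneralSecurityException e) {
            throw new IOException("Encryption failed", e);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        try {
            // Read length + ciphertext back...
            byte[] encrypted = new byte[in.readInt()];
            in.readFully(encrypted);
            // ...decrypt...
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.DECRYPT_MODE, key);
            byte[] plain = cipher.doFinal(encrypted);
            // ...and let the wrapped Writable deserialize itself from the plaintext.
            wrapped.readFields(new DataInputStream(new ByteArrayInputStream(plain)));
        } catch (java.security.GeneralSecurityException e) {
            throw new IOException("Decryption failed", e);
        }
    }
}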
RE: How can I limit reducers to one-per-node?
I guess the FairScheduler is doing multiple assignments per heartbeat, hence the behavior of multiple reduce tasks per node even when they should otherwise be full distributed. Adding a combiner will change this behavior? Could you explain more? Thanks! David From: Michael Segel [mailto:michael_se...@hotmail.com] Sent: Monday, February 11, 2013 8:30 AM To: user@hadoop.apache.org Subject: Re: How can I limit reducers to one-per-node? Adding a combiner step first then reduce? On Feb 8, 2013, at 11:18 PM, Harsh J wrote: Hey David, There's no readily available way to do this today (you may be interested in MAPREDUCE-199 though) but if your Job scheduler's not doing multiple-assignments on reduce tasks, then only one is assigned per TT heartbeat, which gives you almost what you're looking for: 1 reduce task per node, round-robin'd (roughly). On Sat, Feb 9, 2013 at 9:24 AM, David Parks wrote: I have a cluster of boxes with 3 reducers per node. I want to limit a particular job to only run 1 reducer per node. This job is network IO bound, gathering images from a set of webservers. My job has certain parameters set to meet "web politeness" standards (e.g. limit connects and connection frequency). If this job runs from multiple reducers on the same node, those per-host limits will be violated. Also, this is a shared environment and I don't want long running network bound jobs uselessly taking up all reduce slots. -- Harsh J Michael Segel <mailto:mse...@segel.com> | (m) 312.755.9623 Segel and Associates
RE: Question related to Decompressor interface
I can't answer your question about the Decompressor interface, but I have a query for you. Why not just create an EncryptedWritable object? Encrypt/decrypt the bytes on the read/write method, that should be darn near trivial. Then stick with good ol' SequenceFile, which, as you note, is splittable. Otherwise you'd have to deal with making the output splittable, and given encrypted data, the only solution that I see is basically rolling your own SequenceFile with encrypted innards. Come to think of it, a simple, standardized EncryptedWritable object out of the box with Hadoop would be great. Or perhaps better yet, an EncryptedWritableWrapper so we can convert any existing Writable into an encrypted form. Dave From: java8964 java8964 [mailto:java8...@hotmail.com] Sent: Sunday, February 10, 2013 3:50 AM To: user@hadoop.apache.org Subject: Question related to Decompressor interface Hi, Currently I am researching options for encrypting the data in MapReduce, as we plan to use the Amazon EMR or EC2 services for our data. I am thinking that the compression codec is a good place to integrate the encryption logic, and I found out there are some people having the same idea as mine. I googled around and found this code: https://github.com/geisbruch/HadoopCryptoCompressor/ It doesn't seem maintained any more, but it gave me a starting point. I downloaded the source code and tried to do some tests with it. It doesn't work out of the box; there are some bugs I had to fix to make it work. I believe it contains 'AES' as an example algorithm. But right now, I faced a problem when I tried to use it in my testing MapReduce program. Here is the stack trace I got: 2013-02-08 23:16:47,038 INFO org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor: buf length = 512, and offset = 0, length = -132967308 java.lang.IndexOutOfBoundsException at java.nio.ByteBuffer.wrap(ByteBuffer.java:352) at org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor.setInput(CryptoBasicDecompressor.java:100) at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:97) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83) at java.io.InputStream.read(InputStream.java:82) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458) at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76) at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325) at org.apache.hadoop.mapred.Child$4.run(Child.java:268) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332) at org.apache.hadoop.mapred.Child.main(Child.java:262) I know the error is thrown out of this custom CryptoBasicDecompressor class, but I really have questions related to the interface it implemented: Decompressor. There is limited documentation about this interface, for example, when and how the method setInput() will be invoked.
If I want to write my own Decompressor, what do these methods mean in the interface? In the above case, I enabled some debug information; you can see that in this case, the byte[] array passed to the setInput method only has 512 as its length, but the 3rd parameter (length) passed in is a negative number: -132967308. That caused the IndexOutOfBoundsException. If I check this method in the GzipDecompressor class in Hadoop, the code will also throw IndexOutOfBoundsException in this case, so this is a RuntimeException case. Why did it happen in my test case? Here is my test case: I have a simple log text file of about 700k. I encrypted it with the above code using 'AES'. I can encrypt and decrypt it and get my original content back. The file name is foo.log.crypto, and this file extension is registered to invoke this CryptoBasicDecompressor in my test Hadoop setup using the CDH4.1.2 release (hadoop 2.0). Everything works as I expected. The CryptoBasicDecompressor is invoked when the input file is foo.log.crypto, as you can see in the above stack trace. But I don't know why the 3rd parameter (length) in setInput() is a negative number at runtime. In addition to that, I also have further questions related to using Compressor/Decompressor to handle encrypting/decrypting files. Ideal
RE: How can I limit reducers to one-per-node?
Looking at the Job File for my job I see that this property is set to 1, however I have 3 reducers per node (I’m not clear what configuration is causing this behavior). My problem is that, on a 15 node cluster, I set 15 reduce tasks on my job, in hopes that each would be assigned to a different node, but in the last run 3 nodes had nothing to do, and 3 other nodes had 2 reduce tasks assigned. From: Nan Zhu [mailto:zhunans...@gmail.com] Sent: Saturday, February 09, 2013 11:31 AM To: user@hadoop.apache.org Subject: Re: How can I limit reducers to one-per-node? I haven't used AWS MR before... if your instances are configured with 3 reducer slots, it means that 3 reducers can run at the same time on that node. What do you mean by "this property is already set to 1 on my cluster"? Actually this value can be node-specific; if the AWS MR instance allows you to do that, you can modify mapred-site.xml to change it from 3 to 1 Best, -- Nan Zhu School of Computer Science, McGill University On Friday, 8 February, 2013 at 11:24 PM, David Parks wrote: Hmm, odd, I’m using AWS Mapreduce, and this property is already set to 1 on my cluster by default (using 15 m1.xlarge boxes which come with 3 reducer slots configured by default). From: Nan Zhu [mailto:zhunans...@gmail.com] Sent: Saturday, February 09, 2013 10:59 AM To: user@hadoop.apache.org Subject: Re: How can I limit reducers to one-per-node? I think setting tasktracker.reduce.tasks.maximum to 1 may meet your requirement Best, -- Nan Zhu School of Computer Science, McGill University On Friday, 8 February, 2013 at 10:54 PM, David Parks wrote: I have a cluster of boxes with 3 reducers per node. I want to limit a particular job to only run 1 reducer per node. This job is network IO bound, gathering images from a set of webservers. My job has certain parameters set to meet “web politeness” standards (e.g. limit connects and connection frequency). If this job runs from multiple reducers on the same node, those per-host limits will be violated. Also, this is a shared environment and I don’t want long running network bound jobs uselessly taking up all reduce slots.
RE: How can I limit reducers to one-per-node?
Hmm, odd, I’m using AWS Mapreduce, and this property is already set to 1 on my cluster by default (using 15 m1.xlarge boxes which come with 3 reducer slots configured by default). From: Nan Zhu [mailto:zhunans...@gmail.com] Sent: Saturday, February 09, 2013 10:59 AM To: user@hadoop.apache.org Subject: Re: How can I limit reducers to one-per-node? I think set tasktracker.reduce.tasks.maximum to be 1 may meet your requirement Best, -- Nan Zhu School of Computer Science, McGill University On Friday, 8 February, 2013 at 10:54 PM, David Parks wrote: I have a cluster of boxes with 3 reducers per node. I want to limit a particular job to only run 1 reducer per node. This job is network IO bound, gathering images from a set of webservers. My job has certain parameters set to meet “web politeness” standards (e.g. limit connects and connection frequency). If this job runs from multiple reducers on the same node, those per-host limits will be violated. Also, this is a shared environment and I don’t want long running network bound jobs uselessly taking up all reduce slots.
How can I limit reducers to one-per-node?
I have a cluster of boxes with 3 reducers per node. I want to limit a particular job to only run 1 reducer per node. This job is network IO bound, gathering images from a set of webservers. My job has certain parameters set to meet "web politeness" standards (e.g. limit connects and connection frequency). If this job runs from multiple reducers on the same node, those per-host limits will be violated. Also, this is a shared environment and I don't want long running network bound jobs uselessly taking up all reduce slots.
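For completeness: the per-job lever is only the total reduce task count; the per-node cap (mapred.tasktracker.reduce.tasks.maximum) is a TaskTracker daemon setting in mapred-site.xml and applies to all jobs on that node, so it can't reliably be changed per job. A small sketch of the job-side part, with the job name as a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class PoliteFetchJobSetup {
      public static Job configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "polite-image-fetch");
        // One reducer *slot* per node is a daemon-side setting
        // (mapred.tasktracker.reduce.tasks.maximum in mapred-site.xml on each TaskTracker).
        // Per job, the closest control is simply the number of reduce tasks:
        job.setNumReduceTasks(15);   // e.g. one per node on a 15-node cluster, best effort only
        return job;
      }
    }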
RE: Tricks to upgrading Sequence Files?
I'll consider a patch to the SequenceFile, if we could manually override the sequence file input Key and Value that's read from the sequence file headers we'd have a clean solution. I don't like versioning my Model object because it's used by 10's of other classes and I don't want to risk less maintained classes continuing to use an old version. For the time being I just used 2 jobs. First I renamed the old Model Object to the original name, read it in, upgraded it, and wrote the new version with a different class name. Then I renamed the classes again so the new model object used the original name and read in the altered name and cloned it into the original name. All in all an hours work only, but having a cleaner process would be better. I'll add the request to JIRA at a minimum. Dave -Original Message- From: Harsh J [mailto:ha...@cloudera.com] Sent: Wednesday, January 30, 2013 2:32 AM To: Subject: Re: Tricks to upgrading Sequence Files? This is a pretty interesting question, but unfortunately there isn't an inbuilt way in SequenceFiles itself to handle this. However, your key/value classes can be made to handle versioning perhaps - detecting if what they've read is of an older time and decoding it appropriately (while handling newer encoding separately, in the normal fashion). This would be much better than going down the classloader hack paths I think? On Tue, Jan 29, 2013 at 1:11 PM, David Parks wrote: > Anyone have any good tricks for upgrading a sequence file. > > > > We maintain a sequence file like a flat file DB and the primary object > in there changed in recent development. > > > > It's trivial to write a job to read in the sequence file, update the > object, and write it back out in the new format. > > > > But since sequence files read and write the key/value class I would > either need to rename the model object with a version number, or > change the header of each sequence file. > > > > Just wondering if there are any nice tricks to this. -- Harsh J
Tricks to upgrading Sequence Files?
Anyone have any good tricks for upgrading a sequence file. We maintain a sequence file like a flat file DB and the primary object in there changed in recent development. It's trivial to write a job to read in the sequence file, update the object, and write it back out in the new format. But since sequence files read and write the key/value class I would either need to rename the model object with a version number, or change the header of each sequence file. Just wondering if there are any nice tricks to this.
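A rough sketch of the version-aware Writable that Harsh describes, assuming the class already writes (or can start writing, after one migration pass) a leading version byte; the field names are invented for illustration:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    /** A model object that carries its own format version so older SequenceFiles stay readable. */
    public class ModelRecord implements Writable {

      private static final byte CURRENT_VERSION = 2;

      private String itemId;     // present since version 1
      private String category;   // added in version 2

      public void write(DataOutput out) throws IOException {
        out.writeByte(CURRENT_VERSION);      // always write the newest layout
        out.writeUTF(itemId);
        out.writeUTF(category);
      }

      public void readFields(DataInput in) throws IOException {
        byte version = in.readByte();
        itemId = in.readUTF();
        if (version >= 2) {
          category = in.readUTF();           // field only exists in v2 files
        } else {
          category = "";                     // sensible default when reading v1 data
        }
      }
    }

Hadoop also ships an abstract org.apache.hadoop.io.VersionedWritable, but it throws VersionMismatchException on older data rather than adapting to it, so the manual branching above is closer to what's wanted here.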
Symbolic links available in 1.0.3?
Is it possible to use symbolic links in 1.0.3? If yes: can I use symbolic links to create a single, final directory structure of files from many locations; then use DistCp/S3DistCp to copy that final directory structure to another filesystem such as S3? Usecase: I currently launch 4 S3DistCp jobs concurrently (a royal pain in the a** to do) and copy files from 4 different locations to various subfolders of a single directory on S3 (the final result directory). I'd love to create 1 S3DistCp job to do all of that work, but S3DistCp/DistCp don't support specifying multiple source/destinations.
RE: Skipping entire task
Thinking here... if you submitted the task programmatically you should be able to capture the failure of the task and gracefully move past it to your next tasks. To say it in a long-winded way: Let's say you submit a job to Hadoop, a java jar, and your main class implements Tool. That code has the responsibility to submit a series of jobs to hadoop, something like this: try{ Job myJob = new MyJob(getConf()); myJob.submitAndWait(); }catch(Exception uhhohh){ //Deal with the issue and move on } Job myNextJob = new MyNextJob(getConf()); myNextJob.submit(); Just pseudo code there to demonstrate my thought. David -Original Message- From: Håvard Wahl Kongsgård [mailto:haavard.kongsga...@gmail.com] Sent: Saturday, January 05, 2013 4:54 PM To: user Subject: Skipping entire task Hi, hadoop can skip bad records http://devblog.factual.com/practical-hadoop-streaming-dealing-with-brittle-code. But is it also possible to skip entire tasks? -Håvard -- Håvard Wahl Kongsgård Faculty of Medicine & Department of Mathematical Sciences NTNU http://havard.security-review.net/
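A runnable version of the pseudo code above using the new API (submitAndWait() isn't a real method; the blocking call is waitForCompletion()). MyJob/MyNextJob are collapsed into inline Job objects here and the job names are placeholders:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /** Runs two jobs in sequence; a failure in the first is handled so the second still runs. */
    public class JobChainDriver extends Configured implements Tool {

      public int run(String[] args) throws Exception {
        try {
          Job first = new Job(getConf(), "first-step");   // set mapper/reducer/paths as needed
          boolean ok = first.waitForCompletion(true);     // blocks; returns false if the job fails
          // inspect 'ok' and decide whether it is safe to continue
        } catch (Exception uhOh) {
          // submission/infrastructure errors land here; deal with the issue and move on
        }

        Job next = new Job(getConf(), "second-step");     // set mapper/reducer/paths as needed
        return next.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new JobChainDriver(), args));
      }
    }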
RE: Fastest way to transfer files
Here’s an example of running distcp (actually in this case s3distcp, but it’s about the same, just new DistCp()) from java: ToolRunner.run(getConf(), new S3DistCp(), new String[] { "--src", "/src/dir/", "--srcPattern", ".*(itemtable)-r-[0-9]*.*", "--dest","s3://yourbucket/results/", "--s3Endpoint", "s3.amazonaws.com" }); From: Joep Rottinghuis [mailto:jrottingh...@gmail.com] Sent: Saturday, December 29, 2012 2:51 PM To: user@hadoop.apache.org Cc: user@hadoop.apache.org; hdfs-u...@hadoop.apache.org Subject: Re: Fastest way to transfer files Not sure why you are implying a contradiction when you say: "... distcp is useful _but_ you want to do 'it' in java..." First of all distcp _is_ written in Java. You can call distcp or any other MR job from Java just fine. Cheers, Joep Sent from my iPhone On Dec 28, 2012, at 12:01 PM, burakkk wrote: Hi, I have two different hdfs cluster. I need to transfer files between these environments. What's the fastest way to transfer files for that situation? I've researched about it. I found distcp command. It's useful but I want to do in java so is there any way to do this? Is there any way to transfer files chunk by chunk from one hdfs cluster to another one or is there any way to implement a process using chunks without whole file? Thanks Best Regards... -- BURAK ISIKLI | http://burakisikli.wordpress.com
RE: What does mapred.map.tasksperslot do?
Ah this is on AWS EMR, hadoop 1.0.3. This could be an AWS feature based on my reading of the AWS docs, but I thought it was hadoop. From: Hemanth Yamijala [mailto:yhema...@thoughtworks.com] Sent: Thursday, December 27, 2012 3:43 PM To: user@hadoop.apache.org Subject: Re: What does mapred.map.tasksperslot do? David, Could you please tell what version of Hadoop you are using ? I don't see this parameter in the stable (1.x) or current branch. I only see references to it with respect to EMR and with Hadoop 0.18 or so. On Thu, Dec 27, 2012 at 1:51 PM, David Parks wrote: I didn't come up with much in a google search. In particular, what are the side effects of changing this setting? Memory? Sort process? I'm guessing it means that it'll feed 2 map tasks as input to each map task, a map task in turn is a self-contained JVM which consumes one map slot. Thus 4 map slots, and 2 tasksperslot means 4 map task JVMs each of which process 2 input splits at a time. By increasing the tasksperslot I presume we reduce overhead needed to start a new task (even though we're re-using the JVM in typical configuration, ours included), but we have more map output to sort and shuffle (I presume the results of both map splits go into the same output). Can someone verify those presumptions?
What does mapred.map.tasksperslot do?
I didn't come up with much in a google search. In particular, what are the side effects of changing this setting? Memory? Sort process? I'm guessing it means that it'll feed 2 map tasks as input to each map task, a map task in turn is a self-contained JVM which consumes one map slot. Thus 4 map slots, and 2 tasksperslot means 4 map task JVMs each of which process 2 input splits at a time. By increasing the tasksperslot I presume we reduce overhead needed to start a new task (even though we're re-using the JVM in typical configuration, ours included), but we have more map output to sort and shuffle (I presume the results of both map splits go into the same output). Can someone verify those presumptions?
How to troubleshoot OutOfMemoryError
I'm pretty consistently seeing a few reduce tasks fail with OutOfMemoryError (below). It doesn't kill the job, but it slows it down. In my current case the reducer is pretty darn simple, the algorithm basically does: 1. Do you have 2 values for this key? 2. If so, build a json string and emit a NullWritable and Text value. The string buffer I use to build the json is re-used, and I can't see anywhere in my code that would be taking more than ~50k of memory at any point in time. But I want to verify, is there a way to get the heap dump and all after this error? I'm running on AWS MapReduce v1.0.3 of Hadoop. Error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1711) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1571) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1412) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1344)
OutOfMemory in ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory
I've got 15 boxes in a cluster, 7.5GB of ram each on AWS (m1.large), 1 reducer per node. I'm seeing this exception sometimes. It's not stopping the job from completing, it's just failing 3 or 4 reduce tasks and slowing things down: Error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1711) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1571) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1412) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1344) Seems like it's clearly addressed here. https://issues.apache.org/jira/browse/MAPREDUCE-1182 I've talked with AWS support and verified that the patch listed in that JIRA issue has been applied to 1.0.3 on AWS. Any thoughts here?
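Besides the patch, the usual stop-gaps for this shuffle OOM are a bigger reduce-side heap or a smaller in-memory shuffle buffer. A hedged sketch using the Hadoop 1.x property names; the exact values are only examples and defaults may vary by distribution:

    import org.apache.hadoop.conf.Configuration;

    public class ShuffleMemoryTuning {
      public static void apply(Configuration conf) {
        // Give reduce-side JVMs more headroom for the in-memory shuffle.
        conf.set("mapred.child.java.opts", "-Xmx1024m");
        // Buffer less of each map output in memory during the copy phase
        // (fraction of reducer heap used for buffering map outputs; default is around 0.70).
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.40f);
        // Start merging to disk sooner once the in-memory buffer fills up.
        conf.setFloat("mapred.job.shuffle.merge.percent", 0.50f);
      }
    }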
RE: How to submit Tool jobs programatically in parallel?
Can I do that with s3distcp / distcp? The job is being configured in the run() method of s3distcp (as it implements Tool). So I think I can't use this approach. I use this for the jobs I control of course, but the problem is things like distcp where I don't control the configuration. Dave From: Manoj Babu [mailto:manoj...@gmail.com] Sent: Friday, December 14, 2012 12:57 PM To: user@hadoop.apache.org Subject: Re: How to submit Tool jobs programatically in parallel? David, You try like below instead of runJob() you can try submitJob(). JobClient jc = new JobClient(job); jc.submitJob(job); Cheers! Manoj. On Fri, Dec 14, 2012 at 10:09 AM, David Parks wrote: I'm submitting unrelated jobs programmatically (using AWS EMR) so they run in parallel. I'd like to run an s3distcp job in parallel as well, but the interface to that job is a Tool, e.g. ToolRunner.run(...). ToolRunner blocks until the job completes though, so presumably I'd need to create a thread pool to run these jobs in parallel. But creating multiple threads to submit concurrent jobs via ToolRunner, blocking on the jobs completion, just feels improper. Is there an alternative?
How to submit Tool jobs programatically in parallel?
I'm submitting unrelated jobs programmatically (using AWS EMR) so they run in parallel. I'd like to run an s3distcp job in parallel as well, but the interface to that job is a Tool, e.g. ToolRunner.run(...). ToolRunner blocks until the job completes though, so presumably I'd need to create a thread pool to run these jobs in parallel. But creating multiple threads to submit concurrent jobs via ToolRunner, blocking on the jobs completion, just feels improper. Is there an alternative?
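One workable approach, since ToolRunner.run() blocks: a small thread pool with one thread per Tool. This is only a sketch and assumes each Tool is safe to run on its own thread with its own Configuration copy:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /** Runs several Tool-based jobs (e.g. s3distcp) concurrently and waits for all of them. */
    public class ParallelToolRunner {

      public static void runAll(final Configuration conf,
                                Tool[] tools,
                                String[][] argsPerTool) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tools.length);
        List<Future<Integer>> results = new ArrayList<Future<Integer>>();
        for (int i = 0; i < tools.length; i++) {
          final Tool tool = tools[i];
          final String[] args = argsPerTool[i];
          results.add(pool.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
              // Give each tool its own Configuration copy to avoid shared mutable state.
              return ToolRunner.run(new Configuration(conf), tool, args);
            }
          }));
        }
        for (Future<Integer> result : results) {
          result.get();   // blocks; propagates any failure as an exception
        }
        pool.shutdown();
      }
    }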
RE: Shuffle's getMapOutput() fails with EofException, followed by IllegalStateException
If anyone follows this thread in the future, it turns out that I was being led astray by these errors, they weren't the cause of the problem. This was the resolution: http://stackoverflow.com/questions/9803939/why-is-reduce-stuck-at-16/9815715#comment19074114_9815715 I was messing with the filesystem directly and was leaving a connection to it open which was hanging the map tasks (without error) that used that code. -Original Message- From: David Parks [mailto:davidpark...@yahoo.com] Sent: Thursday, December 13, 2012 11:23 AM To: user@hadoop.apache.org Subject: Shuffle's getMapOutput() fails with EofException, followed by IllegalStateException I'm having exactly this problem, and it's causing my job to fail when I try to process a larger amount of data (I'm attempting to process 30GB of compressed CSVs and the entire job fails every time). This issue is open for it: https://issues.apache.org/jira/browse/MAPREDUCE-5 Anyone have any idea about a workaround for the problem? To my eyes Hadoop is just crashing when I try to process a large job (v1.0.3 on Elastic MapReduce). But this just seems crazy, there must be something I can do to get things working. The only difference between what is stated in that bug report and mine is that some of my map tasks fail at the end, but I believe that is due to the reduce tasks causing problems because the map tasks are just timing out without much more information than that. Description (copied from JIRA): --- During the shuffle phase, I'm seeing a large sequence of the following actions: 1) WARN org.apache.hadoop.mapred.TaskTracker: getMapOutput(attempt_200905181452_0002_m_10_0,0) failed : org.mortbay.jetty.EofException 2) WARN org.mortbay.log: Committed before 410 getMapOutput(attempt_200905181452_0002_m_10_0,0) failed : org.mortbay.jetty.EofException 3) ERROR org.mortbay.log: /mapOutput java.lang.IllegalStateException: Committed The map phase completes with 100%, and then the reduce phase crawls along with the above errors in each of the TaskTracker logs. None of the tasktrackers get lost. When I run non-data jobs like the 'pi' test from the example jar, everything works fine.
Shuffle's getMapOutput() fails with EofException, followed by IllegalStateException
I'm having exactly this problem, and it's causing my job to fail when I try to process a larger amount of data (I'm attempting to process 30GB of compressed CSVs and the entire job fails every time). This issues is open for it: https://issues.apache.org/jira/browse/MAPREDUCE-5 Anyone have any idea about a workaround for the problem? To my eyes Hadoop is just crashing when I try to process a large job (v1.0.3 on Elastic MapReduce). But this just seems crazy, there must be something I can do to get things working. The only difference between what is stated in that bug report and mine is that some of my map tasks fail at the end, but I believe that is due to the reduce tasks causing problems because the map tasks are just timing out without much more information than that. Description (copied from JIRA): --- During the shuffle phase, I'm seeing a large sequence of the following actions: 1) WARN org.apache.hadoop.mapred.TaskTracker: getMapOutput(attempt_200905181452_0002_m_10_0,0) failed : org.mortbay.jetty.EofException 2) WARN org.mortbay.log: Committed before 410 getMapOutput(attempt_200905181452_0002_m_10_0,0) failed : org.mortbay.jetty.EofException 3) ERROR org.mortbay.log: /mapOutput java.lang.IllegalStateException: Committed The map phase completes with 100%, and then the reduce phase crawls along with the above errors in each of the TaskTracker logs. None of the tasktrackers get lost. When I run non-data jobs like the 'pi' test from the example jar, everything works fine.
RE: Hadoop 101
Nothing that I'm aware of for text files, I'd just use standard unix utils to process it outside of Hadoop. As to getting a reader from any of the Input Formats, here's the typical example you'd follow to get the reader for a sequence file, you could extrapolate the example to access whichever reader you're interested in. http://my.safaribooksonline.com/book/databases/hadoop/9780596521974/file-bas ed-data-structures/id3555432 -Original Message- From: Pat Ferrel [mailto:pat.fer...@gmail.com] Sent: Wednesday, December 12, 2012 11:37 PM To: user@hadoop.apache.org Subject: Re: Hadoop 101 Yeah I found the TextInputFormat and TextKeyValueInputFormat and I know how to parse text--I'm just too lazy. I was hoping there was a Text equivalent of a SequenceFile that was hidden somewhere. As I said there is no mapper, this is running outside of hadoop M/R. So I at least need a line reader and not sure how the InputFormat works outside a mapper. But who cares, parsing is simple enough from scratch. All the TextKeyValueInputFormat gives me is splitting at the tab afaict. Actually this convinces me to look further into getting the values from method calls. They aren't quite what I want to begin with. Thanks for saving me more fruitless searches. On Dec 11, 2012, at 10:04 PM, David Parks wrote: You use TextInputFormat, you'll get the following key, value pairs in your mapper: file_position, your_input Example: 0, "0\t[356:0.3481597,359:0.3481597,358:0.3481597,361:0.3481597,360:0.3481597]" 100, "8\t[356:0.34786037,359:0.34786037,358:0.34786037,361:0.34786037,360:0.34786 037]" 200, "25\t[284:0.34821576,286:0.34821576,287:0.34821576,288:0.34821576,289:0.3482 1576]" Then just parse it out in your mapper. -Original Message- From: Pat Ferrel [mailto:pat.fer...@gmail.com] Sent: Wednesday, December 12, 2012 7:50 AM To: user@hadoop.apache.org Subject: Hadoop 101 Stupid question for the day. I have a file created by a mahout job of the form: 0 [356:0.3481597,359:0.3481597,358:0.3481597,361:0.3481597,360:0.3481597] 8 [356:0.34786037,359:0.34786037,358:0.34786037,361:0.34786037,360:0.34786037] 25 [284:0.34821576,286:0.34821576,287:0.34821576,288:0.34821576,289:0.34821576] 28 [452:0.34802154,454:0.34802154,453:0.34802154,456:0.34802154,455:0.34802154] . If this were a SequenceFile I could read it and be merrily on my way but it's a text file. The classes written are key, value pairs but the file is tab delimited text. I was hoping to do something like: SequenceFile.Reader reader = new SequenceFile.Reader(fs, inputFile, conf); Writable userId = new LongWritable(); VectorWritable recommendations = new VectorWritable(); while (reader.next(userId, recommendations)) { //do something with each pair } But alas Google fails me. How do you read in key, values pairs from text files outside of a map or reduce?
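For what it's worth, reading such a tab-delimited text file outside of MapReduce really is just line-by-line parsing. A sketch that goes through the FileSystem API so it also works for files sitting on HDFS; the input path and the parsing of the value string are placeholders:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Reads a tab-delimited "key \t value" text file (e.g. Mahout output) outside of a MR job. */
    public class TextKeyValueReader {
      public static void main(String[] args) throws Exception {
        Path input = new Path(args[0]);                       // e.g. hdfs://.../part-r-00000
        Configuration conf = new Configuration();
        FileSystem fs = input.getFileSystem(conf);
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(fs.open(input), "UTF-8"));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            int tab = line.indexOf('\t');
            long userId = Long.parseLong(line.substring(0, tab));
            String recommendations = line.substring(tab + 1);  // "[356:0.348...,...]"
            // parse the vector string however you need
            System.out.println(userId + " -> " + recommendations);
          }
        } finally {
          reader.close();
        }
      }
    }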
RE: Map output copy failure
I had the same problem yesterday, it sure does look to be dead on that issue. I found another forum discussion on AWS that suggested more memory as a stop-gap way to deal with it, or apply the patch. I checked the code on hadoop 1.0.3 (the version on AWS) and it didn't have the fix, so it looks like it's only in the newer builds. I actually have an AWS ticket opened for it seeing if their engineers can offer any guidance as well. My understanding is that it should be doing a shuffle on disk in this case, it appeared to be just a small fix (a few lines) to apply the patch to src/mapred/org/apache/hadoop/mapred/ReduceTask.java Dave From: Manoj Babu [mailto:manoj...@gmail.com] Sent: Monday, December 10, 2012 8:09 PM To: user@hadoop.apache.org Subject: Reg: Map output copy failure Hi All I got the below exception. Is the issue related to https://issues.apache.org/jira/browse/MAPREDUCE-1182 ? Am using CDH3U1 2012-12-10 06:22:39,688 FATAL org.apache.hadoop.mapred.Task: attempt_201211120903_9197_r_24_0 : Map output copy failure : java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1593) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1453) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1302) at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1234) Cheers! Manoj.
RE: Hadoop Deployment usecases
You're likely to find answers to your questions here, but you'll need specific questions and some rudimentary subject matter knowledge. I'd suggest starting off with a good book on Hadoop, you'll probably find a lot of your questions are answered in a casual afternoon of reading. I was pretty happy with O'Reilly's book "Hadoop the definitive guide". Manning is another favorite of mine and they have "Hadoop in Action" http://www.manning.com/lam/ Dave From: Henjarappa, Savitha [mailto:savitha.henjara...@hp.com] Sent: Tuesday, December 11, 2012 10:34 PM To: user@hadoop.apache.org Subject: Hadoop Deployment usecases Hi, I am new to Hadoop. Would like to understand the deployment use cases of Hadoop. Can I expect some info on this from this group? Thanks, Savitha
RE: Hadoop 101
You use TextInputFormat, you'll get the following key, value pairs in your mapper: file_position, your_input Example: 0, "0\t[356:0.3481597,359:0.3481597,358:0.3481597,361:0.3481597,360:0.3481597]" 100, "8\t[356:0.34786037,359:0.34786037,358:0.34786037,361:0.34786037,360:0.34786 037]" 200, "25\t[284:0.34821576,286:0.34821576,287:0.34821576,288:0.34821576,289:0.3482 1576]" Then just parse it out in your mapper. -Original Message- From: Pat Ferrel [mailto:pat.fer...@gmail.com] Sent: Wednesday, December 12, 2012 7:50 AM To: user@hadoop.apache.org Subject: Hadoop 101 Stupid question for the day. I have a file created by a mahout job of the form: 0 [356:0.3481597,359:0.3481597,358:0.3481597,361:0.3481597,360:0.3481597] 8 [356:0.34786037,359:0.34786037,358:0.34786037,361:0.34786037,360:0.34786037] 25 [284:0.34821576,286:0.34821576,287:0.34821576,288:0.34821576,289:0.34821576] 28 [452:0.34802154,454:0.34802154,453:0.34802154,456:0.34802154,455:0.34802154] . If this were a SequenceFile I could read it and be merrily on my way but it's a text file. The classes written are key, value pairs but the file is tab delimited text. I was hoping to do something like: SequenceFile.Reader reader = new SequenceFile.Reader(fs, inputFile, conf); Writable userId = new LongWritable(); VectorWritable recommendations = new VectorWritable(); while (reader.next(userId, recommendations)) { //do something with each pair } But alas Google fails me. How do you read in key, values pairs from text files outside of a map or reduce?
Can we declare some HDFS nodes "primary"
Assume for a moment that you have a large cluster of 500 AWS spot instance servers running. And you want to keep the bid price low, so at some point it's likely that the whole cluster will get axed until the spot price comes down some. In order to maintain HDFS continuity I'd want say 10 servers running as normal instances, and I'd want to ensure that HDFS is replicating 100% of data to those 10 that don't run the risk of group elimination. Is it possible for HDFS to ensure replication to these "primary" nodes?
RE: When reduce function is used as combiner?
The map task may use a combiner 0+ times. Basically that means (as far as I understand), if the map output data is below some internal hadoop threshold, it'll just send it to the reducer, if it's larger then it'll run it through the combiner first. And at hadoops discretion, it may run the combiner more than once on the same set of data if it deems it likely to be useful (the algorithms which determine that are beyond my understanding). Your second question, "Is there any maximum size.": Hadoop is, as I understand, looking at the whole of the map output to determine if it should run the combiner, not the individual keys/values. "Values must be the same correct?", yes, your combiner keys must match the mapper. If that's different from your reducer you'll need a separate combiner class, which may, other than the output type, be the same business logic. Fourth question: The reduce phase will run only once, it's only the combiner that may be run a variable number of times. The output of your reduce phase goes straight to whatever filesystem you've defined for the output (i.e. HDFS or S3 usually). From: Majid Azimi [mailto:majid.merk...@gmail.com] Sent: Friday, December 07, 2012 9:02 PM To: user@hadoop.apache.org Subject: When reduce function is used as combiner? Hi guys, When reduce function is used as combiner? It is used as combiner when the iterable passed to reduce function is large? correct? Is there any maximum size for that iterable? I mean for example if that iterable size is more than 1000 then reduce function will be called more than once for that key. another question is when reduce function is used as combiner the Input Key, Value and output Key, Value must be the same. correct? If it is different what will happen? exception thrown at runtime? Forth question is: lets say iterable size is very large so hadoop will add output of reduce to iterable and pass it to reduce again with other values that have not been processed. The question is when hadoop will now that from that point output of reduce function should be written to HDFS as a real output? When there is no more value to put into that iterable?
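A minimal example of a reduce function that is safe to reuse as a combiner, because it is associative and commutative and its input and output types match, as discussed above:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /** Sums values per key; usable both as combiner and as reducer. */
    public class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
      private final LongWritable total = new LongWritable();

      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
          sum += v.get();
        }
        total.set(sum);
        context.write(key, total);   // output types equal input types, so combining is safe
      }
    }

It would be wired up with job.setCombinerClass(SumReducer.class) and job.setReducerClass(SumReducer.class); the combiner runs zero or more times, so it must never carry logic that has to run exactly once.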
RE: Map tasks processing some files multiple times
I'm using multiple inputs because I actually have another type of input with a different mapper, a single, unrelated file, that I omitted from this discussion for simplicity. The basic formula is: read in a single database of existing items, read in a bunch of catalogs of items, then merge and toss like a salad (in a few map/reduce steps that follow). From: Hemanth Yamijala [mailto:yhema...@thoughtworks.com] Sent: Thursday, December 06, 2012 9:44 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times Glad it helps. Could you also explain the reason for using MultipleInputs ? On Thu, Dec 6, 2012 at 2:59 PM, David Parks wrote: Figured it out, it is, as usual, with my code. I had wrapped TextInputFormat to replace the LongWritable key with a key representing the file name. It was a bit tricky to do because of changing the generics from to and I goofed up and mis-directed a call to isSplittable, which was causing the issue. It now works fine. Thanks very much for the response, it gave me pause to think enough to work out what I had done. Dave From: Hemanth Yamijala [mailto:yhema...@thoughtworks.com] Sent: Thursday, December 06, 2012 3:25 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times David, You are using FileNameTextInputFormat. This is not in Hadoop source, as far as I can see. Can you please confirm where this is being used from ? It seems like the isSplittable method of this input format may need checking. Another thing, given you are adding the same input format for all files, do you need MultipleInputs ? Thanks Hemanth On Thu, Dec 6, 2012 at 1:06 PM, David Parks wrote: I believe I just tracked down the problem, maybe you can help confirm if you're familiar with this. I see that FileInputFormat is specifying that gzip files (.gz extension) from s3n filesystem are being reported as splittable, and I see that it's creating multiple input splits for these files. I'm mapping the files directly off S3: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); I see in the map phase, based on my counters, that it's actually processing the entire file (I set up a counter per file input). So the 2 files which were processed twice had 2 splits (I now see that in some debug logs I created), and the 1 file that was processed 3 times had 3 splits (the rest were smaller and were only assigned one split by default anyway). Am I wrong in expecting all files on the s3n filesystem to come through as not-splittable? This seems to be a bug in hadoop code if I'm right. David From: Raj Vishwanathan [mailto:rajv...@yahoo.com] Sent: Thursday, December 06, 2012 1:45 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times Could it be due to spec-ex? Does it make a diffrerence in the end? Raj _ From: David Parks To: user@hadoop.apache.org Sent: Wednesday, December 5, 2012 10:15 PM Subject: Map tasks processing some files multiple times I've got a job that reads in 167 files from S3, but 2 of the files are being mapped twice and 1 of the files is mapped 3 times. 
This is the code I use to set up the mapper: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); for(FileStatus f : lsDir.getFileSystem(getConf()).globStatus(lsDir)) log.info("Identified linkshare catalog: " + f.getPath().toString()); if( lsDir.getFileSystem(getConf()).globStatus(lsDir).length > 0 ){ MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); } I can see from the logs that it sees only 1 copy of each of these files, and correctly identifies 167 files. I also have the following confirmation that it found the 167 files correctly: 2012-12-06 04:56:41,213 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 167 When I look through the syslogs I can see that the file in question was opened by two different map attempts: ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_05_0/s yslog:2012-12-06 03:56:05,265 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_000173_0/s yslog:2012-12-06 03:53:18,765 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading This is only happening to thes
RE: Map tasks processing some files multiple times
Figured it out, it is, as usual, with my code. I had wrapped TextInputFormat to replace the LongWritable key with a key representing the file name. It was a bit tricky to do because of changing the generics from to and I goofed up and mis-directed a call to isSplittable, which was causing the issue. It now works fine. Thanks very much for the response, it gave me pause to think enough to work out what I had done. Dave From: Hemanth Yamijala [mailto:yhema...@thoughtworks.com] Sent: Thursday, December 06, 2012 3:25 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times David, You are using FileNameTextInputFormat. This is not in Hadoop source, as far as I can see. Can you please confirm where this is being used from ? It seems like the isSplittable method of this input format may need checking. Another thing, given you are adding the same input format for all files, do you need MultipleInputs ? Thanks Hemanth On Thu, Dec 6, 2012 at 1:06 PM, David Parks wrote: I believe I just tracked down the problem, maybe you can help confirm if you're familiar with this. I see that FileInputFormat is specifying that gzip files (.gz extension) from s3n filesystem are being reported as splittable, and I see that it's creating multiple input splits for these files. I'm mapping the files directly off S3: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); I see in the map phase, based on my counters, that it's actually processing the entire file (I set up a counter per file input). So the 2 files which were processed twice had 2 splits (I now see that in some debug logs I created), and the 1 file that was processed 3 times had 3 splits (the rest were smaller and were only assigned one split by default anyway). Am I wrong in expecting all files on the s3n filesystem to come through as not-splittable? This seems to be a bug in hadoop code if I'm right. David From: Raj Vishwanathan [mailto:rajv...@yahoo.com] Sent: Thursday, December 06, 2012 1:45 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times Could it be due to spec-ex? Does it make a diffrerence in the end? Raj _ From: David Parks To: user@hadoop.apache.org Sent: Wednesday, December 5, 2012 10:15 PM Subject: Map tasks processing some files multiple times I've got a job that reads in 167 files from S3, but 2 of the files are being mapped twice and 1 of the files is mapped 3 times. This is the code I use to set up the mapper: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); for(FileStatus f : lsDir.getFileSystem(getConf()).globStatus(lsDir)) log.info("Identified linkshare catalog: " + f.getPath().toString()); if( lsDir.getFileSystem(getConf()).globStatus(lsDir).length > 0 ){ MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); } I can see from the logs that it sees only 1 copy of each of these files, and correctly identifies 167 files. 
I also have the following confirmation that it found the 167 files correctly: 2012-12-06 04:56:41,213 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 167 When I look through the syslogs I can see that the file in question was opened by two different map attempts: ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_05_0/s yslog:2012-12-06 03:56:05,265 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_000173_0/s yslog:2012-12-06 03:53:18,765 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading This is only happening to these 3 files, all others seem to be fine. For the life of me I can't see a reason why these files might be processed multiple times. Notably, map attempt 173 is more map attempts than should be possible. There are 167 input files (from S3, gzipped), thus there should be 167 map attempts. But I see a total of 176 map tasks. Any thoughts/ideas/guesses?
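The fix described above (routing the isSplitable call correctly in a wrapped TextInputFormat) would look roughly like this in the new API. FileNameTextInputFormat itself isn't shown in the thread, so this sketch only covers the non-splittable part and leaves out the file-name-as-key record reader:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    /** Text input that refuses to split files, so each (gzipped) file is read by exactly one mapper. */
    public class NonSplittableTextInputFormat extends TextInputFormat {

      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        // Always false: force one split per file, regardless of what the codec
        // or filesystem reports (the bug in the thread was mis-directing this call).
        return false;
      }
    }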
RE: Map tasks processing some files multiple times
I believe I just tracked down the problem, maybe you can help confirm if you’re familiar with this. I see that FileInputFormat is specifying that gzip files (.gz extension) from s3n filesystem are being reported as splittable, and I see that it’s creating multiple input splits for these files. I’m mapping the files directly off S3: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); I see in the map phase, based on my counters, that it’s actually processing the entire file (I set up a counter per file input). So the 2 files which were processed twice had 2 splits (I now see that in some debug logs I created), and the 1 file that was processed 3 times had 3 splits (the rest were smaller and were only assigned one split by default anyway). Am I wrong in expecting all files on the s3n filesystem to come through as not-splittable? This seems to be a bug in hadoop code if I’m right. David From: Raj Vishwanathan [mailto:rajv...@yahoo.com] Sent: Thursday, December 06, 2012 1:45 PM To: user@hadoop.apache.org Subject: Re: Map tasks processing some files multiple times Could it be due to spec-ex? Does it make a diffrerence in the end? Raj _ From: David Parks To: user@hadoop.apache.org Sent: Wednesday, December 5, 2012 10:15 PM Subject: Map tasks processing some files multiple times I’ve got a job that reads in 167 files from S3, but 2 of the files are being mapped twice and 1 of the files is mapped 3 times. This is the code I use to set up the mapper: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); for(FileStatus f : lsDir.getFileSystem(getConf()).globStatus(lsDir)) log.info("Identified linkshare catalog: " + f.getPath().toString()); if( lsDir.getFileSystem(getConf()).globStatus(lsDir).length > 0 ){ MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); } I can see from the logs that it sees only 1 copy of each of these files, and correctly identifies 167 files. I also have the following confirmation that it found the 167 files correctly: 2012-12-06 04:56:41,213 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 167 When I look through the syslogs I can see that the file in question was opened by two different map attempts: ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_05_0/syslog:2012-12-06 03:56:05,265 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Universe~85.csv.gz' for reading ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_000173_0/syslog:2012-12-06 03:53:18,765 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Universe~85.csv.gz' for reading This is only happening to these 3 files, all others seem to be fine. For the life of me I can’t see a reason why these files might be processed multiple times. Notably, map attempt 173 is more map attempts than should be possible. There are 167 input files (from S3, gzipped), thus there should be 167 map attempts. But I see a total of 176 map tasks. Any thoughts/ideas/guesses?
Map tasks processing some files multiple times
I've got a job that reads in 167 files from S3, but 2 of the files are being mapped twice and 1 of the files is mapped 3 times. This is the code I use to set up the mapper: Path lsDir = new Path("s3n://fruggmapreduce/input/catalogs/linkshare_catalogs/*~*"); for(FileStatus f : lsDir.getFileSystem(getConf()).globStatus(lsDir)) log.info("Identified linkshare catalog: " + f.getPath().toString()); if( lsDir.getFileSystem(getConf()).globStatus(lsDir).length > 0 ){ MultipleInputs.addInputPath(job, lsDir, FileNameTextInputFormat.class, LinkShareCatalogImportMapper.class); } I can see from the logs that it sees only 1 copy of each of these files, and correctly identifies 167 files. I also have the following confirmation that it found the 167 files correctly: 2012-12-06 04:56:41,213 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 167 When I look through the syslogs I can see that the file in question was opened by two different map attempts: ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_05_0/s yslog:2012-12-06 03:56:05,265 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading ./task-attempts/job_201212060351_0001/attempt_201212060351_0001_m_000173_0/s yslog:2012-12-06 03:53:18,765 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Opening 's3n://fruggmapreduce/input/catalogs/linkshare_catalogs/linkshare~CD%20Unive rse~85.csv.gz' for reading This is only happening to these 3 files, all others seem to be fine. For the life of me I can't see a reason why these files might be processed multiple times. Notably, map attempt 173 is more map attempts than should be possible. There are 167 input files (from S3, gzipped), thus there should be 167 map attempts. But I see a total of 176 map tasks. Any thoughts/ideas/guesses?
RE: Question on Key Grouping
First rule to be wary of is your use of the combiner. The combiner *might* be run, it *might not* be run, and it *might be run multiple times*. The combiner is only for reducing the amount of data going to the reducer, and it will only be run *if and when* it's deemed likely to be useful by Hadoop. Don't use it for logic. Although I didn't quite follow your example (it's not clear what your keys and values are), I think what you need to do is just run 2 map/reduce phases here. The first map/reduce phase groups the first set of keys you need, then reduce, write it to disk (hdfs probably), and run a 2nd map/reduce phase that reads that input and does the mapping you need. Most even modestly complex applications are going through multiple map/reduce phases to accomplish their task. If you need 2 map phases, then the first reduce phase might just be the identity reducer (org.apache.hadoop.mapreduce.Reducer), which just writes the results of the first map phase straight out. David From: Joey Krabacher [mailto:jkrabac...@gmail.com] Sent: Wednesday, December 05, 2012 6:37 AM To: user@hadoop.apache.org Subject: Question on Key Grouping Is there a way to group Keys a second time before sending results to the Reducer in the same job? I thought maybe a combiner would do this for me, but it just acts like a reducer, so I need an intermediate step that acts like another mapper instead. To try to visualize this, how I want it to work: Map output: <1, [{2, "John",""},{1, "",""},{1, "", "Doe"}]> Combiner Output: <1, [{1, "John",""},{1, "",""},{1, "", "Doe"}]> Reduce Output: <1, "John","Doe"> How it currently works: Map output: <1, [{2, "John",""},{1, "",""},{1, "", "Doe"}]> Combiner Output: <1, {1, "John",""}> <1, {1, "",""}> <1, {1, "", "Doe"}> Reduce Output: <1, "John","Doe"> <1, "John","Doe"> <1, "John","Doe"> So, basically the issue is that even though the 2 in the first map record should really be a one, I still need to extract the value of "John" and have it included in the output for key 1. Hope this makes sense. Thanks in advance, /* Joey */
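A rough sketch of the two-pass approach described above; the commented-out mapper/reducer classes and the paths are placeholders, and the identity reducer in the new API is just org.apache.hadoop.mapreduce.Reducer itself:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TwoPhaseDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path intermediate = new Path("/tmp/phase1-output");   // placeholder path

        Job phase1 = new Job(conf, "first-grouping");
        // phase1.setMapperClass(FirstGroupingMapper.class);  // your first grouping/mapping step
        phase1.setReducerClass(Reducer.class);                // identity reducer: pass-through
        FileInputFormat.addInputPath(phase1, new Path(args[0]));
        FileOutputFormat.setOutputPath(phase1, intermediate);
        if (!phase1.waitForCompletion(true)) System.exit(1);

        Job phase2 = new Job(conf, "second-grouping");
        // phase2.setMapperClass(SecondGroupingMapper.class); // re-keys the intermediate data
        // phase2.setReducerClass(MergeReducer.class);        // produces the final merged records
        FileInputFormat.addInputPath(phase2, intermediate);
        FileOutputFormat.setOutputPath(phase2, new Path(args[1]));
        System.exit(phase2.waitForCompletion(true) ? 0 : 1);
      }
    }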
RE: [Bulk] Re: Failed To Start SecondaryNameNode in Secure Mode
I'm curious about profiling, I see some documentation about it (1.0.3 on AWS), but the references to JobConf seem to be for the "old api" and I've got everything running on the "new api". I've got a job to handle processing of about 30GB of compressed CSVs and it's taking over a day with 3 m1.medium boxes, more than I expected, so I'd like to see where the time is being spent. http://hadoop.apache.org/docs/r1.0.3/mapred_tutorial.html#Profiling I've never set up any kind of profiling, so I don't really know what to expect here. Any pointers to help me set up what's suggested here? Am I correct in understanding that this doc is a little outdated?
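In 1.0.x the profiling switches are plain configuration properties, so they can be set without touching the old JobConf API. A hedged sketch; the HPROF flags are only the commonly cited defaults and may need adjusting for your profiler, and the profile output ends up in the task attempt logs:

    import org.apache.hadoop.conf.Configuration;

    public class EnableTaskProfiling {
      public static void apply(Configuration conf) {
        conf.setBoolean("mapred.task.profile", true);
        // Profile only a few task attempts so the whole job isn't slowed down.
        conf.set("mapred.task.profile.maps", "0-2");
        conf.set("mapred.task.profile.reduces", "0-2");
        // HPROF agent options; %s is replaced with the profile output file name.
        conf.set("mapred.task.profile.params",
            "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
      }
    }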
Moving files
I want to move a file in HDFS after a job using the Java API, I'm trying this command but I always get false (could not rename): Path from = new Path("hdfs://localhost/process-changes/itemtable-r-1"); Path to = new Path("hdfs://localhost/output/itemtable-r-1"); boolean wasCopied = from.getFileSystem(getConf()).rename(from, to); All I want to do is copy some files from HDFS to S3 at the end of my job, but DistCp copies a whole directory, so I think I need to move the files around to prepare them for DistCp's directory->directory copy.
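One common reason for rename() returning false with the old FileSystem API is that the destination's parent directory doesn't exist; most problems are reported as false rather than as an exception. A small hedged sketch that guards against that case:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MoveOutputFile {
      public static boolean move(Configuration conf, Path from, Path to) throws Exception {
        FileSystem fs = from.getFileSystem(conf);
        if (!fs.exists(from)) {
          return false;                    // nothing to move; the job may not have written it
        }
        fs.mkdirs(to.getParent());         // rename reports false if the parent dir is missing
        return fs.rename(from, to);
      }
    }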
XMLOutputFormat, anything in the works?
Is there an XMLOutputFormat in existence somewhere? I need to output Solr XML change docs, I'm betting I'm not the first. David
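I'm not aware of anything XML-specific shipped with 1.x, but a minimal FileOutputFormat that wraps pre-built document strings in a root element is short. The element names, the NullWritable/Text types, and the assumption that each value already holds a complete <doc> block are all placeholders here:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /** Writes each Text value as one element inside a single root element per output file. */
    public class SimpleXmlOutputFormat extends FileOutputFormat<NullWritable, Text> {

      @Override
      public RecordWriter<NullWritable, Text> getRecordWriter(TaskAttemptContext context)
          throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(context, ".xml");
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        final FSDataOutputStream out = fs.create(file, false);
        out.writeBytes("<add>\n");                    // root element, e.g. a Solr <add> envelope

        return new RecordWriter<NullWritable, Text>() {
          public void write(NullWritable key, Text value) throws IOException {
            out.writeBytes(value.toString());         // value is assumed to be a complete <doc> block
            out.writeBytes("\n");
          }
          public void close(TaskAttemptContext ctx) throws IOException {
            out.writeBytes("</add>\n");
            out.close();
          }
        };
      }
    }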
RE: Cluster wide atomic operations
That's a very helpful discussion. Thank you. I'd like to go with assigning blocks of IDs for each reducer. Snowflake would require external changes that are a pain, I'd rather make my job fit our current constraints. Is there a way to get an index number for each reducer such that I could identify which block of IDs to assign each one? Thanks, David From: Ted Dunning [mailto:tdunn...@maprtech.com] Sent: Monday, October 29, 2012 12:58 PM To: user@hadoop.apache.org Subject: Re: Cluster wide atomic operations On Sun, Oct 28, 2012 at 9:15 PM, David Parks wrote: I need a unique & permanent ID assigned to new item encountered, which has a constraint that it is in the range of, let's say for simple discussion, one to one million. Having such a limited range may require that you have a central service to generate ID's. The use of a central service can be disastrous for throughput. I suppose I could assign a range of usable IDs to each reduce task (where ID's are assigned) and keep those organized somehow at the end of the job, but this seems clunky too. Yes. Much better. Since this is on AWS, zookeeper is not a good option. I thought it was part of the hadoop cluster (and thus easy to access), but guess I was wrong there. No. This is specifically not part of Hadoop for performance reasons. I would think that such a service would run most logically on the taskmaster server. I'm surprised this isn't a common issue. I guess I could launch a separate job that runs such a sequence service perhaps. But that's non trivial its self with failure concerns. The problem is that a serial number service is a major loss of performance in a parallel system. Unless you relax the idea considerably (by allowing blocks, or having lots of bits like Snowflake), then you wind up with a round-trip per id and you have a critical section on the ID generator. This is bad. Look up Amdahl's Law. Perhaps there's just a better way of thinking of this? Yes. Use lots of bits and be satisfied with uniqueness rather than perfect ordering and limited range. As the other respondent said, look up Snowflake.
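On getting an index number per reducer: the task index (0 .. numReduceTasks-1) is available from the task attempt ID (it is also exposed as the mapred.task.partition property), which gives each reducer its own non-overlapping ID block. A sketch with BLOCK_SIZE and the key/value types as placeholders:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /** Each reducer assigns IDs from its own block, so no global counter service is needed. */
    public class IdAssigningReducer extends Reducer<Text, Text, Text, LongWritable> {

      private static final long BLOCK_SIZE = 1000000L / 64;  // placeholder: range / max reducers
      private long nextId;

      @Override
      protected void setup(Context context) {
        // The task index is stable for a reduce task across retried attempts.
        int taskIndex = context.getTaskAttemptID().getTaskID().getId();
        nextId = taskIndex * BLOCK_SIZE;
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        context.write(key, new LongWritable(nextId++));  // next ID from this reducer's block
      }
    }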
RE: Cluster wide atomic operations
I need a unique & permanent ID assigned to new item encountered, which has a constraint that it is in the range of, let's say for simple discussion, one to one million. I suppose I could assign a range of usable IDs to each reduce task (where ID's are assigned) and keep those organized somehow at the end of the job, but this seems clunky too. Since this is on AWS, zookeeper is not a good option. I thought it was part of the hadoop cluster (and thus easy to access), but guess I was wrong there. I would think that such a service would run most logically on the taskmaster server. I'm surprised this isn't a common issue. I guess I could launch a separate job that runs such a sequence service perhaps. But that's non trivial its self with failure concerns. Perhaps there's just a better way of thinking of this? From: Ted Dunning [mailto:tdunn...@maprtech.com] Sent: Saturday, October 27, 2012 12:23 PM To: user@hadoop.apache.org Subject: Re: Cluster wide atomic operations This is better asked on the Zookeeper lists. The first answer is that global atomic operations are a generally bad idea. The second answer is that if you an batch these operations up then you can cut the evilness of global atomicity by a substantial factor. Are you sure you need a global counter? On Fri, Oct 26, 2012 at 11:07 PM, David Parks wrote: How can we manage cluster-wide atomic operations? Such as maintaining an auto-increment counter. Does Hadoop provide native support for these kinds of operations? An in case ultimate answer involves zookeeper, I'd love to work out doing this in AWS/EMR.
Cluster wide atomic operations
How can we manage cluster-wide atomic operations? Such as maintaining an auto-increment counter. Does Hadoop provide native support for these kinds of operations? And in case the ultimate answer involves zookeeper, I'd love to work out how to do this in AWS/EMR.
MultipleOutputs directed to two different locations
I've got MultipleOutputs configured to generate 2 named outputs. I'd like to send one to s3n:// and one to hdfs:// Is this possible? One is a final summary report, the other is input to the next job. Thanks, David
RE: How do map tasks get assigned efficiently?
So the thing that just doesn't click for me yet is this: On a typical computer, if I try to read two huge files off disk simultaneously it'll just kill the disk performance. This seems like a risk. What's preventing such disk contention in Hadoop? Is HDFS smart enough to serialize major disk access? From: Michael Segel [mailto:michael_se...@hotmail.com] Sent: Wednesday, October 24, 2012 6:51 PM To: user@hadoop.apache.org Subject: Re: How do map tasks get assigned efficiently? So... Data locality only works when you actually have data on the cluster itself. Otherwise how can the data be local. Assuming 3X replication, and you're not doing a custom split and your input file is splittable... You will split along the block delineation. So if your input file has 5 blocks, you will have 5 mappers. Since there are 3 copies of the block, its possible that for that map task to run on the DN which has a copy of that block. So its pretty straight forward to a point. When your cluster starts to get a lot of jobs and a slot opens up, your job may not be data local. With HBase... YMMV With S3 the data isn't local so it doesn't matter which Data Node gets the job. HTH -Mike On Oct 24, 2012, at 1:10 AM, David Parks wrote: Even after reading O'reillys book on hadoop I don't feel like I have a clear vision of how the map tasks get assigned. They depend on splits right? But I have 3 jobs running. And splits will come from various sources: HDFS, S3, and slow HTTP sources. So I've got some concern as to how the map tasks will be distributed to handle the data acquisition. Can I do anything to ensure that I don't let the cluster go idle processing slow HTTP downloads when the boxes could simultaneously be doing HTTP downloads for one job and reading large files off HDFS for another job? I'm imagining a scenario where the only map tasks that are running are all blocking on splits requiring HTTP downloads and the splits coming from HDFS are all queuing up behind it, when they'd run more efficiently in parallel per node.
How do map tasks get assigned efficiently?
Even after reading O'Reilly's book on Hadoop I don't feel like I have a clear vision of how the map tasks get assigned. They depend on splits, right? But I have 3 jobs running, and splits will come from various sources: HDFS, S3, and slow HTTP sources. So I've got some concern as to how the map tasks will be distributed to handle the data acquisition. Can I do anything to ensure that I don't let the cluster go idle processing slow HTTP downloads when the boxes could simultaneously be doing HTTP downloads for one job and reading large files off HDFS for another job? I'm imagining a scenario where the only map tasks that are running are all blocking on splits requiring HTTP downloads and the splits coming from HDFS are all queuing up behind them, when they'd run more efficiently in parallel per node.
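The thread doesn't name a mechanism for this, but one common approach on MR1/CDH4 (an assumption here, not something from the replies) is to submit the download job and the HDFS-bound job as separate jobs and place them in different Fair Scheduler pools, so the slow job can't hold every map slot. A rough sketch, assuming the Fair Scheduler is already enabled on the jobtracker and the pool names exist in its allocation file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitBothJobs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Slow HTTP-download job goes into a capped "downloads" pool (pool name is hypothetical).
        Job downloads = new Job(conf, "http-downloads");
        downloads.getConfiguration().set("mapred.fairscheduler.pool", "downloads");
        // ... set mapper, input/output formats and paths for the download job ...

        // HDFS-heavy job goes into its own pool so it keeps getting map slots.
        Job etl = new Job(conf, "hdfs-processing");
        etl.getConfiguration().set("mapred.fairscheduler.pool", "etl");
        // ... set mapper, input/output formats and paths for the HDFS-bound job ...

        // Submit both without blocking so they run in parallel, then poll for completion.
        downloads.submit();
        etl.submit();
        while (!downloads.isComplete() || !etl.isComplete()) {
            Thread.sleep(10000);
        }
    }
}

The pools only shape how slots are shared; they don't change split computation or data locality, they just stop one job's long-running map tasks from starving the other.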
RE: Large input files via HTTP
I might very well be overthinking this. But I have a cluster I'm firing up on EC2 that I want to keep utilized. I have some other unrelated jobs that don't need to wait for the downloads, so I don't want to fill all the map slots with long downloads. I'd rather the other jobs run in parallel while the downloads are happening. From: vseetha...@gmail.com [mailto:vseetha...@gmail.com] On Behalf Of Seetharam Venkatesh Sent: Tuesday, October 23, 2012 1:10 PM To: user@hadoop.apache.org Subject: Re: Large input files via HTTP Well, it depends. :-) If the XML cannot be split, then you'd end up with only one map task for the entire set of files. I think it'd make sense to have multiple splits so you can get an even spread of copies across maps, retry only the failed copies, and not have to manage the scheduling of the downloads yourself. Look at DistCp for some intelligent splitting. What are the constraints that you are working with? On Mon, Oct 22, 2012 at 5:59 PM, David Parks wrote: Would it make sense to write a map job that takes an unsplittable XML file (which defines all of the files I need to download), where that one map job then kicks off the downloads in multiple threads? This way I can easily manage the most efficient download pattern within the map job, and my output is emitted as key/value pairs straight to the reducer step. From: vseetha...@gmail.com [mailto:vseetha...@gmail.com] On Behalf Of Seetharam Venkatesh Sent: Tuesday, October 23, 2012 7:28 AM To: user@hadoop.apache.org Subject: Re: Large input files via HTTP One possible way is to first create a list of files with tuples. Then use a map-only job to pull each file using NLineInputFormat. Another way is to write an HttpInputFormat and HttpRecordReader and stream the data in a map-only job. On Mon, Oct 22, 2012 at 1:54 AM, David Parks wrote: I want to create a MapReduce job which reads many multi-gigabyte input files from various HTTP sources & processes them nightly. Is there a reasonably flexible way to acquire the files in the Hadoop job itself? I expect the initial downloads to take many hours and I'd hope I can optimize the # of connections (example: I'm limited to 5 connections to one host, whereas another host has a 3-connection limit, so maximize as much as possible). Also, the set of files to download will change a little over time, so the input list should be easily configurable (in a config file or equivalent). - Is it normal to perform batch downloads like this *before* running the mapreduce job? - Or is it ok to include such steps in with the job? - It seems handy to keep the whole process as one neat package in Hadoop if possible. - What class should I implement if I wanted to manage this myself? Would I just extend TextInputFormat, for example, and do the HTTP processing there? Or am I creating a FileSystem? Thanks, David -- Regards, Venkatesh Perfection (in design) is achieved not when there is nothing more to add, but rather when there is nothing more to take away. - Antoine de Saint-Exupéry
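To make Venkatesh's first suggestion concrete, here is a rough sketch of a map-only downloader driven by NLineInputFormat. The tab-separated "url<TAB>target-path" list format, the class names, and the five-lines-per-split figure are assumptions for illustration, not anything from the thread; per-host connection limits would still need handling on top of this:

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HttpFetchJob {

    // Each input line is "url<TAB>target-path"; NLineInputFormat hands every map task a small
    // batch of lines, so downloads spread across the cluster and a failed download retries
    // only its own lines.
    public static class FetchMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("\t");
            Path target = new Path(parts[1]);
            FileSystem fs = target.getFileSystem(context.getConfiguration());

            InputStream in = new URL(parts[0]).openStream();
            FSDataOutputStream out = fs.create(target, true);
            try {
                IOUtils.copyBytes(in, out, 64 * 1024);   // stream straight into HDFS (or S3)
            } finally {
                in.close();
                out.close();
            }
            context.write(new Text(parts[0]), new Text("OK"));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "http-fetch");
        job.setJarByClass(HttpFetchJob.class);
        job.setMapperClass(FetchMapper.class);
        job.setNumReduceTasks(0);                        // map-only: no reduce phase
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 5);    // 5 URLs per map task (assumed)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // the URL list file
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // per-URL status report
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The URL list file is plain text and easy to regenerate as the set of downloads changes, which also covers the "easily configurable input list" requirement from the original question.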
RE: Large input files via HTTP
Would it make sense to write a map job that takes an unsplittable XML file (which defines all of the files I need to download), where that one map job then kicks off the downloads in multiple threads? This way I can easily manage the most efficient download pattern within the map job, and my output is emitted as key/value pairs straight to the reducer step. From: vseetha...@gmail.com [mailto:vseetha...@gmail.com] On Behalf Of Seetharam Venkatesh Sent: Tuesday, October 23, 2012 7:28 AM To: user@hadoop.apache.org Subject: Re: Large input files via HTTP One possible way is to first create a list of files with tuples. Then use a map-only job to pull each file using NLineInputFormat. Another way is to write an HttpInputFormat and HttpRecordReader and stream the data in a map-only job. On Mon, Oct 22, 2012 at 1:54 AM, David Parks wrote: I want to create a MapReduce job which reads many multi-gigabyte input files from various HTTP sources & processes them nightly. Is there a reasonably flexible way to acquire the files in the Hadoop job itself? I expect the initial downloads to take many hours and I'd hope I can optimize the # of connections (example: I'm limited to 5 connections to one host, whereas another host has a 3-connection limit, so maximize as much as possible). Also, the set of files to download will change a little over time, so the input list should be easily configurable (in a config file or equivalent). - Is it normal to perform batch downloads like this *before* running the mapreduce job? - Or is it ok to include such steps in with the job? - It seems handy to keep the whole process as one neat package in Hadoop if possible. - What class should I implement if I wanted to manage this myself? Would I just extend TextInputFormat, for example, and do the HTTP processing there? Or am I creating a FileSystem? Thanks, David -- Regards, Venkatesh Perfection (in design) is achieved not when there is nothing more to add, but rather when there is nothing more to take away. - Antoine de Saint-Exupéry
Large input files via HTTP
I want to create a MapReduce job which reads many multi-gigabyte input files from various HTTP sources & processes them nightly. Is there a reasonably flexible way to acquire the files in the Hadoop job itself? I expect the initial downloads to take many hours and I'd hope I can optimize the # of connections (example: I'm limited to 5 connections to one host, whereas another host has a 3-connection limit, so maximize as much as possible). Also, the set of files to download will change a little over time, so the input list should be easily configurable (in a config file or equivalent). - Is it normal to perform batch downloads like this *before* running the mapreduce job? - Or is it ok to include such steps in with the job? - It seems handy to keep the whole process as one neat package in Hadoop if possible. - What class should I implement if I wanted to manage this myself? Would I just extend TextInputFormat, for example, and do the HTTP processing there? Or am I creating a FileSystem? Thanks, David