Re: Passing information from one job to the next in a JobControl
Hi Saptarshi: Please refer to the following example code; I hope it helps you.

JobConf grepJob = new JobConf(getConf(), Grep.class);
try {
  grepJob.setJobName("search");
  FileInputFormat.setInputPaths(grepJob, args[0]);
  ...
  FileOutputFormat.setOutputPath(grepJob, tempDir);
  JobClient.runJob(grepJob);

  JobConf sortJob = new JobConf(Grep.class);
  sortJob.setJobName("sort");
  FileInputFormat.setInputPaths(sortJob, tempDir);
  FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
  ...
  JobClient.runJob(sortJob);

--Jerry

-----Original Message-----
From: Saptarshi Guha [mailto:[EMAIL PROTECTED]
Sent: November 11, 2008, 12:06
To: core-user@hadoop.apache.org
Subject: Passing information from one job to the next in a JobControl

Hello, I am using JobControl to run a sequence of jobs (Job_1, Job_2, ..., Job_n) one after the other. Each job returns some information, e.g. key1 value1,value2; key2 value1,value2; and so on. This can be found in the outdir passed to the jar file. Is there a way for Job_1 to return some data (which can be passed on to Job_2) without my main program having to read the information from the file in HDFS? I could use things like Linda spaces; however, does MapReduce have a framework for this? Thanks, Saptarshi
-- Saptarshi Guha - [EMAIL PROTECTED]
Re: Anyone have a Lucene index InputFormat for Hadoop?
I think you can refer to contrib/index; it may be of some help to you! 2008/11/12 Anthony Urso [EMAIL PROTECTED] Anyone have a Lucene index InputFormat already implemented? Failing that, how about a Writable for the Lucene Document class? Cheers, Anthony
Re: Re: Passing information from one job to the next in a JobControl
Hi Jerry, This actually makes a lot of sense; I hadn't seen it in this light. Thank you. Saptarshi

On Nov 12, 2008, at 3:07 AM, jerry ye wrote:

Hi Saptarshi: Please refer to the following example code; I hope it helps you.

JobConf grepJob = new JobConf(getConf(), Grep.class);
try {
  grepJob.setJobName("search");
  FileInputFormat.setInputPaths(grepJob, args[0]);
  ...
  FileOutputFormat.setOutputPath(grepJob, tempDir);
  JobClient.runJob(grepJob);

  JobConf sortJob = new JobConf(Grep.class);
  sortJob.setJobName("sort");
  FileInputFormat.setInputPaths(sortJob, tempDir);
  FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
  ...
  JobClient.runJob(sortJob);

--Jerry

-----Original Message-----
From: Saptarshi Guha [mailto:[EMAIL PROTECTED]
Sent: November 11, 2008, 12:06
To: core-user@hadoop.apache.org
Subject: Passing information from one job to the next in a JobControl

Hello, I am using JobControl to run a sequence of jobs (Job_1, Job_2, ..., Job_n) one after the other. Each job returns some information, e.g. key1 value1,value2; key2 value1,value2; and so on. This can be found in the outdir passed to the jar file. Is there a way for Job_1 to return some data (which can be passed on to Job_2) without my main program having to read the information from the file in HDFS? I could use things like Linda spaces; however, does MapReduce have a framework for this? Thanks, Saptarshi
-- Saptarshi Guha - [EMAIL PROTECTED]

Saptarshi Guha | [EMAIL PROTECTED] | http://www.stat.purdue.edu/~sguha
"Intel CPUs are not defective, they just act that way." -- Henry Spencer
Re: SecondaryNameNode on separate machine
Now SecondaryNameNode connects to the NameNode (after I configured dfs.http.address to the NN's http server - NN hostname on port 50070) and creates (transfers) edits and fsimage from the NameNode. It didn't work for me - I get an error:

java.io.FileNotFoundException: http://192.168.30.5:50070/getimage?putimage=1&port=50090&machine=127.0.0.1&token=-16:1173009257:0:1226503705000:1226503705207
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1168)
at org.apache.hadoop.dfs.TransferFsImage.getFileClient(TransferFsImage.java:150)
at org.apache.hadoop.dfs.SecondaryNameNode.putFSImage(SecondaryNameNode.java:271)
at org.apache.hadoop.dfs.SecondaryNameNode.doCheckpoint(SecondaryNameNode.java:311)
at org.apache.hadoop.dfs.SecondaryNameNode.run(SecondaryNameNode.java:216)
at java.lang.Thread.run(Thread.java:595)

And when I run the http request directly (in the browser), I receive this:

GetImage failed. java.io.IOException: Namenode is not expecting an new image UPLOAD_START
at org.apache.hadoop.dfs.FSImage.validateCheckpointUpload(FSImage.java:1193)
at org.apache.hadoop.dfs.GetImageServlet.doGet(GetImageServlet.java:57)
..

If it is a mundane thing (i.e. no need to checkpoint now), why does it throw an error? What is the UPLOAD_START at the end of the message? (If it failed, how come it starts?) But more importantly - how do I get rid of it? Thanks!
--
View this message in context: http://www.nabble.com/SecondaryNameNode-on-separate-machine-tp20207482p20463349.html
Sent from the Hadoop core-user mailing list archive at Nabble.com.
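A side note on the failing URL above: the machine=127.0.0.1 parameter suggests the secondary registered itself under its default loopback address, so the NameNode cannot match the upload against the checkpoint it expects. One possible remedy (an assumption based on the error, not a confirmed fix from this thread; host names below are placeholders) is to pin both http addresses explicitly in the config:

```xml
<!-- hadoop-site.xml; example.com host names are placeholders -->
<property>
  <name>dfs.http.address</name>
  <value>namenode.example.com:50070</value>
</property>
<property>
  <!-- set on the SecondaryNameNode host so it advertises a reachable
       address instead of the default 0.0.0.0 / 127.0.0.1 -->
  <name>dfs.secondary.http.address</name>
  <value>secondary.example.com:50090</value>
</property>
```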
Re: Anyone have a Lucene index InputFormat for Hadoop?
I recommend you check Nutch's source, which includes classes for index input/output from mapred. Anthony Urso wrote: Anyone have a Lucene index InputFormat already implemented? Failing that, how about a Writable for the Lucene Document class? Cheers, Anthony
Re: Hadoop Beijing Meeting
Hi Mr. He Yongqiang, I'd like to apply as a speaker, though it is very hurried. I have always been a fan of Hadoop. This is my technical blog: http://coderplay.javaeye.com/. Regards, Jeremy -- My research interests are distributed systems, parallel computing, and bytecode-based virtual machines. http://coderplay.javaeye.com
Re: Best way to handle namespace host failures
On 11/10/08 10:42 PM, Dhruba Borthakur [EMAIL PROTECTED] wrote: 2. Create a virtual IP, say name.xx.com that points to the real machine name of the machine on which the namenode runs. Everyone doing this should be aware of the discussion happening in https://issues.apache.org/jira/browse/HADOOP-3988 though.
re: Recommendations on Job Status and Dependency Management
I was able to answer one of my own questions: "Is there an example somewhere of code that can read HDFS in order to determine if files exist? I poked around a bit and couldn't find one. Ideally, my code would be able to read the HDFS config info right out of the standard config files so I wouldn't need to create additional configuration information."

The following code was all that I needed:

Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
Path path = new Path(filename);
boolean fileExists = fileSystem.exists(path);

At first, the code didn't work as I expected because my working shell scripts that made use of hadoop/bin/hadoop jar my.jar did not explicitly include HADOOP_CONF_DIR in my classpath. Once I did that, everything worked just fine.

On Tue, 11 Nov 2008, Jimmy Wan wrote:

I'd like to take my prototype batch processing of hadoop jobs and implement some type of real dependency management and scheduling in order to better utilize my cluster as well as spread out more work over time. I was thinking of adopting one of the existing packages (Cascading, Zookeeper, existing JobControl?) and I was hoping to find some better advice from the mailing list. I tried to find a direct comparison of Cascading and Zookeeper but I couldn't find one. This is a grossly simplified description of my current completely naive approach:
1) For each day in a month, spawn N threads that each contain a dependent series of map/reduce jobs.
2) For each day in a month, spawn N threads that each contain a dependent series of map/reduce jobs that are dependent on the output of step #1. These are currently separated from the tasks in step #1 mainly because it's easier to group them up this way in the event of a failure, but I expect this separation to go away.
3) At the end of the month, serially run a series of jobs outside of Map/Reduce that basically consist of a single SQL query (I could easily convert these to be very simple map/reduce jobs, and probably will, if it makes my job processing easier).

The main problems I have are the following:
1) Right now I have a hard time determining which processes need to be run in the event of a failure. Every job has an expected input/output in HDFS, so if I have to rerun something I usually just use something like hadoop dfs -rmr path in a shell script, then hand-edit the jobs that need to be rerun. Is there an example somewhere of code that can read HDFS in order to determine if files exist? I poked around a bit and couldn't find one. Ideally, my code would be able to read the HDFS config info right out of the standard config files so I wouldn't need to create additional configuration information. The job dependencies, while enumerated well, are not isolated all that well. Example: I find a bug in 1 of 10 processes in step #1. I'd like to rerun just that one process and any dependent processes, but not have to rerun everything again.
2) I typically run everything 1 month at a time, but I want to keep the option of doing rollups by day. On the 2nd of the month, I'd like to be able to run anything that requires data from the 1st of the month. On the 1st of the month, I'd like to run anything that requires a full month of data from the previous month. I'd also like my process to be able to account for system failures on previous days, i.e. on any given day I'd like to be able to run everything for which data is available.
3) Certain types of jobs have external dependencies (e.g. MySQL) and I don't want to run too many of those types of jobs at the same time, since it affects my MySQL performance. I'd like some way of describing some type of lock on external resources that can be shared across jobs.

Any recommendations on how to best model these things?
I'm thinking that something like Cascading or Zookeeper could help me here. My initial take was that Zookeeper was more heavyweight than Cascading, requiring additional processes to be running at all times. However, it seems like Zookeeper would be better suited to describing mutual exclusions on usage of external resources. Can Cascading even do this? I'd also appreciate any recommendations on how best to tune the hadoop processes. My hadoop 0.16.4 cluster is currently relatively small (10 nodes), so I'm thinking the 1GB defaults for my NameNode, DataNodes, and JobTracker might be overkill. I also plan to upgrade to 0.17.* or 0.18.* at some point in the near future. --
Re: reading input for a map function from 2 different files?
Amar, isn't there a problem with your method in that it gets a small result by subtracting very large numbers? Given a million inputs, won't A and B be so much larger than the standard deviation that there aren't enough bits left in the floating-point number to represent it? I just thought I should mention that, before this thread goes into an archive somewhere and some student looks it up. -Joel

On Wed, 2008-11-12 at 12:32 +0530, Amar Kamat wrote:

some speed wrote: Thanks for the response. What I am trying to do is find the average and then the standard deviation for a very large set (say a million) of numbers. The result would be used in further calculations. I have got the average from the first map-reduce chain; now I need to read this average as well as the set of numbers to calculate the standard deviation. So one file would have the input set and the other resultant file would have just the average. Please do tell me in case there is a better way of doing things than what I am doing. Any input/suggestion is appreciated. :)

std_dev^2 = sum_i((Xi - Xa)^2) / N, where Xa is the avg. Why don't you use the formula to compute it in one MR job:
std_dev^2 = (sum_i(Xi^2) - N * (Xa^2)) / N = (A - N*(avg^2))/N
For this your map would look like
map (key, val) : output.collect(key^2, key); // imagine your input as (k,v) = (Xi, null)
Reduce should simply sum over the keys to find out sum_i(Xi^2) and sum over the values to find out Xa. You could use the close() API to finally dump these 2 values to a file. For example:
input: 1,2,3,4. Say the input is split into 2 groups, [1,2] and [3,4]. Now there will be 2 maps with output as follows:
map1 output: (1,1) (4,2)
map2 output: (9,3) (16,4)
The reducer will maintain the sum over all keys and all values:
A = sum(keys, i.e. input squared) = 1 + 4 + 9 + 16 = 30
B = sum(values, i.e. input) = 1 + 2 + 3 + 4 = 10
With A and B you can compute the standard deviation offline.
So avg = B / N = 10/4 = 2.5. Hence the std deviation would be sqrt((A - N * avg^2) / N) = sqrt((30 - 4*6.25)/4) = 1.11803399. Using the main formula, the answer is 1.11803399. Amar

On Mon, Nov 10, 2008 at 4:22 AM, Amar Kamat [EMAIL PROTECTED] wrote: Amar Kamat wrote: some speed wrote: I was wondering if it was possible to read the input for a map function from 2 different files:
1st file --- user-input file from a particular location (path)
Is the input/user file sorted? If yes, then you can use map-side join for performance reasons. See org.apache.hadoop.mapred.join for more details.
2nd file --- a resultant file (has just one key,value pair) from a previous MapReduce job. (I am implementing a chained MapReduce function)
Can you explain in more detail the contents of the 2nd file?
Now, for every key,value pair in the user-input file, I would like to use the same key,value pair from the 2nd file for some calculations.
Can you explain this in more detail? Can you give some abstracted example of how file1 and file2 look and what operation/processing you want to do? I guess you might need to do some kind of join on the 2 files. Look at contrib/data_join for more details. Amar
Is it possible for me to do so? Can someone guide me in the right direction please? Thanks!
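Amar's identity is easy to check with a few lines of plain Java outside MapReduce (the class and method names below are made up for illustration):

```java
// One-pass standard deviation via the identity from the thread:
// std_dev^2 = (A - N * avg^2) / N, where A = sum(x^2) and avg = sum(x) / N.
public class OnePassStdDev {
    public static double stdDev(double[] xs) {
        double a = 0.0; // A = sum of squares
        double b = 0.0; // B = plain sum
        for (double x : xs) {
            a += x * x;
            b += x;
        }
        int n = xs.length;
        double avg = b / n;
        return Math.sqrt((a - n * avg * avg) / n);
    }

    public static void main(String[] args) {
        // The example from the mail: input 1,2,3,4 gives A=30, B=10.
        System.out.println(stdDev(new double[]{1, 2, 3, 4}));
    }
}
```

Running this on the thread's example reproduces the 1.11803399 worked out above.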
Re: reading input for a map function from 2 different files?
Unless you really care about getting exact averages etc., I would suggest simply sampling the input and computing your statistics from that - it will be a lot faster and you won't have to deal with under/overflow etc. If your sample is reasonably large, then your results will be pretty close to the true values. Miles

2008/11/12 Joel Welling [EMAIL PROTECTED]:

Amar, isn't there a problem with your method in that it gets a small result by subtracting very large numbers? Given a million inputs, won't A and B be so much larger than the standard deviation that there aren't enough bits left in the floating-point number to represent it? I just thought I should mention that, before this thread goes into an archive somewhere and some student looks it up. -Joel

On Wed, 2008-11-12 at 12:32 +0530, Amar Kamat wrote: some speed wrote: Thanks for the response. What I am trying to do is find the average and then the standard deviation for a very large set (say a million) of numbers. The result would be used in further calculations. I have got the average from the first map-reduce chain; now I need to read this average as well as the set of numbers to calculate the standard deviation. So one file would have the input set and the other resultant file would have just the average. Please do tell me in case there is a better way of doing things than what I am doing. Any input/suggestion is appreciated. :)

std_dev^2 = sum_i((Xi - Xa)^2) / N, where Xa is the avg. Why don't you use the formula to compute it in one MR job:
std_dev^2 = (sum_i(Xi^2) - N * (Xa^2)) / N = (A - N*(avg^2))/N
For this your map would look like
map (key, val) : output.collect(key^2, key); // imagine your input as (k,v) = (Xi, null)
Reduce should simply sum over the keys to find out sum_i(Xi^2) and sum over the values to find out Xa. You could use the close() API to finally dump these 2 values to a file.
For example: input: 1,2,3,4. Say the input is split into 2 groups, [1,2] and [3,4]. Now there will be 2 maps with output as follows:
map1 output: (1,1) (4,2)
map2 output: (9,3) (16,4)
The reducer will maintain the sum over all keys and all values:
A = sum(keys, i.e. input squared) = 1 + 4 + 9 + 16 = 30
B = sum(values, i.e. input) = 1 + 2 + 3 + 4 = 10
With A and B you can compute the standard deviation offline. So avg = B / N = 10/4 = 2.5. Hence the std deviation would be sqrt((A - N * avg^2) / N) = sqrt((30 - 4*6.25)/4) = 1.11803399. Using the main formula, the answer is 1.11803399. Amar

On Mon, Nov 10, 2008 at 4:22 AM, Amar Kamat [EMAIL PROTECTED] wrote: Amar Kamat wrote: some speed wrote: I was wondering if it was possible to read the input for a map function from 2 different files:
1st file --- user-input file from a particular location (path)
Is the input/user file sorted? If yes, then you can use map-side join for performance reasons. See org.apache.hadoop.mapred.join for more details.
2nd file --- a resultant file (has just one key,value pair) from a previous MapReduce job. (I am implementing a chained MapReduce function)
Can you explain in more detail the contents of the 2nd file?
Now, for every key,value pair in the user-input file, I would like to use the same key,value pair from the 2nd file for some calculations.
Can you explain this in more detail? Can you give some abstracted example of how file1 and file2 look and what operation/processing you want to do? I guess you might need to do some kind of join on the 2 files. Look at contrib/data_join for more details. Amar
Is it possible for me to do so? Can someone guide me in the right direction please? Thanks!
--
The University of Edinburgh is a charitable body, registered in Scotland, with registration number SC005336.
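For Joel's cancellation concern there is a standard workaround not mentioned in the thread: Welford's online algorithm keeps a running mean and a running sum of squared deviations, so it never subtracts two large near-equal totals. A plain-Java sketch (the class name is made up for illustration):

```java
// Welford's online algorithm: numerically stable running mean/variance.
// Each sample nudges the mean and accumulates squared deviations (m2)
// directly, avoiding the A - N*avg^2 cancellation for huge inputs.
public class RunningStdDev {
    private long n = 0;
    private double mean = 0.0;
    private double m2 = 0.0; // sum of squared deviations from the running mean

    public void add(double x) {
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean); // note: uses the *updated* mean
    }

    public double mean() { return mean; }

    public double stdDev() {
        // Population standard deviation, matching the thread's formula.
        return Math.sqrt(m2 / n);
    }

    public static void main(String[] args) {
        RunningStdDev s = new RunningStdDev();
        for (double x : new double[]{1, 2, 3, 4}) s.add(x);
        System.out.println(s.stdDev());
    }
}
```

On the thread's 1,2,3,4 example this agrees with the exact answer, and it degrades far more gracefully than the sum-of-squares identity when the inputs are large and the deviation is small.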
Re: Re: Hadoop Beijing Meeting
Hi, Jeremy Chow. Welcome! Please send a brief introduction about yourself and your talk directly to me. I will send you the detailed agenda and other important things next week. Best regards, Yongqiang He 2008-11-12
Email: [EMAIL PROTECTED] Tel: 86-10-62600966(O)
Research Center for Grid and Service Computing, Institute of Computing Technology, Chinese Academy of Sciences, P.O. Box 2704, 100080, Beijing, China

From: Jeremy Chow
Sent: 2008-11-12 17:04:46
To: core-user@hadoop.apache.org
Cc:
Subject: Re: Hadoop Beijing Meeting

Hi Mr. He Yongqiang, I'd like to apply as a speaker, though it is very hurried. I have always been a fan of Hadoop. This is my technical blog: http://coderplay.javaeye.com/. Regards, Jeremy -- My research interests are distributed systems, parallel computing, and bytecode-based virtual machines. http://coderplay.javaeye.com
Re: too many open files? Isn't 4K enough???
On 5-Nov-08, at 4:08 PM, Yuri Pradkin wrote:

I suspect your total open FDs = (#mappers) x (FDs/map). In my case the second factor was ~5K; so if I ran 8 mappers, the total might have been as high as 40K! This is totally insane. Perhaps playing with GC modes might help...

In general, I've had to do a lot of fine-tuning of my job parameters to balance memory, file handles, and task timeouts. I'm finding that a setup that works with one input set breaks when I try it on an input set which is twice the size. My productivity is not high while I'm figuring this out, and I wonder why I don't hear about this more. Perhaps this is a streaming issue, and streaming isn't being used very much?

I doubt in my case this is specific to streaming, although streaming might exacerbate the problem by opening pipes, etc. In my case the vast majority of open files were to spills during sorting/shuffling, which is not restricted to streaming. This is a scalability issue and I'd really like to hear from developers. -Yuri
P.S. It looks like we need to file a jira on this one...

Are you able to create a reproducible setup for this? I haven't been able to. I'm only able to cause this to happen after a few runs of my own jobs first, which do various things and involve several Python libraries and downloading from S3. After I've done this, it looks like any streaming job will have tasks die, but if I don't run my jobs first, I don't have a problem. I also can't figure out what's consuming the open files; I'm not seeing the large lsof numbers that you were. Obviously, the jobs I'm running beforehand are causing problems for later jobs, but I haven't isolated what it is yet.
My cluster:
- hadoop 0.18.1
- cluster of 64 EC2 xlarge nodes, created with the hadoop-ec2 tools, edited to increase the max open files for root to 131072
- 8 max mappers or reducers per node

After I had some of my jobs die, I tested the cluster with this streaming job:

hadoop jar /usr/local/hadoop-0.18.1/contrib/streaming/hadoop-0.18.1-streaming.jar -mapper cat -reducer cat -input clusters_0 -output foo -jobconf mapred.output.compress=false -jobconf mapred.map.tasks=256 -jobconf mapred.reduce.tasks=256

Ran this manually a few times, not changing anything other than deleting the output directory, and never running more than one job at once. While I ran it, I checked the number of open files on two of the nodes with:

while true; do lsof | wc -l; sleep 1; done

Tasks died on each job due to "file not found" or "too many open files" errors. Each job succeeded eventually. The job never got more than 120 or so mappers or reducers at once (because the scheduler couldn't catch up; a real job on this cluster setup was able to get to 8 tasks per node).
1st run: 31 mappers die, 11 reducers die.
2nd run: 16/12
3rd run: 14/6
4th run: 14/6
Never saw more than 1600 or so open files on the two nodes I was checking. Tasks were dying on these nodes during this time. The input directory (clusters_0) contained one 797270-byte, 4096-line ASCII file. I terminated and re-created my cluster. This time I just uploaded the input file and ran the test jobs; I didn't run my jobs first. I wasn't able to cause any errors.
Karl Anderson [EMAIL PROTECTED] http://monkey.org/~kra
Re: reading input for a map function from 2 different files?
Since you need to pass only one number (the average) to all mappers, you can pass it through the jobconf with a config variable defined by you, say my.average. -- Milind

On 11/11/08 8:25 PM, some speed [EMAIL PROTECTED] wrote:

Thanks for the response. What I am trying to do is find the average and then the standard deviation for a very large set (say a million) of numbers. The result would be used in further calculations. I have got the average from the first map-reduce chain; now I need to read this average as well as the set of numbers to calculate the standard deviation. So one file would have the input set and the other resultant file would have just the average. Please do tell me in case there is a better way of doing things than what I am doing. Any input/suggestion is appreciated. :)

On Mon, Nov 10, 2008 at 4:22 AM, Amar Kamat [EMAIL PROTECTED] wrote: Amar Kamat wrote: some speed wrote: I was wondering if it was possible to read the input for a map function from 2 different files:
1st file --- user-input file from a particular location (path)
Is the input/user file sorted? If yes, then you can use map-side join for performance reasons. See org.apache.hadoop.mapred.join for more details.
2nd file --- a resultant file (has just one key,value pair) from a previous MapReduce job. (I am implementing a chained MapReduce function)
Can you explain in more detail the contents of the 2nd file?
Now, for every key,value pair in the user-input file, I would like to use the same key,value pair from the 2nd file for some calculations.
Can you explain this in more detail? Can you give some abstracted example of how file1 and file2 look and what operation/processing you want to do? I guess you might need to do some kind of join on the 2 files. Look at contrib/data_join for more details. Amar
Is it possible for me to do so? Can someone guide me in the right direction please? Thanks!
--
Milind Bhandarkar
Y!IM: GridSolutions
408-349-2136 ([EMAIL PROTECTED])
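Milind's suggestion boils down to a string round-trip: call something like conf.set("my.average", Double.toString(avg)) when setting up the second job, and Double.parseDouble(conf.get("my.average")) in the mapper's configure(). The round-trip itself can be sketched without a cluster; in the sketch below java.util.Properties merely stands in for JobConf's string key-value store, and the class and method names are made up for illustration:

```java
import java.util.Properties;

// JobConf stores job parameters as strings, so passing a number between
// chained jobs means serializing it on set and parsing it on get.
// Properties is used here only as a stand-in for JobConf's set/get.
public class AveragePassing {
    // What the driver would do before submitting the second job.
    public static void setAverage(Properties conf, double avg) {
        conf.setProperty("my.average", Double.toString(avg));
    }

    // What the second job's mapper would do in configure(JobConf).
    public static double getAverage(Properties conf) {
        return Double.parseDouble(conf.getProperty("my.average"));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        setAverage(conf, 2.5);
        System.out.println(getAverage(conf));
    }
}
```

This works well for a single small value like an average; for anything larger, the thread's other suggestion (an intermediate HDFS directory between jobs) remains the usual route.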
Re: Hadoop Streaming - running a jar file
You should specify A.jar on the bin/hadoop command line with -file A.jar, so that streaming knows to copy that file to the tasktracker nodes. -- Milind

On 11/11/08 10:50 AM, Amit_Gupta [EMAIL PROTECTED] wrote:

Hi, I have a jar file which takes input from stdin and writes something to stdout, i.e. when I run

java -jar A.jar input

it prints the required output. However, when I run it as a mapper in hadoop streaming using the command

$HADOOP_HOME/bin/hadoop jar streaming.jar -input .. -output ... -mapper 'java -jar A.jar' -reducer NONE

I get the broken pipe exception. The error message is:

additionalConfSpec_:null null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/mnt/hadoop/HADOOP/hadoop-0.16.3/tmp/dir/hadoop-hadoop/hadoop-unjar45410/] [] /tmp/streamjob45411.jar tmpDir=null
08/11/11 23:20:14 INFO mapred.FileInputFormat: Total input paths to process : 1
08/11/11 23:20:14 INFO streaming.StreamJob: getLocalDirs(): [/mnt/hadoop/HADOOP/hadoop-0.16.3/tmp/mapred]
08/11/11 23:20:14 INFO streaming.StreamJob: Running job: job_20081724_0014
08/11/11 23:20:14 INFO streaming.StreamJob: To kill this job, run:
08/11/11 23:20:14 INFO streaming.StreamJob: /mnt/hadoop/HADOOP/hadoop-0.16.3/bin/../bin/hadoop job -Dmapred.job.tracker=10.105.41.25:54311 -kill job_20081724_0014
08/11/11 23:20:15 INFO streaming.StreamJob: Tracking URL: http://sayali:50030/jobdetails.jsp?jobid=job_20081724_0014
08/11/11 23:20:16 INFO streaming.StreamJob: map 0% reduce 0%
08/11/11 23:21:00 INFO streaming.StreamJob: map 100% reduce 100%
08/11/11 23:21:00 INFO streaming.StreamJob: To kill this job, run:
08/11/11 23:21:00 INFO streaming.StreamJob: /mnt/hadoop/HADOOP/hadoop-0.16.3/bin/../bin/hadoop job -Dmapred.job.tracker=10.105.41.25:54311 -kill job_20081724_0014
08/11/11 23:21:00 INFO streaming.StreamJob: Tracking URL: http://sayali:50030/jobdetails.jsp?jobid=job_20081724_0014
08/11/11 23:21:00 ERROR streaming.StreamJob: Job not Successful!
08/11/11 23:21:00 INFO streaming.StreamJob: killJob...
Streaming Job Failed!

Could someone please help me with any ideas or pointers? Regards, Amit
--
View this message in context: http://www.nabble.com/Hadoop-Streamingrunning-a-jar-file-tp20445877p20445877.html
Sent from the Hadoop lucene-users mailing list archive at Nabble.com.
--
Milind Bhandarkar
Y!IM: GridSolutions
408-349-2136 ([EMAIL PROTECTED])
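Milind's fix applied to Amit's command would look roughly like the sketch below. This is a command-line sketch, not a tested invocation: the input/output directory names are placeholders introduced here (the originals were elided in the mail), while streaming.jar, A.jar, the quoted mapper, and the NONE reducer come from the thread:

```shell
# -file ships A.jar into each task's working directory on the
# tasktracker nodes, so 'java -jar A.jar' can actually find it there.
$HADOOP_HOME/bin/hadoop jar streaming.jar \
    -input myInputDir \
    -output myOutputDir \
    -mapper 'java -jar A.jar' \
    -reducer NONE \
    -file A.jar
```

Without -file, the mapper command runs on a node where A.jar does not exist, the JVM exits immediately, and streaming reports the broken-pipe failure seen in the log above.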