Don't you think using Flume would be easier? Use the HDFS sink and set a property to roll the log file over every hour. That way a single Flume agent receives the logs as they are generated and writes them straight to HDFS. If you want to drop unwanted log lines, you can plug in a custom interceptor to filter events before they reach the HDFS sink. I suppose this would be much easier.
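For illustration, a minimal sketch of a Flume agent configuration along those lines. The agent, source, and directory names here are assumptions; the point is that with size- and count-based rolling disabled, hdfs.rollInterval alone decides when a file closes, so each file spans one hour:

    # Hypothetical layout: spooling-directory source -> file channel -> HDFS sink
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /var/log/incoming         # assumed log drop directory
    a1.sources.r1.channels = c1

    a1.channels.c1.type = file

    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/logs/%Y%m%d%H       # one directory per hour
    a1.sinks.k1.hdfs.useLocalTimeStamp = true          # supply the timestamp header
    a1.sinks.k1.hdfs.fileType = DataStream             # plain text, not SequenceFile
    a1.sinks.k1.hdfs.rollInterval = 3600               # start a new file every hour
    a1.sinks.k1.hdfs.rollSize = 0                      # disable size-based rolling
    a1.sinks.k1.hdfs.rollCount = 0                     # disable count-based rolling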
On 2 Mar 2014 12:34, "Fengyun RAO" <raofeng...@gmail.com> wrote:

Thanks, Simon. That's very clear.

2014-03-02 14:53 GMT+08:00 Simon Dong <simond...@gmail.com>:

Reading data for each hour shouldn't be a problem: in Hadoop or the shell you can do pretty much everything with mmddhh* that you can do with mmddhh.

But if you need each hour's data sorted into one file, then you have to run a post-processing MR job on each hour's data to merge them, which should be trivial.

With that being a requirement, using a custom partitioner to send all records within an hour to a particular reducer might be a viable or better option, saving the additional MR pass to merge them, given that:

- You can determine programmatically, before submitting the job, the number of hours covered, and then call job.setNumReduceTasks(numOfHours) to set the number of reducers.
- The number of hours covered by each run matches the number of reducers your cluster typically assigns, so you won't lose much efficiency. For example, if each run covers the last 24 hours and your cluster defaults to 18 reducer slots, it should be fine.
- You emit the timestamp as the key from the mapper, so your partitioner can decide which reducer each record should be sent to, and the records will be sorted by MR by the time they reach the reducer.

Even with this, you can still use MultipleOutputs to customize the file name each reducer generates, for better usability: instead of part-r-0000x, have it generate mmddhh-r-00000.

-Simon

On Sat, Mar 1, 2014 at 10:13 PM, Fengyun RAO <raofeng...@gmail.com> wrote:

Thank you, Simon! It helps a lot!

We want one file per hour for the sake of the queries that follow: it is very convenient to select several specified hours' results.

We also need the records sorted by timestamp for later processing. With a set of files per hour, as you show with MultipleOutputs, we would have to merge-sort them later. Maybe that needs another MR job?

2014-03-02 13:14 GMT+08:00 Simon Dong <simond...@gmail.com>:

Fengyun,

Is there any particular reason you have to have exactly 1 file per hour? As you probably know, each reducer outputs 1 file, or, if you use MultipleOutputs as I suggested, a set of files. If you have to fit the number of reducers to the number of hours in the input, and generate that number of files, it will most likely come at the expense of cluster efficiency and performance. The worst case, of course, is a bunch of data all within the same hour: then you have to settle for 1 reducer, with no parallelization at all.

A workaround is to use MultipleOutputs to generate a set of files for each hour, with the hour as the base name, or, if you so choose, a sub-directory for each hour. For example, with mmddhh as the base name, you will get a set of files per hour like:

030119-r-00000
...
030119-r-0000n
030120-r-00000
...
030120-r-0000n

Or in a sub-directory:

030119/part-r-00000
...
030119/part-r-0000n

You can then use wildcards to glob the output, either for manual processing or as input paths for subsequent jobs.

-Simon
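For concreteness, a minimal sketch of the partitioner-plus-MultipleOutputs approach Simon describes, assuming the new MapReduce API (org.apache.hadoop.mapreduce), a mapper that emits each record's epoch-millisecond timestamp as a LongWritable key, and a run covering a contiguous range of hours no larger than the reducer count. Each class would go in its own file:

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Send every record of the same hour to the same reducer. With
    // numPartitions equal to the number of (contiguous) hours covered,
    // each reducer receives exactly one hour's records.
    public class HourPartitioner extends Partitioner<LongWritable, Text> {
        @Override
        public int getPartition(LongWritable ts, Text value, int numPartitions) {
            long hourBucket = ts.get() / (3600L * 1000L);  // hours since epoch
            return (int) (hourBucket % numPartitions);
        }
    }

    // Write each hour's records under an mmddhh base name, so the output
    // files are named e.g. 030119-r-00000 instead of part-r-00000.
    public class HourlyReducer
            extends Reducer<LongWritable, Text, LongWritable, Text> {
        private MultipleOutputs<LongWritable, Text> mos;
        private final SimpleDateFormat fmt = new SimpleDateFormat("MMddHH");

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(LongWritable ts, Iterable<Text> values,
                              Context context)
                throws IOException, InterruptedException {
            String base = fmt.format(new Date(ts.get()));  // e.g. "030119"
            for (Text v : values) {
                mos.write(ts, v, base);  // or base + "/part" for sub-directories
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            mos.close();  // flush the per-hour outputs
        }
    }

Because MapReduce sorts keys before the reduce phase, each hour's records also arrive at the reducer in timestamp order, which covers the sorting requirement without a separate merge pass.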
On Sat, Mar 1, 2014 at 7:37 PM, Fengyun RAO <raofeng...@gmail.com> wrote:

Thanks, Devin. We don't want just one file; it's more complicated than that.

If the input folder contains data from X hours, we want X files; if Y hours, we want Y files.

Obviously, X or Y is unknown at compile time.

2014-03-01 20:48 GMT+08:00 Devin Suiter RDX <dsui...@rdx.com>:

If you only want one file, then you need to set the number of reducers to 1.

If the size of the data makes a single reducer impractical for the original MR job, you can run a second job on the output of the first, with the default (identity) mapper and reducer, and set numReducers = 1 there.

Or use hdfs dfs -getmerge to collate the results into one file.

On Mar 1, 2014 4:59 AM, "Fengyun RAO" <raofeng...@gmail.com> wrote:

Thanks, but how do we set the reducer number to X? X depends on the input (run time), which is unknown when the job is configured (compile time).

2014-03-01 17:44 GMT+08:00 AnilKumar B <akumarb2...@gmail.com>:

Hi,

Write a custom partitioner on the <timestamp> key and, as you mentioned, set the number of reducers to X.
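On setting the reducer count to X at run time: the count is just a job parameter chosen in the driver before submission, so it never has to be a compile-time constant. A hypothetical driver sketch follows; countHoursCovered() is an invented helper (here it naively counts one sub-directory per hour) and LogMapper is an assumed mapper that emits (timestamp, record) pairs:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class HourlyDriver {
        // Invented helper: inspect the input to count how many distinct
        // hours it spans, assuming one sub-directory per hour (e.g. mmddhh).
        static int countHoursCovered(Configuration conf, Path in)
                throws IOException {
            FileSystem fs = in.getFileSystem(conf);
            int hours = 0;
            for (FileStatus s : fs.listStatus(in)) {
                if (s.isDirectory()) hours++;
            }
            return Math.max(hours, 1);
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);

            // The reducer count is decided here, at run time, from the input.
            int numOfHours = countHoursCovered(conf, in);

            Job job = Job.getInstance(conf, "hourly sort");
            job.setJarByClass(HourlyDriver.class);
            job.setMapperClass(LogMapper.class);       // assumed mapper
            job.setReducerClass(HourlyReducer.class);
            job.setPartitionerClass(HourPartitioner.class);
            job.setNumReduceTasks(numOfHours);         // X reducers for X hours
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }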