This is expected. Compressed text files are not splittable, so CombineHiveInputFormat cannot pack multiple files into a single mapper. CombineHiveInputFormat is used when hive.merge.maponly=true. If you set it to false, we'll use HiveInputFormat instead, and that should be able to merge compressed text files.
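A minimal sketch of that workaround, reusing the table names from the failing example quoted below (note the maponly key also appears as hive.mergejob.maponly later in this thread; check HiveConf for the spelling your build uses):

  -- fall back to the map-reduce merge path: HiveInputFormat reads the
  -- non-splittable compressed text files one per mapper, and the single
  -- reducer writes out the merged, compressed result
  set hive.merge.mapfiles=true;
  set hive.mergejob.maponly=false;
  set hive.exec.compress.output=true;

  insert overwrite table alogs_dbg_sample2
  select server_host, client_ip, time_stamp
  from alogs_dbg_subset tablesample(bucket 1 out of 1000 on rand()) s;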
On Nov 22, 2010, at 10:05 PM, Leo Alekseyev wrote:

> I found another criterion that determines whether or not the merge job
> runs with compression turned on. It seems that if the target table is
> stored as an RCFile, merges work, but if it is a text file, merges fail.
> For instance:
>
> -- merge will work here:
> create table alogs_dbg_sample3 (server_host STRING, client_ip INT,
>   time_stamp INT)
> row format serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
> stored as rcfile;
> insert overwrite table alogs_dbg_sample3
> select server_host, client_ip, time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
>
> -- merge will fail here:
> create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
>   time_stamp INT)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
> STORED AS TEXTFILE;
> insert overwrite table alogs_dbg_sample2
> select server_host, client_ip, time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
>
> Is this a bug? I don't see why running the merge job should be
> sensitive to the output table format.
>
> P.S.: I have hive.merge.maponly=true and am using LZO compression.
>
> On Fri, Nov 19, 2010 at 5:20 PM, Ning Zhang <nzh...@fb.com> wrote:
>> It makes sense. CombineHiveInputFormat does not work with compressed text
>> files (suffix *.gz) since they are not splittable. I think your default is
>> hive.input.format=CombineHiveInputFormat. But by setting
>> hive.merge.maponly=false it should work (meaning the merge should
>> succeed): you'll have multiple mappers (one per small file) and 1
>> reducer, and the reducer's output should be the merged result.
>>
>> On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:
>>
>>> Folks, thanks for your help. I've narrowed the problem down to
>>> compression. When I set hive.exec.compress.output=false, merges
>>> proceed as expected. When compression is on, the merge job doesn't
>>> seem to actually merge; it just spits out its input.
>>>
>>> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiang...@gmail.com> wrote:
>>>> These are the parameters that control the behavior. (Try setting them
>>>> to different values if it does not work in your environment.)
>>>>
>>>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>>>> set mapred.min.split.size.per.node=1000000000;
>>>> set mapred.min.split.size.per.rack=1000000000;
>>>> set mapred.max.split.size=1000000000;
>>>>
>>>> set hive.merge.size.per.task=1000000000;
>>>> set hive.merge.smallfiles.avgsize=1000000000;
>>>> set hive.exec.dynamic.partition.mode=nonstrict;
>>>>
>>>> The output size of the second job is also controlled by the split
>>>> size, as set in the first four lines.
>>>>
>>>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>> I'm using Hadoop 0.20.2. Merge jobs (with static partitions) have
>>>>> worked for me in the past. Again, what's strange here is that with
>>>>> the latest Hive build the merge stage appears to run, but it doesn't
>>>>> actually merge -- it's a quick map-only job that, as near as I can
>>>>> tell, doesn't do anything.
>>>>>
>>>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbronds...@geek.net> wrote:
>>>>>> What version of Hadoop are you on?
>>>>>>
>>>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>>>>
>>>>>>> I thought I was running Hive with those changes merged in, but to
>>>>>>> make sure, I built the latest trunk version. The behavior changed
>>>>>>> somewhat (it now runs 2 stages instead of 1), but it still generates
>>>>>>> the same number of files (the number of files generated equals the
>>>>>>> number of original mappers, so I have no idea what the second stage
>>>>>>> is actually doing).
>>>>>>>
>>>>>>> See below for the query and its EXPLAIN output. Stage-1 always runs;
>>>>>>> Stage-3 runs if hive.merge.mapfiles=true is set, but it still
>>>>>>> generates lots of small files.
>>>>>>>
>>>>>>> The query is kind of large, but in essence it's simply
>>>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>>>> [conditions].
>>>>>>>
>>>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition (ds)
>>>>>>> select server_host, client_ip, time_stamp,
>>>>>>>     concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1)),
>>>>>>>     referrer, parse_url(referrer, 'HOST'), user_agent, cookie,
>>>>>>>     geoip_int(client_ip, 'COUNTRY_CODE', './GeoIP.dat'), '', ds
>>>>>>> from alogs_master TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s
>>>>>>> where am_s.ds='2010-11-05'
>>>>>>>     and am_s.request_url rlike '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$'
>>>>>>>     and geoip_int(am_s.client_ip, 'COUNTRY_CODE', './GeoIP.dat')='US';
>>>>>>> OK
>>>>>>> ABSTRACT SYNTAX TREE:
>>>>>>>   (TOK_QUERY
>>>>>>>     (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1 10000 (TOK_FUNCTION rand)) am_s))
>>>>>>>     (TOK_INSERT
>>>>>>>       (TOK_DESTINATION (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>>>>       (TOK_SELECT
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL server_host))
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL client_ip))
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL time_stamp))
>>>>>>>         (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL server_host) ':'
>>>>>>>           (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL request_url) '/[^/]+/[^/]+/([^/]+)$' 1)))
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL referrer))
>>>>>>>         (TOK_SELEXPR (TOK_FUNCTION parse_url (TOK_TABLE_OR_COL referrer) 'HOST'))
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL user_agent))
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL cookie))
>>>>>>>         (TOK_SELEXPR (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE' './GeoIP.dat'))
>>>>>>>         (TOK_SELEXPR '')
>>>>>>>         (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>>>>       (TOK_WHERE (and
>>>>>>>         (and
>>>>>>>           (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>>>>           (rlike (. (TOK_TABLE_OR_COL am_s) request_url) '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$'))
>>>>>>>         (= (TOK_FUNCTION geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE' './GeoIP.dat') 'US')))))
>>>>>>>
>>>>>>> STAGE DEPENDENCIES:
>>>>>>>   Stage-1 is a root stage
>>>>>>>   Stage-5 depends on stages: Stage-1, consists of Stage-4, Stage-3
>>>>>>>   Stage-4
>>>>>>>   Stage-0 depends on stages: Stage-4, Stage-3
>>>>>>>   Stage-2 depends on stages: Stage-0
>>>>>>>   Stage-3
>>>>>>>
>>>>>>> STAGE PLANS:
>>>>>>>   Stage: Stage-1
>>>>>>>     Map Reduce
>>>>>>>       Alias -> Map Operator Tree:
>>>>>>>         am_s
>>>>>>>           TableScan
>>>>>>>             alias: am_s
>>>>>>>             Filter Operator
>>>>>>>               predicate:
>>>>>>>                   expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>>>                   type: boolean
>>>>>>>               Filter Operator
>>>>>>>                 predicate:
>>>>>>>                     expr: ((request_url rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                     type: boolean
>>>>>>>                 Filter Operator
>>>>>>>                   predicate:
>>>>>>>                       expr: (((ds = '2010-11-05') and (request_url rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                       type: boolean
>>>>>>>                   Select Operator
>>>>>>>                     expressions:
>>>>>>>                           expr: server_host
>>>>>>>                           type: string
>>>>>>>                           expr: client_ip
>>>>>>>                           type: int
>>>>>>>                           expr: time_stamp
>>>>>>>                           type: int
>>>>>>>                           expr: concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>>>                           type: string
>>>>>>>                           expr: referrer
>>>>>>>                           type: string
>>>>>>>                           expr: parse_url(referrer, 'HOST')
>>>>>>>                           type: string
>>>>>>>                           expr: user_agent
>>>>>>>                           type: string
>>>>>>>                           expr: cookie
>>>>>>>                           type: string
>>>>>>>                           expr: GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>>>                           type: string
>>>>>>>                           expr: ''
>>>>>>>                           type: string
>>>>>>>                           expr: ds
>>>>>>>                           type: string
>>>>>>>                     outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>>>                     File Output Operator
>>>>>>>                       compressed: true
>>>>>>>                       GlobalTableId: 1
>>>>>>>                       table:
>>>>>>>                           input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                           name: hbase_prefilter3_us_sample
>>>>>>>
>>>>>>>   Stage: Stage-5
>>>>>>>     Conditional Operator
>>>>>>>
>>>>>>>   Stage: Stage-4
>>>>>>>     Move Operator
>>>>>>>       files:
>>>>>>>           hdfs directory: true
>>>>>>>           destination: hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>>>
>>>>>>>   Stage: Stage-0
>>>>>>>     Move Operator
>>>>>>>       tables:
>>>>>>>           partition:
>>>>>>>             ds
>>>>>>>           replace: true
>>>>>>>           table:
>>>>>>>               input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>               name: hbase_prefilter3_us_sample
>>>>>>>
>>>>>>>   Stage: Stage-2
>>>>>>>     Stats-Aggr Operator
>>>>>>>
>>>>>>>   Stage: Stage-3
>>>>>>>     Map Reduce
>>>>>>>       Alias -> Map Operator Tree:
>>>>>>>         hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>>>             File Output Operator
>>>>>>>               compressed: true
>>>>>>>               GlobalTableId: 0
>>>>>>>               table:
>>>>>>>                   input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                   name: hbase_prefilter3_us_sample
>>>>>>>
>>>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622
>>>>>>>> need to be there for merging to take place. HIVE-1307 was committed
>>>>>>>> to trunk on 08/25 and HIVE-1622 was committed on 09/13. The simplest
>>>>>>>> way is to update your Hive trunk and rerun the query. If it still
>>>>>>>> doesn't work, maybe you can post your query and the result of
>>>>>>>> 'explain <query>' and we can take a look.
>>>>>>>>
>>>>>>>> Ning
>>>>>>>>
>>>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>>>
>>>>>>>>> Hi Ning,
>>>>>>>>> For the dataset I'm experimenting with, the total size of the
>>>>>>>>> output is 2 MB, and the files are at most a few KB in size. My
>>>>>>>>> hive.input.format was set to the default HiveInputFormat; however,
>>>>>>>>> when I set it to CombineHiveInputFormat, it only made the first
>>>>>>>>> stage of the job use fewer mappers. The merge job was *still*
>>>>>>>>> filtered out at runtime. I also tried set
>>>>>>>>> hive.mergejob.maponly=false; that didn't have any effect.
>>>>>>>>>
>>>>>>>>> I am a bit at a loss what to do here. Is there a way to see exactly
>>>>>>>>> what's going on, e.g. using debug log levels? Btw, I'm also using
>>>>>>>>> dynamic partitions; could that somehow be interfering with the
>>>>>>>>> merge job?
>>>>>>>>>
>>>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>>>>>>> ago).
>>>>>>>>>
>>>>>>>>> --Leo
>>>>>>>>>
>>>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>>>>> The settings look good. The parameter
>>>>>>>>>> hive.merge.smallfiles.avgsize is used to determine at run time
>>>>>>>>>> whether a merge should be triggered: if the average size of the
>>>>>>>>>> files in the partition is SMALLER than the parameter and there is
>>>>>>>>>> more than 1 file, the merge is scheduled. Can you check whether
>>>>>>>>>> you have any big files as well in your resulting partition? If it
>>>>>>>>>> is because of a very large file, you can set the parameter large
>>>>>>>>>> enough.
>>>>>>>>>>
>>>>>>>>>> Another possibility is that your Hadoop installation does not
>>>>>>>>>> support CombineHiveInputFormat, which is used for the new merge
>>>>>>>>>> job. Someone previously reported that a merge was unsuccessful
>>>>>>>>>> because of this. If that's the case, you can turn off
>>>>>>>>>> CombineHiveInputFormat and use the old HiveInputFormat (though
>>>>>>>>>> slower) by setting hive.mergejob.maponly=false.
>>>>>>>>>>
>>>>>>>>>> Ning
>>>>>>>>>>
>>>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>>>
>>>>>>>>>>> I have jobs that sample (or generate) a small amount of data from
>>>>>>>>>>> a large table. At the end, I get e.g. 3000 or more files of about
>>>>>>>>>>> 1 KB each. This becomes a nuisance. How can I make Hive do
>>>>>>>>>>> another pass to merge the output? I have the following settings:
>>>>>>>>>>>
>>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>>> hive.merge.smallfiles.avgsize=16000000
>>>>>>>>>>>
>>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>>> MapReduce jobs = 2".
>>>>>>>>>>> However, after generating the lots-of-small-files table, Hive says:
>>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>>> --Leo
>>>>>>
>>>>>> --
>>>>>> Dave Brondsema
>>>>>> Software Engineer
>>>>>> Geeknet
>>>>>>
>>>>>> www.geek.net
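Pulling the thread together: the merge job was being "filtered out (removed at runtime)" because the output was compressed text, which the map-only merge's CombineHiveInputFormat cannot split or combine. A configuration sketch combining Leo's original settings with the fix from the top of the thread; treat it as a starting point rather than a verified recipe (the maponly key is spelled hive.merge.maponly in some messages and hive.mergejob.maponly in others):

  -- enable the extra merge pass after map-only and map-reduce jobs
  set hive.merge.mapfiles=true;
  set hive.merge.mapredfiles=true;
  -- the merge is scheduled at run time only when the average output-file
  -- size in the partition is below this threshold and there is more than
  -- one file; raise it well above the ~1 KB sample files
  set hive.merge.smallfiles.avgsize=16000000;
  -- target size of each merged file the merge job produces
  set hive.merge.size.per.task=256000000;
  -- for compressed TEXTFILE output, force the map-reduce merge
  -- (HiveInputFormat) instead of the map-only CombineHiveInputFormat merge
  set hive.mergejob.maponly=false;

Alternatively, storing the target table as RCFILE (as in the alogs_dbg_sample3 example above) keeps the map-only merge working even with compression, since RCFile compresses internally and its blocks remain splittable.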