This is expected. Compressed text files are not splittable, so 
CombineHiveInputFormat cannot read multiple files per mapper. 
CombineHiveInputFormat is used when hive.mergejob.maponly=true. If you set it to 
false, we'll use HiveInputFormat, and that should be able to merge compressed 
text files. 
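
As a rough sketch (not tested against your build), the settings for the
non-map-only merge could look like the following. The parameter name
hive.mergejob.maponly is the spelling used earlier in this thread;
hive.merge.mapfiles, hive.merge.mapredfiles and hive.exec.compress.output
are the ones you already mentioned, and the table names come from your
example. Please treat the exact names as assumptions and check them against
your hive-default.xml.

```sql
-- Assumption: these parameter names match your Hive build.
-- Merge small files after map-only and map-reduce jobs.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
-- Use the map-reduce merge job (HiveInputFormat) instead of the
-- map-only merge job (CombineHiveInputFormat).
set hive.mergejob.maponly=false;
-- Keep compressed output, as in your setup.
set hive.exec.compress.output=true;

-- Same text-format insert that did not merge in your test.
insert overwrite table alogs_dbg_sample2
select server_host, client_ip, time_stamp
from alogs_dbg_subset TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
```

If the merge takes effect, the number of files under the table directory
should drop below the number of mappers in the first stage.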


On Nov 22, 2010, at 10:05 PM, Leo Alekseyev wrote:

> I found another criterion that determines whether or not the merge job
> runs with compression turned on.  It seems that if the target table is
> stored as an rcfile, merges work, but if a text file, merges will
> fail.  For instance:
> 
> -- merge will work here:
> create table  alogs_dbg_sample3 (server_host STRING, client_ip INT,
> time_stamp INT) row format serde
> 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' stored as
> rcfile;
> insert overwrite table alogs_dbg_sample3 select
> server_host,client_ip,time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
> 
> -- merge will fail here:
> create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
> time_stamp INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES
> TERMINATED BY '\n' STORED AS TEXTFILE;
> insert overwrite table alogs_dbg_sample2 select
> server_host,client_ip,time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
> 
> Is this a bug?..  I don't see why merge job running should be
> sensitive to the output table format.
> P.S:  I have hive.merge.maponly=true and am using LZO compression
> 
> On Fri, Nov 19, 2010 at 5:20 PM, Ning Zhang <nzh...@fb.com> wrote:
>> It makes sense. CombineHiveInputFormat does not work with compressed text 
>> files (suffix *.gz) since they are not splittable. I think your default 
>> hive.input.format=CombineHiveInputFormat. But I think by setting 
>> hive.mergejob.maponly=false it should work (meaning the merge should succeed). 
>> With that setting, you'll have multiple mappers (as many as there are 
>> small files) and 1 reducer. The reducer's output should be the merged result.
>> 
>> On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:
>> 
>>> Folks, thanks for your help.  I've narrowed the problem down to
>>> compression.  When I set hive.exec.compress.output=false, merges
>>> proceed as expected.  When compression is on, the merge job doesn't
>>> seem to actually merge, it just spits out the input.
>>> 
>>> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiang...@gmail.com> 
>>> wrote:
>>>> These are the parameters that control the behavior. (Try to set them
>>>> to different values if it does not work in your environment.)
>>>> 
>>>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>>>> set mapred.min.split.size.per.node=1000000000;
>>>> set mapred.min.split.size.per.rack=1000000000;
>>>> set mapred.max.split.size=1000000000;
>>>> 
>>>> set hive.merge.size.per.task=1000000000;
>>>> set hive.merge.smallfiles.avgsize=1000000000;
>>>> set hive.merge.size.smallfiles.avgsize=1000000000;
>>>> set hive.exec.dynamic.partition.mode=nonstrict;
>>>> 
>>>> 
>>>> The output size of the second job is also controlled by the split
>>>> size, as shown in the first 4 lines.
>>>> 
>>>> 
>>>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
>>>>> worked for me in the past.  Again, what's strange here is with the
>>>>> latest Hive build the merge stage appears to run, but it doesn't
>>>>> actually merge -- it's a quick map-only job that, near as I can tell,
>>>>> doesn't do anything.
>>>>> 
>>>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbronds...@geek.net> 
>>>>> wrote:
>>>>>> What version of Hadoop are you on?
>>>>>> 
>>>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> 
>>>>>> wrote:
>>>>>>> 
>>>>>>> I thought I was running Hive with those changes merged in, but to make
>>>>>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>>>>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>>>>>> same number of files (# of files generated is equal to the number of
>>>>>>> the original mappers, so I have no idea what the second stage is
>>>>>>> actually doing).
>>>>>>> 
>>>>>>> See below for query / explain query.  Stage 1 runs always; Stage 3
>>>>>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>>>>>> of small files.
>>>>>>> 
>>>>>>> The query is kind of large, but in essence it's simply
>>>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>>>> [conditions].
>>>>>>> 
>>>>>>> 
>>>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>>>>> (ds) select
>>>>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>>>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>>>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>>>>>> OK
>>>>>>> ABSTRACT SYNTAX TREE:
>>>>>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>>>> './GeoIP.dat') 'US')))))
>>>>>>> 
>>>>>>> STAGE DEPENDENCIES:
>>>>>>>  Stage-1 is a root stage
>>>>>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>>>>  Stage-4
>>>>>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>>>>>  Stage-2 depends on stages: Stage-0
>>>>>>>  Stage-3
>>>>>>> 
>>>>>>> STAGE PLANS:
>>>>>>>  Stage: Stage-1
>>>>>>>    Map Reduce
>>>>>>>      Alias -> Map Operator Tree:
>>>>>>>        am_s
>>>>>>>          TableScan
>>>>>>>            alias: am_s
>>>>>>>            Filter Operator
>>>>>>>              predicate:
>>>>>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>>>                  type: boolean
>>>>>>>              Filter Operator
>>>>>>>                predicate:
>>>>>>>                    expr: ((request_url rlike
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                    type: boolean
>>>>>>>                Filter Operator
>>>>>>>                  predicate:
>>>>>>>                      expr: (((ds = '2010-11-05') and (request_url
>>>>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                      type: boolean
>>>>>>>                  Select Operator
>>>>>>>                    expressions:
>>>>>>>                          expr: server_host
>>>>>>>                          type: string
>>>>>>>                          expr: client_ip
>>>>>>>                          type: int
>>>>>>>                          expr: time_stamp
>>>>>>>                          type: int
>>>>>>>                          expr: concat(server_host, ':',
>>>>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>>>                          type: string
>>>>>>>                          expr: referrer
>>>>>>>                          type: string
>>>>>>>                          expr: parse_url(referrer, 'HOST')
>>>>>>>                          type: string
>>>>>>>                          expr: user_agent
>>>>>>>                          type: string
>>>>>>>                          expr: cookie
>>>>>>>                          type: string
>>>>>>>                          expr: GenericUDFGeoIP ( client_ip,
>>>>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>>>                          type: string
>>>>>>>                          expr: ''
>>>>>>>                          type: string
>>>>>>>                          expr: ds
>>>>>>>                          type: string
>>>>>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>>>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>>>                    File Output Operator
>>>>>>>                      compressed: true
>>>>>>>                      GlobalTableId: 1
>>>>>>>                      table:
>>>>>>>                          input format:
>>>>>>> org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                          output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                          serde:
>>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                          name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>>  Stage: Stage-5
>>>>>>>    Conditional Operator
>>>>>>> 
>>>>>>>  Stage: Stage-4
>>>>>>>    Move Operator
>>>>>>>      files:
>>>>>>>          hdfs directory: true
>>>>>>>          destination:
>>>>>>> 
>>>>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>>> 
>>>>>>>  Stage: Stage-0
>>>>>>>    Move Operator
>>>>>>>      tables:
>>>>>>>          partition:
>>>>>>>            ds
>>>>>>>          replace: true
>>>>>>>          table:
>>>>>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>              output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>              name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>>  Stage: Stage-2
>>>>>>>    Stats-Aggr Operator
>>>>>>> 
>>>>>>>  Stage: Stage-3
>>>>>>>    Map Reduce
>>>>>>>      Alias -> Map Operator Tree:
>>>>>>> 
>>>>>>>  
>>>>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>>>            File Output Operator
>>>>>>>              compressed: true
>>>>>>>              GlobalTableId: 0
>>>>>>>              table:
>>>>>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                  output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                  serde: 
>>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                  name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>>>>>> to be there for merging to take place. HIVE-1307 was committed to trunk on
>>>>>>>> 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update
>>>>>>>> your Hive trunk and rerun the query. If it still doesn't work maybe you can
>>>>>>>> post your query and the result of 'explain <query>' and we can take a look.
>>>>>>>> 
>>>>>>>> Ning
>>>>>>>> 
>>>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>>> 
>>>>>>>>> Hi Ning,
>>>>>>>>> For the dataset I'm experimenting with, the total size of the output
>>>>>>>>> is 2mb, and the files are at most a few kb in size.  My
>>>>>>>>> hive.input.format was set to default HiveInputFormat; however, when I
>>>>>>>>> set it to CombineHiveInputFormat, it only made the first stage of the
>>>>>>>>> job use fewer mappers.  The merge job was *still* filtered out at
>>>>>>>>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>>>>>>>> have any effect.
>>>>>>>>> 
>>>>>>>>> I am a bit at a loss what to do here.  Is there a way to see what's
>>>>>>>>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>>>>>>>> dynamic partitions; could that somehow be interfering with the merge
>>>>>>>>> job?..
>>>>>>>>> 
>>>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>>>>>>> ago).
>>>>>>>>> 
>>>>>>>>> --Leo
>>>>>>>>> 
>>>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>>>>> The settings look good. The parameter
>>>>>>>>>> hive.merge.size.smallfiles.avgsize is used to determine at run time if a
>>>>>>>>>> merge should be triggered: if the average size of the files in the partition
>>>>>>>>>> is SMALLER than the parameter and there is more than 1 file, the merge
>>>>>>>>>> should be scheduled. Can you check whether you have any big files as well in
>>>>>>>>>> your resulting partition? If it is because of a very large file, you can set
>>>>>>>>>> the parameter large enough.
>>>>>>>>>> 
>>>>>>>>>> Another possibility is that your Hadoop installation does not support
>>>>>>>>>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>>>>>>>> previously reported that a merge was not successful because of this. If
>>>>>>>>>> that's the case, you can turn off CombineHiveInputFormat and use the old
>>>>>>>>>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>>>>>>>> 
>>>>>>>>>> Ning
>>>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>>> 
>>>>>>>>>>> I have jobs that sample (or generate) a small amount of data from a
>>>>>>>>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>>>>>>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>>>>>>>>> to merge the output?  I have the following settings:
>>>>>>>>>>> 
>>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>>> 
>>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>>> MapReduce jobs = 2".  However, after generating the
>>>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>> 
>>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>>> --Leo
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Dave Brondsema
>>>>>> Software Engineer
>>>>>> Geeknet
>>>>>> 
>>>>>> www.geek.net
>>>>>> 
>>>>> 
>>>> 
>> 
>> 
