Re: confused on different behavior of Bucketized tables do not support INSERT INTO
I'm using hive 0.9.0

On Thursday, May 31, 2012, Bruce Bian wrote:

Hi, I've got a table vt_new_data which is defined as follows:

CREATE TABLE VT_NEW_DATA (
  V_ACCOUNT_NUM string
  ,V_ACCOUNT_MODIFIER_NUM string
  ,V_DEPOSIT_TYPE_CD string
  ,V_DEPOSIT_TERM int
  ,V_LEDGER_SUBJECT_ID string
  ,V_ACCOUNTING_ORG_CD string
  ,V_OPEN_DT string
  ,V_CLOSE_DT string
  ,V_CURRENCY_CD string
  ,V_ACCOUNT_BAL float
  ,V_INNER_MONTH_DELAY_ACCUM float
)
CLUSTERED BY (V_ACCOUNT_NUM,V_ACCOUNT_MODIFIER_NUM)
SORTED BY (V_ACCOUNT_NUM,V_ACCOUNT_MODIFIER_NUM ASC)
INTO 256 BUCKETS
STORED AS RCFile;

When I execute the following query (this is just a test):

explain insert into table vt_new_data select * from vt_new_data limit 1;

the following error occurs:

FAILED: Error in semantic analysis: Bucketized tables do not support INSERT INTO: Table: vt_new_data

But when I execute this query:

explain insert into table vt_new_data
select /*+ MAPJOIN(T4) */
  t1.account_num as v_account_num
  ,t1.account_modifier_num as v_account_modifier_num
  ,'3006' as v_deposit_type_cd
  ,0 as v_deposit_term
  ,'23201000' as v_ledger_subject_id
  ,coalesce(t2.party_id,'') as v_accounting_org_cd
  ,coalesce(t3.card_begin_dt,'19000101') as v_open_dt
  ,coalesce(t3.card_live_dt,'19000101') as v_close_dt
  ,coalesce(t4.currency_cd,substr(t1.account_modifier_num,3,3)) as v_currency_cd
  ,coalesce(t4.agt_amt,0) as v_account_bal
  ,0 as v_inner_month_delay_accum
from t03_e_cash_bucket t1
left outer join t03_agt_amount_h_bucket t4
  on t1.account_num=t4.account_num
  and t1.account_modifier_num=t4.account_modifier_num
  and t4.agt_amt_type_cd='001'
  and t4.start_date='$TXNDATE' and t4.end_date>'$TXNDATE'
left outer join t01_party_card_rela_h_bucket t2
  on t1.card_no=t2.card_no
  and t2.party_card_rela_type_cd='01'
  and t2.start_date='$TXNDATE' and t2.end_date>'$TXNDATE'
left outer join t03_card_bucket t3
  on t1.card_no=t3.card_no;

the execution plan is generated successfully and even triggers an SMB Map Join, which is great. But I don't see the difference here?
As both are inserting into a bucketized and sorted table?
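The behavior reported in this thread (and confirmed by the test further down: inserting into test2 from test succeeds, inserting into test2 from test2 itself fails) can be summed up in a toy model. This is not the actual Hive 0.9 semantic-analyzer code, just a sketch of the rule as observed here; the function and parameter names are made up for illustration.

```python
# Toy model (not Hive source) of the observed rule: INSERT INTO a
# bucketized table fails semantic analysis when the target table also
# appears among the query's source tables.
from typing import Set

def check_insert_into(target: str, sources: Set[str], bucketized: Set[str]) -> str:
    """Return the observed error string, or 'OK' if the insert is accepted."""
    if target in bucketized and target in sources:
        return ("FAILED: Error in semantic analysis: Bucketized tables do not "
                "support INSERT INTO: Table: " + target)
    return "OK"

bucketized_tables = {"vt_new_data"}

# Self-insert into a bucketized table -> the error from the thread:
print(check_insert_into("vt_new_data", {"vt_new_data"}, bucketized_tables))

# Same target, but sourced from other tables -> plan generated fine:
print(check_insert_into("vt_new_data", {"t03_e_cash_bucket"}, bucketized_tables))  # OK
```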
Re: confused on different behavior of Bucketized tables do not support INSERT INTO
So I did another test on this.

hive> create table test(foo int, bar string) clustered by(foo) sorted by (foo asc) into 2 buckets;
OK
Time taken: 0.097 seconds
hive> create table test2(foo int, bar string) clustered by(foo) sorted by (foo asc) into 2 buckets;
OK
hive> LOAD DATA LOCAL INPATH 'hive/examples/files/kv1.txt' OVERWRITE INTO TABLE test;
hive> set hive.enforce.bucketing=true;
hive> set hive.enforce.sorting=true;
hive> insert into table test2 select * from test;
Total MapReduce jobs = 1
Launching Job 1 out of 1
……
hive> insert into table test2 select * from test2;
FAILED: Error in semantic analysis: Bucketized tables do not support INSERT INTO: Table: test2

So it seems the "FAILED: Error in semantic analysis: Bucketized tables do not support INSERT INTO" error is only thrown when inserting into a bucketized table from the same table? And when inserting into a bucketized table multiple times, it creates an original_file_copy_n under the same bucket:

-rw-r--r-- 3 wbian supergroup 2856 2012-05-31 22:03 /user/hive/warehouse/test2/00_0
-rw-r--r-- 3 wbian supergroup 2856 2012-05-31 22:04 /user/hive/warehouse/test2/00_0_copy_1
-rw-r--r-- 3 wbian supergroup 2956 2012-05-31 22:03 /user/hive/warehouse/test2/01_0
-rw-r--r-- 3 wbian supergroup 2956 2012-05-31 22:04 /user/hive/warehouse/test2/01_0_copy_1

And since what I want to do is an SMB Map Join, the following triggered the SMB Map Join successfully:

set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
select /*+mapjoin(test)*/ * from pokes join test on pokes.foo=test.foo;

So what's the reason for throwing that error (I mean, why not support inserting into a bucketized table from the same table)? And isn't that error message kind of misleading?
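The copy-suffix behavior seen in the listing above can be sketched as a simple non-clobbering naming scheme. This is an assumption based purely on the file names shown (`..._copy_1`), not on Hive's actual file-naming code; the helper name is made up.

```python
# Assumed sketch of the _copy_N naming seen in the bucket listing above:
# each repeated INSERT INTO picks the next free _copy_N suffix rather than
# overwriting an existing bucket file.
from typing import Set

def next_bucket_name(existing: Set[str], base: str) -> str:
    """Return base if unused, else the first free base_copy_N name."""
    if base not in existing:
        return base
    n = 1
    while f"{base}_copy_{n}" in existing:
        n += 1
    return f"{base}_copy_{n}"

files = {"00_0", "01_0"}                 # after the first insert
print(next_bucket_name(files, "00_0"))   # 00_0_copy_1
files.add("00_0_copy_1")                 # after the second insert
print(next_bucket_name(files, "00_0"))   # 00_0_copy_2
```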
On Thu, May 31, 2012 at 6:43 PM, Bruce Bian weidong@gmail.com wrote:
Condition for doing a sort merge bucket map join
Hi, I've got 7 large tables to join (each ~10G in size) into one table, all with the same 2 join keys. I've read some documents on sort merge bucket map join, but failed to trigger it. I've bucketed all 7 tables into 20 buckets, sorted by one of the join keys, and set the following parameters while doing the join:

set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

What else am I missing? Do I have to bucket on both of the join keys (I'm currently trying this)? And does each bucket file have to be smaller than one HDFS block? Thanks a lot.
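The usual documented preconditions for an SMB map join can be written as a checklist, which also shows why bucketing/sorting on only one of the two join keys (as above) would not qualify. This is a toy model of the commonly documented conditions, not Hive's planner code; the dataclass fields are made up for illustration.

```python
# Toy checklist (not Hive source) of commonly documented SMB map join
# preconditions: every table bucketed AND sorted on the join keys, with
# matching bucket counts.
from dataclasses import dataclass
from typing import List

@dataclass
class Table:
    name: str
    bucket_cols: List[str]
    sort_cols: List[str]
    num_buckets: int

def smb_join_possible(tables: List[Table], join_keys: List[str]) -> bool:
    for t in tables:
        if t.bucket_cols != join_keys:   # must be CLUSTERED BY the join keys
            return False
        if t.sort_cols != join_keys:     # must be SORTED BY the same keys
            return False
    # all tables must agree on the number of buckets
    return len({t.num_buckets for t in tables}) == 1

ok = Table("t1", ["k1", "k2"], ["k1", "k2"], 20)
partial = Table("t2", ["k1"], ["k1"], 20)   # bucketed on only one join key

print(smb_join_possible([ok, partial], ["k1", "k2"]))  # False
print(smb_join_possible([ok, ok], ["k1", "k2"]))       # True
```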
Re: how is number of mappers determined in mapside join?
Thanks Bejoy! That helps.

On Tue, Mar 20, 2012 at 12:10 AM, Bejoy Ks bejoy...@yahoo.com wrote:

Hi Bruce

From my understanding, that formula is not for CombineFileInputFormat but for the other basic input formats. I'll just brief you on CombineFileInputFormat to make things clearer. In the default TextInputFormat, every hdfs block is processed by one mapper. But if the files are small, say 5 MB, spawning that many mappers would be overkill for the job. So here we use CombineFileInputFormat, where one mapper processes more than one small file; the minimum data size a mapper should process is defined by the min split size, and the maximum data a mapper can process is defined by the max split size. That is, the data processed by a mapper is guaranteed to be not less than the min split size and not more than the max split size specified.

As you asked, if you are looking for more mappers with CombineFileInputFormat, then reduce the value of max split size. Bump it down to 32 MB (your block size) and just try it out. Or if you are looking for num mappers = num blocks, just change the input format in hive. By the way, 32 MB is too small for an hdfs block size; you may hit NN memory issues pretty soon. Consider increasing it to at least 64 MB, though larger clusters use either 128 or 256 MB blocks. Hope it helps!

Regards
Bejoy

*From:* Bruce Bian weidong@gmail.com
*To:* user@hive.apache.org; Bejoy Ks bejoy...@yahoo.com
*Sent:* Monday, March 19, 2012 7:48 PM
*Subject:* Re: how is number of mappers determined in mapside join?

Hi Bejoy, thanks for your reply. The formula is from the book Hadoop: The Definitive Guide, 2nd edition. On page 203 it says: "The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat): max(minimumSize, min(maximumSize, blockSize)); by default minimumSize < blockSize < maximumSize, so the split size is blockSize." And I've actually used the HDFS block size to control the number of mappers launched before.
So as to your response, do you mean that any amount of data between 1 B and 256 MB is OK for a mapper to process? Then the only way I can think of to increase the number of mappers is to reduce the max split size.

Regards, Bruce

On Mon, Mar 19, 2012 at 8:48 PM, Bejoy Ks bejoy...@yahoo.com wrote:

Hi Bruce

In a map side join the smaller table is loaded in memory, and hence the number of mappers depends only on the data in the larger table. Say CombineHiveInputFormat is used and we have our hdfs block size as 32 MB, min split size as 1 B and max split size as 256 MB. That means one mapper would be processing data chunks not less than 1 B and not more than 256 MB. So mappers would be triggered based on that; a possibility in your case:

mapper 1 - 200 MB
mapper 2 - 120 MB
mapper 3 - 140 MB

Every mapper processes data whose size is between 1 B and 256 MB, for a total of 460 MB, your table size. I'm not sure of the formula you posted here; can you point me to the document you got it from?

Regards Bejoy

*From:* Bruce Bian weidong@gmail.com
*To:* user@hive.apache.org
*Sent:* Monday, March 19, 2012 2:42 PM
*Subject:* how is number of mappers determined in mapside join?

Hi there, when I'm executing the following queries in hive:

set hive.auto.convert.join=true;
CREATE TABLE IDAP_ROOT as
SELECT a.*, b.acnt_no
FROM idap_pi_root a
LEFT OUTER JOIN idap_pi_root_acnt b ON a.acnt_id=b.acnt_id;

the number of mappers run in the mapside join is 3. How is it determined? When launching a job in hadoop mapreduce, I know it's determined by the function max(Min split size, min(Max split size, HDFS block size)), which in my configuration is max(1B, min(256MB, 32MB)) = 32MB, and the two tables are 460MB and 1.5MB respectively. Thus I thought the number of mappers launched would be around 15, which is not the case.

Thanks
Bruce
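The arithmetic in this thread can be checked directly. The first function is the FileInputFormat formula quoted from the book; the mapper-count estimates are simplifications (real split planning works per file and per node), but they reproduce both the expected ~15 mappers and why CombineHiveInputFormat can get away with far fewer.

```python
import math

def compute_split_size(min_size: int, max_size: int, block_size: int) -> int:
    # FileInputFormat's computeSplitSize(), as quoted from
    # Hadoop: The Definitive Guide: max(minimumSize, min(maximumSize, blockSize))
    return max(min_size, min(max_size, block_size))

MB = 1024 * 1024

# Bruce's configuration: min split 1 B, max split 256 MB, block size 32 MB.
split = compute_split_size(1, 256 * MB, 32 * MB)
print(split // MB)                       # 32 -> one split per 32 MB block
print(math.ceil(460 * MB / split))       # 15 mappers expected for a 460 MB table

# With CombineFileInputFormat, up to max-split-size of data can be packed
# into one mapper, so the lower bound on the mapper count is much smaller:
print(math.ceil(460 * MB / (256 * MB)))  # 2 -> consistent with the 3 observed
```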
how is number of mappers determined in mapside join?
Hi there, when I'm executing the following queries in hive:

set hive.auto.convert.join=true;
CREATE TABLE IDAP_ROOT as
SELECT a.*, b.acnt_no
FROM idap_pi_root a
LEFT OUTER JOIN idap_pi_root_acnt b ON a.acnt_id=b.acnt_id;

the number of mappers run in the mapside join is 3. How is it determined? When launching a job in hadoop mapreduce, I know it's determined by the function max(Min split size, min(Max split size, HDFS block size)), which in my configuration is max(1B, min(256MB, 32MB)) = 32MB, and the two tables are 460MB and 1.5MB respectively. Thus I thought the number of mappers launched would be around 15, which is not the case.

Thanks
Bruce
Re: how is number of mappers determined in mapside join?
Hi Bejoy, thanks for your reply. The formula is from the book Hadoop: The Definitive Guide, 2nd edition. On page 203 it says: "The split size is calculated by the formula (see the computeSplitSize() method in FileInputFormat): max(minimumSize, min(maximumSize, blockSize)); by default minimumSize < blockSize < maximumSize, so the split size is blockSize." And I've actually used the HDFS block size to control the number of mappers launched before.

So as to your response, do you mean that any amount of data between 1 B and 256 MB is OK for a mapper to process? Then the only way I can think of to increase the number of mappers is to reduce the max split size.

Regards, Bruce

On Mon, Mar 19, 2012 at 8:48 PM, Bejoy Ks bejoy...@yahoo.com wrote:

Hi Bruce

In a map side join the smaller table is loaded in memory, and hence the number of mappers depends only on the data in the larger table. Say CombineHiveInputFormat is used and we have our hdfs block size as 32 MB, min split size as 1 B and max split size as 256 MB. That means one mapper would be processing data chunks not less than 1 B and not more than 256 MB. So mappers would be triggered based on that; a possibility in your case:

mapper 1 - 200 MB
mapper 2 - 120 MB
mapper 3 - 140 MB

Every mapper processes data whose size is between 1 B and 256 MB, for a total of 460 MB, your table size. I'm not sure of the formula you posted here; can you point me to the document you got it from?

Regards Bejoy

*From:* Bruce Bian weidong@gmail.com
*To:* user@hive.apache.org
*Sent:* Monday, March 19, 2012 2:42 PM
*Subject:* how is number of mappers determined in mapside join?

Hi there, when I'm executing the following queries in hive:

set hive.auto.convert.join=true;
CREATE TABLE IDAP_ROOT as
SELECT a.*, b.acnt_no
FROM idap_pi_root a
LEFT OUTER JOIN idap_pi_root_acnt b ON a.acnt_id=b.acnt_id;

the number of mappers run in the mapside join is 3. How is it determined? When launching a job in hadoop mapreduce, I know it's determined by the function max(Min split size, min(Max split size, HDFS block size)), which in my configuration is max(1B, min(256MB, 32MB)) = 32MB, and the two tables are 460MB and 1.5MB respectively. Thus I thought the number of mappers launched would be around 15, which is not the case.

Thanks
Bruce
Reduce the number of map/reduce jobs during join
Yes, it's in my hive-default.xml, and Hive figured to use one reducer only, so I thought increasing it to 5 might help, which it doesn't. Anyway, scanning the largest table 6 times isn't efficient, hence my question.

On Wed, Mar 14, 2012 at 12:37 AM, Jagat jagatsi...@gmail.com wrote:

Hello Weidong Bian

Did you see the following configuration properties in the conf directory?

<property>
  <name>mapred.reduce.tasks</name>
  <value>-1</value>
  <description>The default number of reduce tasks per job. Typically set to a prime close to the number of available hosts. Ignored when mapred.job.tracker is "local". Hadoop sets this to 1 by default, whereas hive uses -1 as its default value. By setting this property to -1, Hive will automatically figure out what should be the number of reducers.</description>
</property>

<property>
  <name>hive.exec.reducers.max</name>
  <value>999</value>
  <description>max number of reducers will be used. If the one specified in the configuration parameter mapred.reduce.tasks is negative, hive will use this one as the max number of reducers when automatically determining the number of reducers.</description>
</property>

Thanks and Regards
Jagat

On Tue, Mar 13, 2012 at 9:54 PM, Bruce Bian weidong@gmail.com wrote:

Hi there, when I'm using Hive to do a query as follows, 6 Map/Reduce jobs are launched, one for each join, and it deals with ~460M of data in ~950 seconds, which I think is way too slow for a cluster with 5 slaves and 24GB memory/12 disks each.
set mapred.reduce.tasks=5;
SELECT a.*, e.code_name as is_internet_flg, f.code_name as wb_access_tp_desc, g.code_name as free_tp_desc,
  b.acnt_no, b.addr_id, b.postcode, b.acnt_rmnd_tp, b.print_tp, b.media_type,
  c.cust_code, c.root_cust_code,
  d.mdf_name, d.sub_bureau_code, d.bureau_cd, d.adm_sub_bureau_name, d.bureau_name
FROM prc_idap_pi_root a
LEFT OUTER JOIN idap_pi_root_acnt b ON a.acnt_id=b.acnt_id
LEFT OUTER JOIN idap_pi_root_cust c ON a.cust_id=c.cust_id
LEFT OUTER JOIN ocrm_vt_area d ON a.dev_area_id=d.area_id
LEFT OUTER JOIN osor_code e ON a.data_internet_flg=e.code_val and e.code_tp='IS_INTERNET_FLG'
LEFT OUTER JOIN osor_code f ON a.wb_access_tp=f.code_val and f.code_tp='WEB_ACCESS_TP'
LEFT OUTER JOIN osor_code g ON a.free_tp=g.code_val and g.code_tp='FREE_TP';

For each job, most of the time is consumed by the reduce phase. As idap_pi_root is very large, scanning over it 6 times is quite inefficient. Is it possible to reduce the map/reduce jobs to only one?

Thanks,
Weidong Bian
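Why 6 jobs here: Hive's documented behavior is that consecutive joins using the same join key on the driving table are combined into a single MapReduce job, while each new key starts a new job. The query above joins on six different columns of table a, hence six jobs. A toy model of that rule (not Hive's planner code; the function name is made up):

```python
# Toy model (not Hive source) of the documented rule: consecutive joins
# sharing the same driving-table join key collapse into one MR job; each
# change of key starts a new job.
from typing import List

def count_mr_jobs(join_keys: List[str]) -> int:
    """join_keys: the driving-table column used by each join, in order."""
    jobs = 0
    prev = None
    for key in join_keys:
        if key != prev:   # a new join key forces a new MR job
            jobs += 1
        prev = key
    return jobs

# The query above uses six distinct keys on table a -> six jobs:
keys = ["acnt_id", "cust_id", "dev_area_id",
        "data_internet_flg", "wb_access_tp", "free_tp"]
print(count_mr_jobs(keys))            # 6

# Had every join used the same key, they would collapse into one job:
print(count_mr_jobs(["acnt_id"] * 6)) # 1
```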