[ https://issues.apache.org/jira/browse/HIVE-25071?focusedWorklogId=591383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-591383 ]
ASF GitHub Bot logged work on HIVE-25071:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Apr/21 07:28
            Start Date: 30/Apr/21 07:28
    Worklog Time Spent: 10m
      Work Description: kasakrisz commented on pull request #2231:
URL: https://github.com/apache/hive/pull/2231#issuecomment-829900602

   Hi Marta,

   Thanks for reviewing this patch.
   This is what I found about distributing rows to reducers while I was debugging:

   Let's say we have the following statements:
   ```
   create table acidtbl(a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
   insert ...
   delete from acidtbl where a = 1 or a = 3;
   ```
   In this case the plan of the delete statement after ReduceSinkDeDuplication looks like:
   ```
   TS[0]-FIL[8]-SEL[2]-RS[5]-SEL[6]-FS[7]
   ```
   So with Tez we have one mapper:
   TS[0]-FIL[8]-SEL[2]-RS[5]
   and two reducers, each of which has:
   SEL[6]-FS[7]

   RS[5] has
   Partition keys: GenericUDFBridge ==> UDFToInteger (Column[_col0])
   Sort keys: Column[_col0]
   And maxReducers: 2
   where _col0 is the row_id coming from SEL[2].

   UDFToInteger(<row_id_type>) extracts the bucket_id field, which is used to generate a `reducesink.key` in the RS operator. This key is passed to the wrapped `OutputCollector` together with the row. In this case the collector is an `org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput`. This class is part of Tez, which I'm not familiar with, but I found that this is where rows are distributed to reducers by the key coming from RS.

   Hive/Hadoop also has a setting `hive.exec.reducers.max`/`mapreduce.job.reduces`. This limits the maxReducers in the RS operator. If the table has more buckets than the max reducers, then the FileSink operator also distributes the rows into different files. If I understand correctly, this is done by the `multiFileSpray` functionality.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 591383)
    Time Spent: 0.5h  (was: 20m)

> Number of reducers limited to fixed 1 when updating/deleting
> ------------------------------------------------------------
>
>                 Key: HIVE-25071
>                 URL: https://issues.apache.org/jira/browse/HIVE-25071
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When updating/deleting bucketed tables an extra ReduceSink operator is
> created to enforce bucketing. After HIVE-22538 the number of reducers is
> limited to a fixed 1 in these RS operators.
> This can lead to performance degradation.
> Prior to HIVE-22538 multiple reducers were available in such cases. The
> reason for limiting the number of reducers is to ensure RowId ascending
> order in the delete delta files produced by the update/delete statements.
> This is the plan of a delete statement like:
> {code}
> DELETE FROM t1 WHERE a = 1;
> {code}
> {code}
> TS[0]-FIL[8]-SEL[2]-RS[3]-SEL[4]-RS[5]-SEL[6]-FS[7]
> {code}
> RowId order is ensured by RS[3] and bucketing is enforced by RS[5]: the
> number of reducers was limited to the number of buckets in the table or
> hive.exec.reducers.max. However, RS[5] does not provide any ordering, so the
> above plan may generate unsorted delete deltas, which leads to corrupted
> data reads.
> Prior to HIVE-22538 these RS operators were merged by
> ReduceSinkDeduplication and the resulting RS kept the ordering and enabled
> multiple reducers. It could do so because ReduceSinkDeduplication was
> prepared for ACID writes. This was removed by HIVE-22538 to get a more
> generic ReduceSinkDeduplication.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
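
The distribution described in the comment above (partition by the bucket_id taken from the row_id, sort by row_id, cap the reducer count, and let one reducer write several bucket files when there are more buckets than reducers) can be sketched as a toy model. This is only an illustration under stated assumptions, not Hive or Tez code: `RowId`, `distribute`, and the "bucket_id modulo reducer count" assignment are hypothetical, and the real ROW__ID encoding and the Tez partitioning logic are more involved.

```python
from collections import defaultdict, namedtuple

# Hypothetical, simplified stand-in for the row_id: a plain bucket number
# is assumed here instead of Hive's real encoding.
RowId = namedtuple("RowId", ["write_id", "bucket_id", "row_id"])


def distribute(row_ids, num_buckets, max_reducers):
    """Return {reducer: {bucket_id: [RowId, ...]}} for a toy RS -> FS model."""
    # The reducer count is capped by both the bucket count and the max setting.
    num_reducers = min(num_buckets, max_reducers)
    plan = defaultdict(lambda: defaultdict(list))
    # Sorting by the whole RowId mimics the RS sort key.
    for rid in sorted(row_ids):
        # Assumption: reducer = bucket_id modulo reducer count. The real
        # assignment happens inside Tez and may differ.
        reducer = rid.bucket_id % num_reducers
        # With fewer reducers than buckets, one reducer writes several
        # files, one per bucket (a toy version of multiFileSpray).
        plan[reducer][rid.bucket_id].append(rid)
    return {r: dict(files) for r, files in plan.items()}


rows = [RowId(1, 1, 5), RowId(1, 0, 2), RowId(1, 3, 0), RowId(1, 1, 1), RowId(1, 2, 7)]
for reducer, files in sorted(distribute(rows, num_buckets=4, max_reducers=2).items()):
    print(reducer, sorted(files))
```

With 4 buckets and at most 2 reducers, this model sends buckets 0 and 2 to one reducer and buckets 1 and 3 to the other, and the rows within each bucket file stay in ascending RowId order, which is the property the delete delta files need.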