[ 
https://issues.apache.org/jira/browse/HIVE-25071?focusedWorklogId=591383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-591383
 ]

ASF GitHub Bot logged work on HIVE-25071:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Apr/21 07:28
            Start Date: 30/Apr/21 07:28
    Worklog Time Spent: 10m 
      Work Description: kasakrisz commented on pull request #2231:
URL: https://github.com/apache/hive/pull/2231#issuecomment-829900602


   Hi Marta,
   Thanks for reviewing this patch.
   
   This is what I found about how rows are distributed to reducers while I was
   debugging:
   Let's say we have the following statements:
   ```
   create table acidtbl(a int, b int) clustered by (a) into 2 buckets
     stored as orc TBLPROPERTIES ('transactional'='true');
   insert ...
   delete from acidtbl where a = 1 or a = 3;
   ```
   In this case the plan of the delete statement after ReduceSinkDeDuplication
   looks like:
   ```
   TS[0]-FIL[8]-SEL[2]-RS[5]-SEL[6]-FS[7]
   ```
   So with Tez we have a mapper: TS[0]-FIL[8]-SEL[2]-RS[5]
   and two reducers, each of which runs: SEL[6]-FS[7]
   
   RS[5] has:
   Partition keys: GenericUDFBridge ==> UDFToInteger (Column[_col0])
   Sort keys: Column[_col0]
   maxReducers: 2
   
   where _col0 is the row_id coming from SEL[2].
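   
   As a side note, ROW__ID can be inspected directly. A minimal sketch (the
   output value is illustrative only, assuming the usual ACID struct layout of
   writeid/bucketid/rowid):
   ```
   -- ROW__ID is a struct of (writeid, bucketid, rowid);
   -- UDFToInteger(ROW__ID) keeps only the encoded bucketid part
   SELECT ROW__ID, a, b FROM acidtbl;
   -- e.g. {"writeid":1,"bucketid":536870912,"rowid":0}    1    10
   ```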
   
   UDFToInteger(<row_id_type>) extracts the bucket_id field, which is used to
   generate the `reducesink.key` in the RS operator. The key is passed to the
   wrapped `OutputCollector` along with the row; in this case that is an
   `org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput`. This class
   is part of Tez, which I'm not familiar with, but I found that this is where
   rows are distributed to reducers by the key coming from RS.
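   
   To make that concrete, here is a hypothetical sketch of the partitioning
   step, assuming HashPartitioner-style semantics (reducer = hash(key) mod
   numReducers); Hive's hash() UDF can show which of the 2 reducers each
   bucket id would land on:
   ```
   -- illustrative only: with 2 reducers, bucket ids 0 and 1
   -- map to different reducers
   SELECT hash(0) % 2, hash(1) % 2;   -- returns 0, 1
   ```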
   
   Hive/Hadoop also has the settings
   `hive.exec.reducers.max`/`mapreduce.job.reduces`, which limit maxReducers in
   the RS operator. If the table has more buckets than the maximum number of
   reducers, then the FileSink operator also distributes the rows into different
   files. If I understand correctly, this is done by the `multiFileSpray`
   functionality.
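   
   For example (numbers are illustrative, not from the patch), capping the
   reducers below the bucket count should trigger that spray path:
   ```
   -- hypothetical setup: 4 buckets but at most 2 reducers
   SET hive.exec.reducers.max=2;
   create table acidtbl4(a int, b int) clustered by (a) into 4 buckets
     stored as orc TBLPROPERTIES ('transactional'='true');
   delete from acidtbl4 where a = 1 or a = 2;
   -- each of the 2 reducers then writes rows for 2 buckets into
   -- separate files via multiFileSpray
   ```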
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 591383)
    Time Spent: 0.5h  (was: 20m)

> Number of reducers limited to fixed 1 when updating/deleting
> ------------------------------------------------------------
>
>                 Key: HIVE-25071
>                 URL: https://issues.apache.org/jira/browse/HIVE-25071
>             Project: Hive
>          Issue Type: Bug
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When updating/deleting bucketed tables, an extra ReduceSink operator is 
> created to enforce bucketing. After HIVE-22538 the number of reducers is 
> limited to a fixed 1 in these RS operators.
> This can lead to performance degradation.
> Prior to HIVE-22538, multiple reducers were available in such cases. The 
> reason for limiting the number of reducers is to ensure ascending RowId order 
> in the delete delta files produced by the update/delete statements.
> This is the plan of a delete statement like:
> {code}
> DELETE FROM t1 WHERE a = 1;
> {code}
> {code}
> TS[0]-FIL[8]-SEL[2]-RS[3]-SEL[4]-RS[5]-SEL[6]-FS[7]
> {code}
> RowId order is ensured by RS[3] and bucketing is enforced by RS[5]: the 
> number of reducers was limited to the number of buckets in the table or 
> hive.exec.reducers.max. However, RS[5] does not provide any ordering, so the 
> above plan may generate unsorted delete deltas, which leads to corrupted data 
> reads.
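> For illustration (the file name and contents below are hypothetical, not 
> from this issue), a delete delta is expected to be written in ascending 
> (originalTransaction, bucket, rowId) order so the ACID reader can merge it:
> {code}
> delete_delta_0000002_0000002/bucket_00000
>   {"originalTransaction":1,"bucket":536870912,"rowId":0}
>   {"originalTransaction":1,"bucket":536870912,"rowId":3}
> {code}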
> Prior to HIVE-22538, these RS operators were merged by ReduceSinkDeduplication 
> and the resulting RS kept the ordering and enabled multiple reducers. It could 
> do so because ReduceSinkDeduplication was prepared for ACID writes. This was 
> removed by HIVE-22538 to make ReduceSinkDeduplication more generic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
