[ 
https://issues.apache.org/jira/browse/HIVE-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776189#comment-17776189
 ] 

Krisztian Kasa commented on HIVE-27788:
---------------------------------------

Another repro with 3 records and an inner join:
{code}
set hive.optimize.semijoin.conversion = false;

CREATE TABLE tbl1_n5(key int, value string) CLUSTERED BY (key) SORTED BY (key) 
INTO 2 BUCKETS;

insert into tbl1_n5(key, value)
values
(0, 'val_0'),
(2, 'val_2'),
(9, 'val_9');

explain
SELECT t1.key from
(SELECT  key , row_number() over(partition by key order by value desc) as rk 
from tbl1_n5) t1
join
( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
on t1.key = t2.key where rk = 1;
{code}
{code}
POSTHOOK: query: explain
SELECT t1.key from
(SELECT  key , row_number() over(partition by key order by value desc) as rk 
from tbl1_n5) t1
join
( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
on t1.key = t2.key where rk = 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl1_n5
#### A masked pattern was here ####
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
#### A masked pattern was here ####
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
#### A masked pattern was here ####
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: tbl1_n5
                  filterExpr: key is not null (type: boolean)
                  Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator
                    predicate: key is not null (type: boolean)
                    Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                    Reduce Output Operator
                      key expressions: key (type: int), value (type: string)
                      null sort order: aa
                      sort order: +-
                      Map-reduce partition columns: key (type: int)
                      Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
            Execution mode: vectorized, llap
            LLAP IO: all inputs
        Map 3 
            Map Operator Tree:
                TableScan
                  alias: tbl1_n5
                  filterExpr: key is not null (type: boolean)
                  Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator
                    predicate: key is not null (type: boolean)
                    Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                    Group By Operator
                      keys: key (type: int), value (type: string)
                      minReductionHashAggr: 0.4
                      mode: hash
                      outputColumnNames: _col0, _col1
                      Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                      Reduce Output Operator
                        key expressions: _col0 (type: int), _col1 (type: string)
                        null sort order: zz
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: int)
                        Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
            Execution mode: vectorized, llap
            LLAP IO: all inputs
        Reducer 2 
            Reduce Operator Tree:
              Group By Operator
                keys: KEY._col0 (type: int), KEY._col1 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                Select Operator
                  expressions: _col0 (type: int)
                  outputColumnNames: _col0
                  Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                  Group By Operator
                    keys: _col0 (type: int)
                    mode: complete
                    outputColumnNames: _col0
                    Statistics: Num rows: 3 Data size: 12 Basic stats: COMPLETE Column stats: COMPLETE
                    Dummy Store
            Execution mode: llap
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                PTF Operator
                  Function definitions:
                      Input definition
                        input alias: ptf_0
                        output shape: _col0: int, _col1: string
                        type: WINDOWING
                      Windowing table definition
                        input alias: ptf_1
                        name: windowingtablefunction
                        order by: _col1 DESC NULLS FIRST
                        partition by: _col0
                        raw input shape:
                        window functions:
                            window function definition
                              alias: row_number_window_0
                              name: row_number
                              window function: GenericUDAFRowNumberEvaluator
                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
                              isPivotResult: true
                  Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator
                    predicate: (row_number_window_0 = 1) (type: boolean)
                    Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE
                    Select Operator
                      expressions: _col0 (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                      Merge Join Operator
                        condition map:
                             Inner Join 0 to 1
                        keys:
                          0 _col0 (type: int)
                          1 _col0 (type: int)
                        outputColumnNames: _col0
                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                        File Output Operator
                          compressed: false
                          Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
{code}

The plan has a {{Merge Join Operator}} in {{Reducer 2}}. {{Reducer 2}} has two operator trees, one for each join branch.
The first tree
{code}
GBY-SEL-GBY-DS
{code}
belongs to the subquery
{code}
SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key
{code}
The aggregate call
{code}
count(distinct value)
{code}
needs two Group By operators:
* the first one ensures that only the distinct values of the {{value}} column are kept per {{key}},
* the second one aggregates (counts) them by the {{key}} column.
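Conceptually the two GBYs implement a distinct step followed by a count. A toy (non-Hive) Java sketch of the same aggregation over the 3 sample rows, just for illustration:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CountDistinctSketch {
    // A sample row (key, value), mirroring tbl1_n5.
    record Row(int key, String value) {}

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row(0, "val_0"), new Row(2, "val_2"), new Row(9, "val_9"));

        // Phase 1 (first GBY):  keep only distinct (key, value) pairs.
        // Phase 2 (second GBY): group by key and count the distinct values.
        Map<Integer, Long> cpCount = rows.stream()
            .map(r -> Map.entry(r.key(), r.value()))
            .distinct()
            .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));

        System.out.println(cpCount); // {0=1, 2=1, 9=1}
    }
}
{code}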
The second tree covers the other branch of the join, the one with the PTF operator, and the join itself.

Currently the Merge Join Operator doesn't support more than one Group By operator in the same branch within the same Reducer/Mapper instance, because a Group By operator doesn't forward records to its child operator immediately: it buffers one output row and forwards it only when an input row arrives whose key value differs from the currently buffered one.
Since we have two GBYs in this case, 2 rows are buffered. When the underlying data source runs out of records, the buffered records in the GBYs have to be flushed:
https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java#L268-L270
At that point both buffered records are forwarded to the Merge Join Operator, but the operator gets no chance to process records from the other side of the join in between, which leads to the exception mentioned in the description.
Example key values from both sides of the join:
{code}
left 0 2 9
right 0 2 9
{code}
1. The join reads 0 from the right.
2. The join [starts reading records|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L484-L488] from the left, but the GBY operators buffer the records with key values 0 and 2, so no records are forwarded to the join.
3. The join keeps reading from the left because the join [key is matching|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L268-L269]. When 9 is read, 0 is forwarded to the join from the left tree (the 2nd GBY forwards its buffered record) and it is still a match.
4. The join tries to read from the left again, but the source is empty, hence the [left operator tree is flushed|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java#L268-L270].
5. The 1st GBY forwards 9, hence the 2nd GBY forwards 2 to the join; the join finds a key mismatch and [sets the flag that the left side has a mismatch|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L271-L276].
6. The 2nd GBY flushes its buffered record with key 9 (flush is a recursive call), so the join gets 9 from the left without having had a chance to read anything more from the right. The exception is thrown.
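To make the sequencing concrete, here is a minimal, self-contained toy model of the left tree (plain Java, not Hive code; the class and variable names are made up for illustration). Two stacked buffering group-bys feed a stand-in for the join, and the row with key 9 only leaves the 2nd GBY during the final flush, when the right side can no longer be advanced:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Toy model of a buffering group-by: it holds one row and forwards it only
 *  when a row with a different key arrives, or when flush() is called. */
class BufferingGroupBy {
    private Integer buffered;          // currently buffered key, null if empty
    private final IntConsumer child;   // downstream operator

    BufferingGroupBy(IntConsumer child) { this.child = child; }

    void process(int key) {
        if (buffered != null && buffered != key) {
            child.accept(buffered);    // key changed -> forward the buffered row
        }
        buffered = key;
    }

    void flush() {                     // called when the source runs out of records
        if (buffered != null) {
            child.accept(buffered);
            buffered = null;
        }
    }
}

public class SmbGbyTrace {
    public static void main(String[] args) {
        List<Integer> arrivedAtJoin = new ArrayList<>();
        // Stand-in for what the merge join sees on the left side:
        IntConsumer join = key -> {
            arrivedAtJoin.add(key);
            System.out.println("join received left key " + key);
        };

        BufferingGroupBy second = new BufferingGroupBy(join);            // GBY on key
        BufferingGroupBy first  = new BufferingGroupBy(second::process); // GBY on (key, value)

        for (int key : new int[] {0, 2, 9}) {  // left-side keys, same as the repro
            first.process(key);                // key 0 reaches the join only when 9 is read
        }
        // Source is exhausted: flush the left tree top-down, mimicking the cascading flush.
        first.flush();
        second.flush();

        // Key 0 arrived while the join could still advance the right side;
        // keys 2 and 9 arrive back-to-back during the flush.
        System.out.println("order seen by join: " + arrivedAtJoin); // [0, 2, 9]
    }
}
{code}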

IMHO an operator tree in a mapper/reducer that contains a merge join shouldn't have a branch with more than one GBY operator.
A possible solution is to disable SMB join conversion in such cases:
https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java#L498
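For illustration, a guard of this kind could be considered (a sketch only; {{SmbConversionGuard}} and its methods are hypothetical names, not existing Hive code, and it assumes the branch can be inspected via {{Operator#getParentOperators()}}):
{code:java}
import java.util.List;

import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public final class SmbConversionGuard {

  /** Returns false when any branch feeding the join has more than one GBY in the
   *  same vertex, in which case SMB conversion could be skipped. */
  public static boolean branchesSafeForSmb(JoinOperator joinOp) {
    for (Operator<? extends OperatorDesc> parent : joinOp.getParentOperators()) {
      if (countGroupBysInVertex(parent) > 1) {
        return false;
      }
    }
    return true;
  }

  private static int countGroupBysInVertex(Operator<? extends OperatorDesc> op) {
    if (op instanceof ReduceSinkOperator) {
      return 0; // operators above a ReduceSink run in a different vertex
    }
    int count = (op instanceof GroupByOperator) ? 1 : 0;
    List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
    if (parents != null) {
      for (Operator<? extends OperatorDesc> parent : parents) {
        count += countGroupBysInVertex(parent); // walk up towards the vertex boundary
      }
    }
    return count;
  }

  private SmbConversionGuard() { }
}
{code}
Stopping the walk at a {{ReduceSinkOperator}} keeps the check scoped to the operators that would run in the same vertex as the Merge Join Operator.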

> Exception in Sort Merge join with Group By + PTF Operator
> ---------------------------------------------------------
>
>                 Key: HIVE-27788
>                 URL: https://issues.apache.org/jira/browse/HIVE-27788
>             Project: Hive
>          Issue Type: Bug
>          Components: Operators
>    Affects Versions: 4.0.0-beta-1
>            Reporter: Riju Trivedi
>            Priority: Major
>         Attachments: auto_sortmerge_join_17.q
>
>
> Sort-merge join with Group By + PTF operator leads to a runtime exception
> {code:java}
> Caused by: java.lang.RuntimeException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while 
> processing row
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:313)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:291)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:293)
>       ... 15 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime 
> Error while processing row
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:387)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:303)
>       ... 17 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite 
> nextKeyWritables[1]
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:392)
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:372)
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.process(CommonMergeJoinOperator.java:316)
>       at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
>       at 
> org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:94)
>       at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
>       at 
> org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:127)
>       at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
>       at 
> org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.handleOutputRows(PTFOperator.java:337)
>       at 
> org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.processRow(PTFOperator.java:325)
>       at 
> org.apache.hadoop.hive.ql.exec.PTFOperator.process(PTFOperator.java:139)
>       at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
>       at 
> org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:94)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:372)
>       ... 18 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: 
> java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite 
> nextKeyWritables[1]
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchOneRow(CommonMergeJoinOperator.java:534)
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchNextGroup(CommonMergeJoinOperator.java:488)
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:390)
>       ... 31 more
> Caused by: java.lang.RuntimeException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite 
> nextKeyWritables[1]
>       at 
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:313)
>       at 
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchOneRow(CommonMergeJoinOperator.java:522)
>       ... 33 more {code}
> Issue can be reproduced with [^auto_sortmerge_join_17.q]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
