Hi Tim, I believe it's a bug and I've found a way to reproduce it.
Have filed a JIRA: https://issues.apache.org/jira/browse/IMPALA-6423

At 2018-01-17 08:53:44, "Quanlong Huang" <huang_quanl...@126.com> wrote:
>Thanks, Tim! Let me try to reproduce this scenario with the existing scanners.
>I'll file a JIRA once I can reproduce it.
>
>At 2018-01-17 08:39:46, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
>>I think there is still probably a bug in the existing scanners where they
>>can ignore cancellation under specific conditions.
>>
>>> For non-MT scanners, why don't they just check
>>> RuntimeState::is_cancelled()? Are there any reasons that they should keep
>>> going until HdfsScanNode::done() returns true?
>>I think the non-MT scanners should check both RuntimeState::is_cancelled()
>>and HdfsScanNode::done(), since they signal different termination
>>conditions.
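>>
>>Roughly, something like the following (a standalone illustration, not the
>>actual Impala code; only the two method names above come from the real
>>classes, everything else is a stand-in):
>>
>>  #include <atomic>
>>  #include <cstdio>
>>
>>  // Minimal stand-ins for the two termination signals discussed above.
>>  struct RuntimeState {
>>    std::atomic<bool> cancelled{false};
>>    bool is_cancelled() const { return cancelled; }
>>  };
>>  struct HdfsScanNode {
>>    std::atomic<bool> done_{false};
>>    bool done() const { return done_; }
>>  };
>>
>>  // A non-MT scan loop would stop as soon as either signal fires.
>>  bool should_terminate(const RuntimeState& state, const HdfsScanNode& node) {
>>    return state.is_cancelled() || node.done();
>>  }
>>
>>  int main() {
>>    RuntimeState state;
>>    HdfsScanNode node;
>>    state.cancelled = true;  // simulate the query being cancelled
>>    std::printf("terminate=%d\n", should_terminate(state, node));  // prints 1
>>  }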
>>
>>On Tue, Jan 16, 2018 at 4:09 PM, Quanlong Huang <huang_quanl...@126.com>
>>wrote:
>>
>>> I'm developing the hdfs orc scanner (IMPALA-5717) and encountered such a
>>> scenario in test_failpoints.py. The existing scanners pass this test, so I
>>> thought this might be a problem in my own code and haven't filed a JIRA yet.
>>>
>>> Just want to confirm that when MT_DOP=0, the other scanners won't get into
>>> this scenario. For non-MT scanners, why don't they just check
>>> RuntimeState::is_cancelled()? Are there any reasons that they should keep
>>> going until HdfsScanNode::done() returns true?
>>>
>>> At 2018-01-17 07:00:51, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
>>>
>>> Looks to me like you found a bug. I think the scanners should be checking
>>> both cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and
>>> non-MT scanners and HdfsScanNode::done_ for non-MT scanners.
>>>
>>> On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <huang_quanl...@126.com>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> Thanks for your reply! I have a further question. When MT_DOP=0, why don't
>>>> we use RuntimeState::is_cancelled() to detect cancellation in the hdfs
>>>> scanners? For example, it could be checked in the loop of ProcessSplit.
>>>> There might be a scenario where the FragmentInstance has been cancelled, but
>>>> the scanner doesn't know about it yet, so it goes ahead and passes up all the
>>>> row batches. If the FragmentInstance just consists of an HdfsScanNode, the
>>>> DataStreamSender will try to send these row batches to the upstream
>>>> FragmentInstance, which has been cancelled. Apparently it'll fail, but it
>>>> will retry for 2 minutes (by default). The memory resources held by the
>>>> DataStreamSender cannot be released during this 2-minute window, which might
>>>> cause other queries running in parallel to hit MemLimitExceeded errors.
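>>>>
>>>> To make that concrete, a rough sketch of where such a check could sit (only
>>>> ProcessSplit, RuntimeState::is_cancelled() and HdfsScanNode::done() come from
>>>> the discussion; the member names, AssembleRows() and the overall shape are
>>>> assumptions):
>>>>
>>>>   Status HdfsOrcScanner::ProcessSplit() {
>>>>     while (!scan_node_->done()) {
>>>>       // Query-wide cancellation: stop producing row batches right away
>>>>       // instead of handing them to a DataStreamSender that can only fail
>>>>       // and retry against the cancelled destination.
>>>>       if (state_->is_cancelled()) return Status::CANCELLED;
>>>>       RETURN_IF_ERROR(AssembleRows());  // produce and pass up the next batches
>>>>     }
>>>>     return Status::OK();
>>>>   }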
>>>>
>>>> For example, the plan of the query "select 1 from alltypessmall a join
>>>> alltypessmall b on a.id != b.id" is:
>>>> +------------------------------------------------------------------------------------+
>>>> | Max Per-Host Resource Reservation: Memory=0B                                        |
>>>> | Per-Host Resource Estimates: Memory=2.06GB                                          |
>>>> | WARNING: The following tables are missing relevant table and/or column statistics. |
>>>> | functional_orc.alltypessmall                                                        |
>>>> |                                                                                     |
>>>> | F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1                               |
>>>> | Per-Host Resources: mem-estimate=0B mem-reservation=0B                              |
>>>> |   PLAN-ROOT SINK                                                                    |
>>>> |   |  mem-estimate=0B mem-reservation=0B                                             |
>>>> |   |                                                                                 |
>>>> |   04:EXCHANGE [UNPARTITIONED]                                                       |
>>>> |      mem-estimate=0B mem-reservation=0B                                             |
>>>> |      tuple-ids=0,1 row-size=8B cardinality=unavailable                              |
>>>> |                                                                                     |
>>>> | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                                      |
>>>> | Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B                          |
>>>> |   DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]                        |
>>>> |   |  mem-estimate=0B mem-reservation=0B                                             |
>>>> |   02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]                                       |
>>>> |   |  predicates: a.id != b.id                                                       |
>>>> |   |  mem-estimate=2.00GB mem-reservation=0B                                         |
>>>> |   |  tuple-ids=0,1 row-size=8B cardinality=unavailable                              |
>>>> |   |                                                                                 |
>>>> |   |--03:EXCHANGE [BROADCAST]                                                        |
>>>> |   |     mem-estimate=0B mem-reservation=0B                                          |
>>>> |   |     tuple-ids=1 row-size=4B cardinality=unavailable                             |
>>>> |   |                                                                                 |
>>>> |   00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]                             |
>>>> |      partitions=4/4 files=4 size=4.82KB                                             |
>>>> |      stored statistics:                                                             |
>>>> |        table: rows=unavailable size=unavailable                                     |
>>>> |        partitions: 0/4 rows=unavailable                                             |
>>>> |        columns: unavailable                                                         |
>>>> |      extrapolated-rows=disabled                                                     |
>>>> |      mem-estimate=32.00MB mem-reservation=0B                                        |
>>>> |      tuple-ids=0 row-size=4B cardinality=unavailable                                |
>>>> |                                                                                     |
>>>> | F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                                      |
>>>> | Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B                         |
>>>> |   DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]                            |
>>>> |   |  mem-estimate=0B mem-reservation=0B                                             |
>>>> |   01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]                             |
>>>> |      partitions=4/4 files=4 size=4.82KB                                             |
>>>> |      stored statistics:                                                             |
>>>> |        table: rows=unavailable size=unavailable                                     |
>>>> |        partitions: 0/4 rows=unavailable                                             |
>>>> |        columns: unavailable                                                         |
>>>> |      extrapolated-rows=disabled                                                     |
>>>> |      mem-estimate=32.00MB mem-reservation=0B                                        |
>>>> |      tuple-ids=1 row-size=4B cardinality=unavailable                                |
>>>> +------------------------------------------------------------------------------------+
>>>>
>>>> When errors happen in F00, a cancellation RPC will be sent to F01. However,
>>>> the hdfs scanner in F01 does not notice it in time and passes up all the row
>>>> batches. Then the DataStreamSender will try to send these row batches to
>>>> F00. It will retry for 2 minutes. In this time window it might hold
>>>> significant memory resources, which can cause other queries to fail because
>>>> they cannot allocate memory. This could be avoided if the hdfs scanner used
>>>> RuntimeState::is_cancelled() to detect the cancellation in time.
>>>>
>>>> Am I right?
>>>>
>>>> Thanks,
>>>> Quanlong
>>>>
>>>> At 2018-01-17 01:05:57, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
>>>> >ScannerContext::cancelled() == true means that the scan has completed,
>>>> >either because it has returned enough rows, because the query is cancelled,
>>>> >or because it hit an error.
>>>> >
>>>> >RuntimeState::is_cancelled() == true means that the query is cancelled.
>>>> >
>>>> >So there are cases where ScannerContext::cancelled() == true and
>>>> >RuntimeState::is_cancelled() is false, e.g. where there's a limit on the
>>>> >scan.
>>>> >
>>>> >I think the name of ScannerContext::cancelled() is misleading; it should
>>>> >probably be called "done()" to match HdfsScanNode::done(). More generally,
>>>> >the cancellation logic could probably be cleaned up and simplified further.
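>>>> >
>>>> >To restate that distinction in code form (purely illustrative, not the actual
>>>> >implementation):
>>>> >
>>>> >  // "done" (what ScannerContext::cancelled() actually signals) is a superset
>>>> >  // of query cancellation.
>>>> >  bool scan_done(bool limit_reached, bool hit_error, bool query_cancelled) {
>>>> >    return limit_reached || hit_error || query_cancelled;
>>>> >  }
>>>> >  // query_cancelled implies scan_done, but scan_done does not imply
>>>> >  // query_cancelled, e.g. when the scan stops because a LIMIT was reached.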
>>>> >
>>>> >On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <huang_quanl...@126.com>
>>>> >wrote:
>>>> >
>>>> >> Hi all,
>>>> >>
>>>> >>
>>>> >> I'm confused about the cancellation logic in the hdfs scanners. There are
>>>> >> two functions to detect cancellation: ScannerContext::cancelled() and
>>>> >> RuntimeState::is_cancelled().
>>>> >> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
>>>> >> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode seems
>>>> >> to be set according to the statuses returned from the scanners.
>>>> >> I've seen moments when RuntimeState::is_cancelled() is true but
>>>> >> ScannerContext::cancelled() is false.
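>>>> >>
>>>> >> In other words, my understanding of the non-MT path is roughly this
>>>> >> (a simplified sketch of how I read it, not the exact code):
>>>> >>
>>>> >>   bool ScannerContext::cancelled() const {
>>>> >>     // With MT_DOP=0 this only reflects HdfsScanNode::done_, which is set
>>>> >>     // from statuses returned by the scanners rather than directly from
>>>> >>     // query cancellation, so it can lag behind RuntimeState::is_cancelled().
>>>> >>     return scan_node_->done();
>>>> >>   }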
>>>> >>
>>>> >>
>>>> >> My question is: why don't the scanners use RuntimeState::is_cancelled() to
>>>> >> detect cancellation, which would be more timely than using
>>>> >> ScannerContext::cancelled()? There must be some details that I've missed.
>>>> >> Would you be so kind as to answer my question?
>>>> >>
>>>> >>
>>>> >> Thanks,
>>>> >> Quanlong
