Would you be able to file a JIRA?

On Tue, Jan 16, 2018 at 3:00 PM, Tim Armstrong <tarmstr...@cloudera.com>
wrote:

> Looks to me like you found a bug. I think the scanners should be checking
> both cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and
> non-MT scanners and hdfs_scan_node::done_ for non-MT scanners.
>
> On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <huang_quanl...@126.com>
> wrote:
>
>> Hi Tim,
>>
>> Thanks for your reply! I have a further question. When given MT_DOP=0,
>> why don't we use RuntimeState::is_cancelled() to detect cancellation in
>> hdfs scanners? For example, use it in the loop of ProcessSplit.
>> There might be a scenario that the FragementInstance was canceled, but
>> the scanner still don't know about it and then go ahead and pass up all the
>> row batches. If the FragementInstance just consists of an HdfsScanNode, the
>> DataStreamSender will try to send these row batches to the upstream
>> FragmentInstance which has been cancelled. Apparently it'll fail but it
>> will retry for 2 minutes (in default). The memory resources kept by the
>> DataStreamSender cannot be released in this 2 minutes window, which might
>> cause other queries in parallel raising MemLimitExceeded error.
>>
>> For example, the plan of query "select 1 from alltypessmall a join 
>> alltypessmall b on a.id != b.id" is
>> +------------------------------------------------------------------------------------+
>> | Max Per-Host Resource Reservation: Memory=0B                               
>>         |
>> | Per-Host Resource Estimates: Memory=2.06GB                                 
>>         |
>> | WARNING: The following tables are missing relevant table and/or column 
>> statistics. |
>> | functional_orc.alltypessmall                                               
>>         |
>> |                                                                            
>>         |
>> | F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1                      
>>         |
>> | Per-Host Resources: mem-estimate=0B mem-reservation=0B                     
>>         |
>> |   PLAN-ROOT SINK                                                           
>>         |
>> |   |  mem-estimate=0B mem-reservation=0B                                    
>>         |
>> |   |                                                                        
>>         |
>> |   04:EXCHANGE [UNPARTITIONED]                                              
>>         |
>> |      mem-estimate=0B mem-reservation=0B                                    
>>         |
>> |      tuple-ids=0,1 row-size=8B cardinality=unavailable                     
>>         |
>> |                                                                            
>>         |
>> | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                             
>>         |
>> | Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B                 
>>         |
>> |   DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]               
>>         |
>> |   |  mem-estimate=0B mem-reservation=0B                                    
>>         |
>> |   02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]                              
>>         |
>> |   |  predicates: a.id != b.id                                              
>>         |
>> |   |  mem-estimate=2.00GB mem-reservation=0B                                
>>         |
>> |   |  tuple-ids=0,1 row-size=8B cardinality=unavailable                     
>>         |
>> |   |                                                                        
>>         |
>> |   |--03:EXCHANGE [BROADCAST]                                               
>>         |
>> |   |     mem-estimate=0B mem-reservation=0B                                 
>>         |
>> |   |     tuple-ids=1 row-size=4B cardinality=unavailable                    
>>         |
>> |   |                                                                        
>>         |
>> |   00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]                    
>>         |
>> |      partitions=4/4 files=4 size=4.82KB                                    
>>         |
>> |      stored statistics:                                                    
>>         |
>> |        table: rows=unavailable size=unavailable                            
>>         |
>> |        partitions: 0/4 rows=unavailable                                    
>>         |
>> |        columns: unavailable                                                
>>         |
>> |      extrapolated-rows=disabled                                            
>>         |
>> |      mem-estimate=32.00MB mem-reservation=0B                               
>>         |
>> |      tuple-ids=0 row-size=4B cardinality=unavailable                       
>>         |
>> |                                                                            
>>         |
>> | F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3                             
>>         |
>> | Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B                
>>         |
>> |   DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]                   
>>         |
>> |   |  mem-estimate=0B mem-reservation=0B                                    
>>         |
>> |   01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]                    
>>         |
>> |      partitions=4/4 files=4 size=4.82KB                                    
>>         |
>> |      stored statistics:                                                    
>>         |
>> |        table: rows=unavailable size=unavailable                            
>>         |
>> |        partitions: 0/4 rows=unavailable                                    
>>         |
>> |        columns: unavailable                                                
>>         |
>> |      extrapolated-rows=disabled                                            
>>         |
>> |      mem-estimate=32.00MB mem-reservation=0B                               
>>         |
>> |      tuple-ids=1 row-size=4B cardinality=unavailable                       
>>         |
>> +------------------------------------------------------------------------------------+
>>
>> When errors happen in F00, cancellation rpc will be sent to F01. However, 
>> the hdfs scanner in F01 does not notice it in time and pass up all the row 
>> batches. Then the DataStreamSender will try to send these row batches to 
>> F01. It will retry for 2 minutes. In this time window it might hold 
>> significant memory resources, which causes other queries cannot allocate 
>> memory and fail. This can be avoid if the hdfs scanner use 
>> RuntimeState::is_cancelled() to detect the cancellation in time.
>>
>> Am I right?
>>
>> Thanks,
>> Quanlong
>>
>> At 2018-01-17 01:05:57, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
>> >ScannerContext::cancelled() == true means that the scan has completed,
>> >either because it has returned enough rows, because the query is cancelled,
>> >or because it hit an error.
>> >
>> >RuntimeState::cancelled() == true means that the query is cancelled.
>> >
>> >So there are cases where ScannerContext::cancelled() == true and
>> >RuntimeState::cancelled() is false. E.g. where there's a limit on the scan.
>> >
>> >I think the name of ScannerContext::cancelled() is misleading, it should
>> >probably be called "done()" to match HdfsScanNode::done(). More generally,
>> >the cancellation logic could probably be cleaned up and simplified further.
>> >
>> >On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <huang_quanl...@126.com>
>> >wrote:
>> >
>> >> Hi all,
>> >>
>> >>
>> >> I'm confused about the cancellation logic in hdfs scanners. There're two
>> >> functions to detect cancellation: ScannerContext::cancelled() and
>> >> RuntimeState::is_cancelled().
>> >> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
>> >> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode 
>> >> seems
>> >> to be set according to status return from scanners.
>> >> I've witnessed some points when RuntimeState::is_cancelled() is true but
>> >> ScannerContext::cancelled() is false.
>> >>
>> >>
>> >> My question is why scanners don't use RuntimeState::is_cancelled() to
>> >> detect cancellation, which is more timely than using
>> >> ScannerContext::cancelled(). There must be some detailed reasons that I've
>> >> missed. Would you be so kind to answer my question?
>> >>
>> >>
>> >> Thanks,
>> >> Quanlong
>>
>>
>>
>>
>>
>
>

Reply via email to