Hi Tim, I believe it's a bug, and I've found a way to reproduce it. I have filed a JIRA: https://issues.apache.org/jira/browse/IMPALA-6423
At 2018-01-17 08:53:44, "Quanlong Huang" <huang_quanl...@126.com> wrote:

Thanks, Tim! Let me try to reproduce this scenario on the existing scanners. I'll file a JIRA when I find it.

At 2018-01-17 08:39:46, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:

I think there is still probably a bug in the existing scanners where they can ignore cancellation under specific conditions.

> For non-MT scanners, why don't they just check RuntimeState::is_cancelled()? Are there any reasons that they should go ahead until HdfsScanNode::done()?

I think the non-MT scanners should check both RuntimeState::is_cancelled() and HdfsScanNode::done(), since they signal different termination conditions.

On Tue, Jan 16, 2018 at 4:09 PM, Quanlong Huang <huang_quanl...@126.com> wrote:

I'm developing the hdfs orc scanner (IMPALA-5717) and encountered this scenario in test_failpoints.py. The existing scanners can pass this test. I think this might be my own problem, so I haven't filed a JIRA yet.

I just want to confirm that, when MT_DOP=0, the other scanners won't get into this scenario. For non-MT scanners, why don't they just check RuntimeState::is_cancelled()? Are there any reasons that they should go ahead until HdfsScanNode::done()?

At 2018-01-17 07:00:51, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:

Looks to me like you found a bug. I think the scanners should be checking both cancellation conditions, i.e. RuntimeState::is_cancelled_ for MT and non-MT scanners and hdfs_scan_node::done_ for non-MT scanners.
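A minimal sketch of the kind of double check suggested above, using simplified stand-in types rather than the real Impala classes (all names below are illustrative only):

  #include <atomic>

  // Stand-ins for the classes discussed in this thread, not the real ones.
  struct RuntimeStateSketch {
    std::atomic<bool> cancelled{false};        // flipped when the cancellation RPC arrives
    bool is_cancelled() const { return cancelled.load(); }
  };
  struct HdfsScanNodeSketch {
    std::atomic<bool> done{false};             // limit reached, error, or scan finished
  };

  // Non-MT scanner loop: bail out promptly on either termination condition.
  void ProcessSplitSketch(RuntimeStateSketch* state, HdfsScanNodeSketch* node) {
    while (!node->done.load()) {
      if (state->is_cancelled()) return;       // query cancelled: stop producing row batches
      // ... materialize the next row batch and pass it up here ...
    }
  }

Where exactly the check sits would differ per scanner, but the idea is simply that the loop observes both termination signals instead of waiting on the done flag alone.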
On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <huang_quanl...@126.com> wrote:

Hi Tim,

Thanks for your reply! I have a further question. When given MT_DOP=0, why don't we use RuntimeState::is_cancelled() to detect cancellation in the hdfs scanners, for example in the loop of ProcessSplit?
There might be a scenario in which the FragmentInstance has been cancelled, but the scanner doesn't know about it yet, so it goes ahead and passes up all the row batches. If the FragmentInstance consists of just an HdfsScanNode, the DataStreamSender will try to send these row batches to the upstream FragmentInstance, which has been cancelled. Apparently it will fail, but it will keep retrying for 2 minutes (by default). The memory held by the DataStreamSender cannot be released during this 2-minute window, which might cause other queries running in parallel to hit MemLimitExceeded errors.

For example, the plan of the query "select 1 from alltypessmall a join alltypessmall b on a.id != b.id" is:

Max Per-Host Resource Reservation: Memory=0B
Per-Host Resource Estimates: Memory=2.06GB
WARNING: The following tables are missing relevant table and/or column statistics.
functional_orc.alltypessmall

F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
04:EXCHANGE [UNPARTITIONED]
   mem-estimate=0B mem-reservation=0B
   tuple-ids=0,1 row-size=8B cardinality=unavailable

F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B
DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]
|  mem-estimate=0B mem-reservation=0B
02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
|  predicates: a.id != b.id
|  mem-estimate=2.00GB mem-reservation=0B
|  tuple-ids=0,1 row-size=8B cardinality=unavailable
|
|--03:EXCHANGE [BROADCAST]
|     mem-estimate=0B mem-reservation=0B
|     tuple-ids=1 row-size=4B cardinality=unavailable
|
00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]
   partitions=4/4 files=4 size=4.82KB
   stored statistics:
     table: rows=unavailable size=unavailable
     partitions: 0/4 rows=unavailable
     columns: unavailable
   extrapolated-rows=disabled
   mem-estimate=32.00MB mem-reservation=0B
   tuple-ids=0 row-size=4B cardinality=unavailable

F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]
|  mem-estimate=0B mem-reservation=0B
01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]
   partitions=4/4 files=4 size=4.82KB
   stored statistics:
     table: rows=unavailable size=unavailable
     partitions: 0/4 rows=unavailable
     columns: unavailable
   extrapolated-rows=disabled
   mem-estimate=32.00MB mem-reservation=0B
   tuple-ids=1 row-size=4B cardinality=unavailable

When errors happen in F00, a cancellation RPC will be sent to F01. However, the hdfs scanner in F01 does not notice it in time and passes up all the row batches. Then the DataStreamSender in F01 will try to send these row batches to F00. It will retry for 2 minutes. In this time window it might hold significant memory, which can leave other queries unable to allocate memory and make them fail. This could be avoided if the hdfs scanner used RuntimeState::is_cancelled() to detect the cancellation in time.

Am I right?

Thanks,
Quanlong

At 2018-01-17 01:05:57, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:

ScannerContext::cancelled() == true means that the scan has completed, either because it has returned enough rows, because the query is cancelled, or because it hit an error.

RuntimeState::cancelled() == true means that the query is cancelled.

So there are cases where ScannerContext::cancelled() == true and RuntimeState::cancelled() is false, e.g. where there's a limit on the scan.

I think the name of ScannerContext::cancelled() is misleading; it should probably be called "done()" to match HdfsScanNode::done(). More generally, the cancellation logic could probably be cleaned up and simplified further.
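To restate the distinction above in code form, a small stand-in illustration (simplified names and logic only, not the actual Impala code paths):

  // Roughly: the context's "cancelled" answers "is this scan finished for any
  // reason?", while the runtime state answers "was the query cancelled?".
  struct ScanTermination {
    bool limit_reached = false;    // scan has returned enough rows
    bool scan_error = false;       // a scanner hit an error
    bool query_cancelled = false;  // cancellation arrived for the whole query
  };

  bool ScanIsDone(const ScanTermination& t) {        // ~ ScannerContext::cancelled()
    return t.limit_reached || t.scan_error || t.query_cancelled;
  }

  bool QueryIsCancelled(const ScanTermination& t) {  // ~ RuntimeState::is_cancelled()
    return t.query_cancelled;
  }

In the limit case mentioned above, ScanIsDone() is true while QueryIsCancelled() is false. In the real code the "done" side is a flag that is only set after the fact, which is why the reverse situation (query cancelled but the done flag not yet set) can also hold for a while, as described earlier in the thread.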
On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <huang_quanl...@126.com> wrote:

Hi all,

I'm confused about the cancellation logic in the hdfs scanners. There are two functions to detect cancellation: ScannerContext::cancelled() and RuntimeState::is_cancelled().
When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() returns HdfsScanNode::done(). However, the field done_ in HdfsScanNode seems to be set according to the statuses returned from the scanners.
I've observed points where RuntimeState::is_cancelled() is true but ScannerContext::cancelled() is false.

My question is why the scanners don't use RuntimeState::is_cancelled() to detect cancellation, which is more timely than using ScannerContext::cancelled(). There must be some detailed reasons that I've missed. Would you be so kind as to answer my question?

Thanks,
Quanlong
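A minimal stand-in sketch of the window described in the question above, assuming only what the question itself states (that with MT_DOP=0 ScannerContext::cancelled() reflects HdfsScanNode::done()); the names are hypothetical, not the real sources:

  struct ScanNodeStub { bool done_ = false; };           // set from scanner statuses or the limit
  struct RuntimeStateStub { bool is_cancelled_ = false; }; // set directly when cancellation arrives

  struct ScannerContextStub {
    const ScanNodeStub* scan_node;
    // MT_DOP=0 behaviour as described in the question: just reflect done_.
    bool cancelled() const { return scan_node->done_; }
  };

  // The window being asked about: a cancellation RPC sets is_cancelled_
  // immediately, but done_ only flips once a scanner status propagates back
  // to the scan node, so cancelled() here can still return false for a while
  // after the query has already been cancelled.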