Would you be able to file a JIRA?

On Tue, Jan 16, 2018 at 3:00 PM, Tim Armstrong <tarmstr...@cloudera.com> wrote:
> Looks to me like you found a bug. I think the scanners should be checking
> both cancellation conditions, i.e. RuntimeState::is_cancelled_ for both
> MT and non-MT scanners, and HdfsScanNode::done_ for non-MT scanners.
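>
> To make the shape of that check concrete, here's a tiny standalone model
> (not the actual Impala code -- RuntimeState and HdfsScanNode below are
> just stand-ins for the real classes, and ScannerShouldStop() is a made-up
> helper):
>
>   // Model of the combined check: stop if EITHER the query was cancelled
>   // (query-wide flag, set by the coordinator's cancel RPC) or the scan
>   // node is done (scan-local: limit hit, error, etc.).
>   #include <atomic>
>   #include <cstdio>
>
>   struct RuntimeState { std::atomic<bool> is_cancelled_{false}; };
>   struct HdfsScanNode { std::atomic<bool> done_{false}; };
>
>   bool ScannerShouldStop(const RuntimeState& state, const HdfsScanNode& node) {
>     return state.is_cancelled_.load() || node.done_.load();
>   }
>
>   int main() {
>     RuntimeState state;
>     HdfsScanNode node;
>     state.is_cancelled_ = true;  // Cancel RPC has arrived...
>     // ...but done_ hasn't been set yet. Checking both flags means the
>     // scanner still stops promptly -- the case the current code misses.
>     std::printf("should stop: %d\n", ScannerShouldStop(state, node) ? 1 : 0);
>     return 0;
>   }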
>
> On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <huang_quanl...@126.com> wrote:
>
>> Hi Tim,
>>
>> Thanks for your reply! I have a further question. When MT_DOP=0, why
>> don't we use RuntimeState::is_cancelled() to detect cancellation in the
>> hdfs scanners? For example, use it in the loop of ProcessSplit.
>> There might be a scenario where the FragmentInstance has been cancelled,
>> but the scanner doesn't know about it yet, so it goes ahead and passes
>> up all the row batches. If the FragmentInstance consists of just an
>> HdfsScanNode, the DataStreamSender will try to send these row batches to
>> the upstream FragmentInstance, which has already been cancelled. It will
>> fail, of course, but it will keep retrying for 2 minutes (by default).
>> The memory held by the DataStreamSender cannot be released during this
>> 2-minute window, which might cause other queries running in parallel to
>> hit MemLimitExceeded errors.
>>
>> For example, the plan of the query "select 1 from alltypessmall a join
>> alltypessmall b on a.id != b.id" is:
>>
>> +------------------------------------------------------------------------------------+
>> | Max Per-Host Resource Reservation: Memory=0B
>> | Per-Host Resource Estimates: Memory=2.06GB
>> | WARNING: The following tables are missing relevant table and/or column statistics.
>> | functional_orc.alltypessmall
>> |
>> | F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
>> | Per-Host Resources: mem-estimate=0B mem-reservation=0B
>> | PLAN-ROOT SINK
>> | |  mem-estimate=0B mem-reservation=0B
>> | |
>> | 04:EXCHANGE [UNPARTITIONED]
>> |    mem-estimate=0B mem-reservation=0B
>> |    tuple-ids=0,1 row-size=8B cardinality=unavailable
>> |
>> | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
>> | Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B
>> | DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]
>> | |  mem-estimate=0B mem-reservation=0B
>> | 02:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
>> | |  predicates: a.id != b.id
>> | |  mem-estimate=2.00GB mem-reservation=0B
>> | |  tuple-ids=0,1 row-size=8B cardinality=unavailable
>> | |
>> | |--03:EXCHANGE [BROADCAST]
>> | |     mem-estimate=0B mem-reservation=0B
>> | |     tuple-ids=1 row-size=4B cardinality=unavailable
>> | |
>> | 00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM]
>> |    partitions=4/4 files=4 size=4.82KB
>> |    stored statistics:
>> |      table: rows=unavailable size=unavailable
>> |      partitions: 0/4 rows=unavailable
>> |      columns: unavailable
>> |    extrapolated-rows=disabled
>> |    mem-estimate=32.00MB mem-reservation=0B
>> |    tuple-ids=0 row-size=4B cardinality=unavailable
>> |
>> | F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
>> | Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
>> | DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST]
>> | |  mem-estimate=0B mem-reservation=0B
>> | 01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM]
>> |    partitions=4/4 files=4 size=4.82KB
>> |    stored statistics:
>> |      table: rows=unavailable size=unavailable
>> |      partitions: 0/4 rows=unavailable
>> |      columns: unavailable
>> |    extrapolated-rows=disabled
>> |    mem-estimate=32.00MB mem-reservation=0B
>> |    tuple-ids=1 row-size=4B cardinality=unavailable
>> +------------------------------------------------------------------------------------+
>>
>> When an error happens in F00, a cancellation RPC will be sent to F01.
>> However, the hdfs scanner in F01 does not notice it in time and passes
>> up all the row batches. The DataStreamSender then tries to send these
>> row batches to F00 (which has been cancelled) and retries for 2 minutes.
>> In this time window it may hold significant memory, which can cause
>> other queries to fail to allocate memory. This could be avoided if the
>> hdfs scanner used RuntimeState::is_cancelled() to detect the
>> cancellation in time.
>>
>> Am I right?
>>
>> Thanks,
>> Quanlong
>>
>> At 2018-01-17 01:05:57, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
>>
>>> ScannerContext::cancelled() == true means that the scan has completed,
>>> either because it has returned enough rows, because the query is
>>> cancelled, or because it hit an error.
>>>
>>> RuntimeState::is_cancelled() == true means that the query is cancelled.
>>>
>>> So there are cases where ScannerContext::cancelled() == true and
>>> RuntimeState::is_cancelled() is false, e.g. where there's a limit on
>>> the scan.
>>>
>>> I think the name of ScannerContext::cancelled() is misleading; it
>>> should probably be called "done()" to match HdfsScanNode::done(). More
>>> generally, the cancellation logic could probably be cleaned up and
>>> simplified further.
>>>
>>> On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <huang_quanl...@126.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm confused about the cancellation logic in the hdfs scanners. There
>>>> are two functions to detect cancellation: ScannerContext::cancelled()
>>>> and RuntimeState::is_cancelled().
>>>> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled()
>>>> returns HdfsScanNode::done(). However, the field done_ in HdfsScanNode
>>>> seems to be set according to the statuses returned from the scanners.
>>>> I've witnessed points in time where RuntimeState::is_cancelled() is
>>>> true but ScannerContext::cancelled() is false.
>>>>
>>>> My question is: why don't the scanners use
>>>> RuntimeState::is_cancelled() to detect cancellation, which is more
>>>> timely than ScannerContext::cancelled()? There must be some detail
>>>> I've missed. Would you be so kind as to answer my question?
>>>>
>>>> Thanks,
>>>> Quanlong
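>>>>
>>>> PS: Here is a minimal standalone model of the divergence I'm seeing
>>>> (the names mirror the real classes, but this is my paraphrase of the
>>>> logic rather than the actual source, and mt_scan_node is a made-up
>>>> flag standing in for the MT_DOP > 0 code path):
>>>>
>>>>   #include <atomic>
>>>>   #include <cassert>
>>>>
>>>>   struct RuntimeState { std::atomic<bool> is_cancelled_{false}; };
>>>>   struct HdfsScanNode { std::atomic<bool> done_{false}; };
>>>>
>>>>   struct ScannerContext {
>>>>     RuntimeState* state;
>>>>     HdfsScanNode* scan_node;
>>>>     bool mt_scan_node;  // true when MT_DOP > 0
>>>>     // My reading: with MT_DOP=0 this consults only done_, which is
>>>>     // set from scanner statuses rather than from the cancel RPC.
>>>>     bool cancelled() const {
>>>>       return mt_scan_node ? state->is_cancelled_.load()
>>>>                           : scan_node->done_.load();
>>>>     }
>>>>   };
>>>>
>>>>   int main() {
>>>>     RuntimeState state;
>>>>     HdfsScanNode node;
>>>>     ScannerContext ctx{&state, &node, /*mt_scan_node=*/false};
>>>>     state.is_cancelled_ = true;  // The query has been cancelled...
>>>>     assert(!ctx.cancelled());    // ...but the non-MT scanner can't see it.
>>>>     return 0;
>>>>   }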