Looks to me like you found a bug. I think the scanners should be checking both cancellation conditions: RuntimeState::is_cancelled_ for both the MT and non-MT scanners, and HdfsScanNode::done_ additionally for the non-MT scanners.
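For concreteness, here's a rough sketch (not the actual Impala code) of what the combined check could look like. The names follow this thread; using a null scan-node pointer to stand in for the MT_DOP > 0 code path is just an assumption for illustration:

  #include <atomic>

  struct RuntimeState {
    std::atomic<bool> is_cancelled_{false};
    bool is_cancelled() const { return is_cancelled_.load(); }
  };

  struct HdfsScanNode {
    std::atomic<bool> done_{false};
    bool done() const { return done_.load(); }
  };

  class ScannerContext {
   public:
    ScannerContext(RuntimeState* state, HdfsScanNode* scan_node)
      : state_(state), scan_node_(scan_node) {}

    // True as soon as the query is cancelled (MT and non-MT), or, for
    // non-MT scanners, as soon as the shared scan node is done (limit
    // reached, error, or cancellation already propagated into done_).
    bool cancelled() const {
      if (state_->is_cancelled()) return true;
      return scan_node_ != nullptr && scan_node_->done();
    }

   private:
    RuntimeState* state_;
    HdfsScanNode* scan_node_;  // nullptr when MT_DOP > 0 in this sketch
  };

A scanner's ProcessSplit() loop would then poll cancelled() between row batches and bail out promptly, instead of handing batches to a DataStreamSender that is stuck retrying against an already-cancelled receiver.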
On Tue, Jan 16, 2018 at 2:48 PM, Quanlong Huang <huang_quanl...@126.com> wrote:

> Hi Tim,
>
> Thanks for your reply! I have a further question. When given MT_DOP=0, why
> don't we use RuntimeState::is_cancelled() to detect cancellation in the hdfs
> scanners? For example, use it in the loop of ProcessSplit().
> There might be a scenario where the FragmentInstance was cancelled, but the
> scanner still doesn't know about it and goes ahead and passes up all the row
> batches. If the FragmentInstance just consists of an HdfsScanNode, the
> DataStreamSender will try to send these row batches to the upstream
> FragmentInstance, which has been cancelled. Apparently it'll fail, but it
> will retry for 2 minutes (by default). The memory resources held by the
> DataStreamSender cannot be released within this 2-minute window, which might
> cause other queries running in parallel to raise MemLimitExceeded errors.
>
> For example, the plan of the query "select 1 from alltypessmall a join
> alltypessmall b on a.id != b.id" is
>
> +------------------------------------------------------------------------------------+
> | Max Per-Host Resource Reservation: Memory=0B |
> | Per-Host Resource Estimates: Memory=2.06GB |
> | WARNING: The following tables are missing relevant table and/or column statistics. |
> | functional_orc.alltypessmall |
> | |
> | F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 |
> | Per-Host Resources: mem-estimate=0B mem-reservation=0B |
> | PLAN-ROOT SINK |
> | | mem-estimate=0B mem-reservation=0B |
> | | |
> | 04:EXCHANGE [UNPARTITIONED] |
> | mem-estimate=0B mem-reservation=0B |
> | tuple-ids=0,1 row-size=8B cardinality=unavailable |
> | |
> | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 |
> | Per-Host Resources: mem-estimate=2.03GB mem-reservation=0B |
> | DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED] |
> | | mem-estimate=0B mem-reservation=0B |
> | 02:NESTED LOOP JOIN [INNER JOIN, BROADCAST] |
> | | predicates: a.id != b.id |
> | | mem-estimate=2.00GB mem-reservation=0B |
> | | tuple-ids=0,1 row-size=8B cardinality=unavailable |
> | | |
> | |--03:EXCHANGE [BROADCAST] |
> | | mem-estimate=0B mem-reservation=0B |
> | | tuple-ids=1 row-size=4B cardinality=unavailable |
> | | |
> | 00:SCAN HDFS [functional_orc.alltypessmall a, RANDOM] |
> | partitions=4/4 files=4 size=4.82KB |
> | stored statistics: |
> | table: rows=unavailable size=unavailable |
> | partitions: 0/4 rows=unavailable |
> | columns: unavailable |
> | extrapolated-rows=disabled |
> | mem-estimate=32.00MB mem-reservation=0B |
> | tuple-ids=0 row-size=4B cardinality=unavailable |
> | |
> | F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 |
> | Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B |
> | DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=03, BROADCAST] |
> | | mem-estimate=0B mem-reservation=0B |
> | 01:SCAN HDFS [functional_orc.alltypessmall b, RANDOM] |
> | partitions=4/4 files=4 size=4.82KB |
> | stored statistics: |
> | table: rows=unavailable size=unavailable |
> | partitions: 0/4 rows=unavailable |
> | columns: unavailable |
> | extrapolated-rows=disabled |
> | mem-estimate=32.00MB mem-reservation=0B |
> | tuple-ids=1 row-size=4B cardinality=unavailable |
> +------------------------------------------------------------------------------------+
>
> When errors happen in F00, a cancellation RPC will be sent to F01. However,
> the hdfs scanner in F01 does not notice it in time and passes up all the row
> batches.
> The DataStreamSender in F01 will then try to send these row batches to F00.
> It will retry for 2 minutes. In this time window it might hold significant
> memory resources, so other queries cannot allocate memory and fail.
> This could be avoided if the hdfs scanners used RuntimeState::is_cancelled()
> to detect the cancellation in time.
>
> Am I right?
>
> Thanks,
> Quanlong
>
> At 2018-01-17 01:05:57, "Tim Armstrong" <tarmstr...@cloudera.com> wrote:
> >ScannerContext::cancelled() == true means that the scan has completed,
> >either because it has returned enough rows, because the query is cancelled,
> >or because it hit an error.
> >
> >RuntimeState::is_cancelled() == true means that the query is cancelled.
> >
> >So there are cases where ScannerContext::cancelled() == true and
> >RuntimeState::is_cancelled() is false, e.g. where there's a limit on the
> >scan.
> >
> >I think the name of ScannerContext::cancelled() is misleading; it should
> >probably be called "done()" to match HdfsScanNode::done(). More generally,
> >the cancellation logic could probably be cleaned up and simplified further.
> >
> >On Mon, Jan 15, 2018 at 6:20 PM, Quanlong Huang <huang_quanl...@126.com>
> >wrote:
> >
> >> Hi all,
> >>
> >> I'm confused about the cancellation logic in the hdfs scanners. There are
> >> two functions to detect cancellation: ScannerContext::cancelled() and
> >> RuntimeState::is_cancelled().
> >> When MT_DOP is not set (i.e. MT_DOP=0), ScannerContext::cancelled() will
> >> return HdfsScanNode::done(). However, the field done_ in HdfsScanNode
> >> seems to be set according to the statuses returned from the scanners.
> >> I've witnessed points where RuntimeState::is_cancelled() is true but
> >> ScannerContext::cancelled() is false.
> >>
> >> My question is: why don't the scanners use RuntimeState::is_cancelled()
> >> to detect cancellation, which is more timely than using
> >> ScannerContext::cancelled()? There must be some detailed reasons that
> >> I've missed. Would you be so kind as to answer my question?
> >>
> >> Thanks,
> >> Quanlong