Abhishek Rawat has posted comments on this change. ( http://gerrit.cloudera.org:8080/13178 )
Change subject: IMPALA-4658: Potential race if compiler reorders ReachedLimit() usage.
......................................................................


Patch Set 4:

(12 comments)

http://gerrit.cloudera.org:8080/#/c/13178/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13178/4//COMMIT_MSG@17
PS4, Line 17: The getters/modifiers for single threaded code-paths
: are: rows_returned(), ReachedLimit(), SetNumRowsReturned(),
: IncrementNumRowsReturned(), and DecrementNumRowsReturned(). Thread safe
: counterparts are: rows_returned_shared(), ReachedLimitShared(),
: IncrementNumRowsReturnedShared(), and DecrementNumRowsReturnedShared().
> nit: this could be more readable if the functions were written to separate
Done

http://gerrit.cloudera.org:8080/#/c/13178/4//COMMIT_MSG@33
PS4, Line 33: Some serious thoughts were given to other approaches such as
: updating the atomic num_rows_returned_ variable outside of a loop, but
: such a change would have been very extensive and error prone.
> Did you think about creating a separate variable like limit_reached_shared_
Patch set 2 was doing something similar, basically having both rows_returned and rows_returned_shared. In the latest patch set 5, the MT nodes use the synchronized functions only in the code paths shared with non-MT nodes.

http://gerrit.cloudera.org:8080/#/c/13178/4//COMMIT_MSG@56
PS4, Line 56: .
> nit: please wrap commit message at 72 chars
Done

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/exec-node.h
File be/src/exec/exec-node.h:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/exec-node.h@211
PS4, Line 211: !IsScanNode() || type() == TPlanNodeType::HBASE_SCAN_NODE
: || type() == TPlanNodeType::DATA_SOURCE_NODE
I ended up creating the following virtual functions and using them in the debug check:

LimitCheckedFromMultipleThreads() - returns true if multiple threads check the limit for a given scan node.
IsTaskBasedMultiThreadingSupport() - returns true if the scan node supports task-based multi-threading.
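As a rough illustration of how these hooks could gate the single-threaded accessors, here is a minimal sketch. The function names LimitCheckedFromMultipleThreads()/IsTaskBasedMultiThreadingSupport() and the rows_returned()/rows_returned_shared() split come from the patch; the class layout and bodies below are my own simplification, not the actual Impala code:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical simplification of the ExecNode interface discussed above.
class ExecNode {
 public:
  virtual ~ExecNode() = default;

  // True if multiple threads may check the limit for this node.
  virtual bool LimitCheckedFromMultipleThreads() const { return false; }
  // True if the node supports task-based multi-threading.
  virtual bool IsTaskBasedMultiThreadingSupport() const { return false; }

  // Thread-safe getter, usable from scanner threads.
  int64_t rows_returned_shared() const {
    return num_rows_returned_.load(std::memory_order_acquire);
  }
  // Single-threaded getter: plain load guarded by a debug check that this
  // node really is single-threaded (mirrors the DCHECK discussed above).
  int64_t rows_returned() const {
    assert(!LimitCheckedFromMultipleThreads());
    return num_rows_returned_.load(std::memory_order_relaxed);
  }
  // Thread-safe increment for shared code paths.
  void IncrementNumRowsReturnedShared(int64_t n) {
    num_rows_returned_.fetch_add(n, std::memory_order_acq_rel);
  }

 protected:
  std::atomic<int64_t> num_rows_returned_{0};
};

// A node whose limit is checked from multiple (scanner) threads.
class MultiThreadedScanNode : public ExecNode {
 public:
  bool LimitCheckedFromMultipleThreads() const override { return true; }
};
```

The point of the split is that single-threaded nodes keep a cheap accessor, while the debug check catches any node that uses it from a multi-threaded context.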
It's generic, but I think that should be fine in the current codebase, since scan nodes are the only ones with task-based multi-threading.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/exec-node.h@401
PS4, Line 401: int64_t num_rows_returned_;
> Can you add a comment about this variable's usage? It is not too typical th
Done.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node-mt.cc
File be/src/exec/hdfs-scan-node-mt.cc:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node-mt.cc@114
PS4, Line 114: IncrementNumRowsReturnedShared
> The comment states that we use Shared versions in MT nodes for the sake of
Made changes such that the use of atomic functions is limited to scanner code-paths only.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node.cc
File be/src/exec/hdfs-scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node.cc@134
PS4, Line 134: // Update the number of materialized rows now instead of when they are materialized.
: // This means that scanners might process and queue up more rows than are necessary
: // for the limit case but we want to avoid the synchronized writes to
: // num_rows_returned_.
: IncrementNumRowsReturnedShared(row_batch->num_rows());
> Yeah the comment is a little weird now in that it's justifying using the st
Deleted the comment and added a new one.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node.cc@144
PS4, Line 144: DecrementNumRowsReturnedShared(num_rows_over);
> Is this logic racy in that multiple threads could see that they pushed the
Tim already responded; this particular code path is single-threaded. Just to add to that, we have several flavors of scan nodes:

1. HdfsScanNode/KuduScanNode are single-threaded consumers (single-consumer, multiple-producer scenario). They spawn scanner threads, which poll the main scan node thread to determine whether the limit has been reached.
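To illustrate this polling pattern, here is a minimal standalone sketch (illustrative only, not Impala code; the names rows_returned and ReachedLimitShared follow the patch, the rest is assumed): the consumer thread advances a shared atomic counter, and producer threads poll it to decide when to stop.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical single-consumer / multiple-producer shared state.
struct ScanSharedState {
  std::atomic<int64_t> rows_returned{0};
  int64_t limit = 0;  // 0 means "no limit" in this sketch

  // Called by the scan node (consumer) thread after taking a batch off the
  // queue. Returns true once the limit has been reached.
  bool ConsumeBatch(int64_t batch_rows) {
    int64_t total =
        rows_returned.fetch_add(batch_rows, std::memory_order_acq_rel) +
        batch_rows;
    return limit > 0 && total >= limit;
  }

  // Polled by scanner (producer) threads to decide whether to keep
  // materializing rows.
  bool ReachedLimitShared() const {
    return limit > 0 &&
           rows_returned.load(std::memory_order_acquire) >= limit;
  }
};
```

Producers that observe the flag slightly late merely queue a few extra rows; as described below, those are discarded or capped by the consumer, so the limit is still honored exactly.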
The scan node consumes RowBatches from the RowBatchQueue, which is populated by the various scanner threads. Once the scan node reaches the (scan) limit_, it shuts down the RowBatchQueue and any excess RowBatches in the queue are freed. Any excess rows in the final RowBatch won't be consumed either, since we cap the final RowBatch once we reach the limit_. So we will never undershoot or overshoot the limit_.

2. DataSourceScanNode/HBaseScanNode are single-threaded scan nodes.

3. HdfsScanNodeMt/KuduScanNodeMt (task-based multi-threading): we execute multiple instances (degree determined by the optimizer) of the plan fragment per node. Each scan node basically has an associated scanner object.

The new *_shared()/*Shared() interfaces are used by HdfsScanNode/KuduScanNode (1 above) and the related spawned scanner (producer) threads. For code simplicity, 3 above also uses the shared functions in code paths common with non-MT scan nodes. When we do move to task-based multi-threading we can get rid of the shared interfaces.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-scan-node.cc@144
PS4, Line 144: DecrementNumRowsReturnedShared(num_rows_over);
> I do not understand the reason for doing this "increment, then decrement if
I added a CheckLimitAndTruncateRowBatchIfNeeded() and also a CheckLimitAndTruncateRowBatchIfNeededShared() version for consistency, and tried to use the new function in as many exec nodes as possible.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-sequence-scanner.cc
File be/src/exec/hdfs-sequence-scanner.cc:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/hdfs-sequence-scanner.cc@295
PS4, Line 295: scan_node_->limit() - scan_node_->rows_returned_shared();
> similar question here about whether this is going to get wrong results arou
There are two cases here:

- Single scan-node thread and multiple scanner threads: the scanner threads will try to write tuples using max(max_added_tuples, num_to_process) as the limit.
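The capping arithmetic mentioned above can be sketched roughly as follows. This is a hypothetical standalone helper in the spirit of CheckLimitAndTruncateRowBatchIfNeeded (the real Impala function operates on a RowBatch and the node's counter; the struct and signature here are assumptions for illustration):

```cpp
#include <cstdint>

// Result of checking a batch against the limit in this sketch.
struct CapResult {
  int64_t batch_rows;   // rows kept in this batch after capping
  bool reached_limit;   // true if the node should report eos
};

// Given the rows returned so far, the limit (negative means "no limit"),
// and the size of the incoming batch, truncate the batch so the node
// never returns more than `limit` rows in total.
CapResult CheckLimitAndTruncate(int64_t rows_returned_so_far, int64_t limit,
                                int64_t batch_rows) {
  int64_t total = rows_returned_so_far + batch_rows;
  if (limit < 0 || total < limit) return {batch_rows, false};
  // Cap: keep only the rows needed to hit the limit exactly.
  int64_t over = total - limit;
  return {batch_rows - over, true};
}
```

Because truncation happens on the consumer side, a producer that briefly overshoots (due to a stale counter read) cannot cause extra rows to be returned.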
If the scan node thread reaches the limit, it will basically just discard the unnecessary RowBatches and cap the final RowBatch as needed.

- Single scan-node thread which does the scan using a scanner object: we should not have any issues here.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/kudu-scan-node.cc
File be/src/exec/kudu-scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/kudu-scan-node.cc@111
PS4, Line 111: int num_rows_over = rows_returned_shared() - limit_;
> same concern here
This is a single-threaded code path executed by the main (scan-node) thread.

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/sort-node.cc
File be/src/exec/sort-node.cc:

http://gerrit.cloudera.org:8080/#/c/13178/4/be/src/exec/sort-node.cc@138
PS4, Line 138: IncrementNumRowsReturned(row_batch->num_rows());
: if (ReachedLimit()) {
:   row_batch->set_num_rows(row_batch->num_rows() - (rows_returned() - limit_));
:   *eos = true;
: }
:
: COUNTER_SET(rows_returned_counter_, rows_returned());
> Is it normal that we can leave rows_returned_counter_ > limit_? This seems
I think this should be fixed, as the counter should reflect the proper number of rows consumed. I have fixed this.


--
To view, visit http://gerrit.cloudera.org:8080/13178
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I4cbbfad80f7ab87dd6f192a24e2c68f7c66b047e
Gerrit-Change-Number: 13178
Gerrit-PatchSet: 4
Gerrit-Owner: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Fri, 03 May 2019 23:22:39 +0000
Gerrit-HasComments: Yes