github-actions[bot] commented on code in PR #34752:
URL: https://github.com/apache/doris/pull/34752#discussion_r1598017674
##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -197,6 +198,55 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
_task_queue = task_queue;
}
+bool PipelineTask::_wait_to_start() {
+ // Before task starting, we should make sure
+ // 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
+ // 2. Runtime filter dependencies are ready
+ _blocked_dep = _execution_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ static_cast<Dependency*>(_blocked_dep)->start_watcher();
+ return true;
+ }
+
+ for (auto* op_dep : _filter_dependencies) {
+ _blocked_dep = op_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PipelineTask::_is_blocked() {
Review Comment:
warning: method '_is_blocked' can be made const
[readability-make-member-function-const]
```suggestion
bool PipelineTask::_is_blocked() const {
```
be/src/pipeline/pipeline_task.h:228:
```diff
- bool _is_blocked();
+ bool _is_blocked() const;
```
##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -197,6 +198,55 @@
_task_queue = task_queue;
}
+bool PipelineTask::_wait_to_start() {
+ // Before task starting, we should make sure
+ // 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
+ // 2. Runtime filter dependencies are ready
+ _blocked_dep = _execution_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ static_cast<Dependency*>(_blocked_dep)->start_watcher();
+ return true;
+ }
+
+ for (auto* op_dep : _filter_dependencies) {
+ _blocked_dep = op_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PipelineTask::_is_blocked() {
+ // `_dry_run = true` means we do not need data from source operator.
+ if (!_dry_run) {
+ for (int i = _read_dependencies.size() - 1; i >= 0; i--) {
+ // `_read_dependencies` is organized according to operators. For each operator, running condition is met iff all dependencies are ready.
+ for (auto* dep : _read_dependencies[i]) {
+ _blocked_dep = dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return true;
+ }
+ }
+ // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
+ if (!_operators[i]->need_more_input_data(_state)) {
+ break;
+ }
+ }
+ }
+
+ for (auto* op_dep : _write_dependencies) {
+ _blocked_dep = op_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return true;
+ }
+ }
+ return false;
+}
+
Status PipelineTask::execute(bool* eos) {
Review Comment:
warning: function 'execute' has cognitive complexity of 57 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status PipelineTask::execute(bool* eos) {
^
```
<details>
<summary>Additional context</summary>
**be/src/pipeline/pipeline_task.cpp:254:** +1, including nesting penalty of
0, nesting level increased to 1
```cpp
if (_eos) {
^
```
**be/src/pipeline/pipeline_task.cpp:273:** +1, including nesting penalty of
0, nesting level increased to 1
```cpp
if (_wait_to_start()) {
^
```
**be/src/pipeline/pipeline_task.cpp:277:** +1, including nesting penalty of
0, nesting level increased to 1
```cpp
if (!_opened) {
^
```
**be/src/pipeline/pipeline_task.cpp:278:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
RETURN_IF_ERROR(_open());
^
```
**be/src/common/status.h:570:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/pipeline_task.cpp:278:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_open());
^
```
**be/src/common/status.h:572:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/pipeline_task.cpp:281:** +1, including nesting penalty of
0, nesting level increased to 1
```cpp
while (!_fragment_context->is_canceled()) {
^
```
**be/src/pipeline/pipeline_task.cpp:282:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (_is_blocked()) {
^
```
**be/src/pipeline/pipeline_task.cpp:289:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (_fragment_context->is_canceled()) {
^
```
**be/src/pipeline/pipeline_task.cpp:293:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (time_spent > THREAD_TIME_SLICE) {
^
```
**be/src/pipeline/pipeline_task.cpp:301:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (should_revoke_memory(_state, sink_revocable_mem_size)) {
^
```
**be/src/pipeline/pipeline_task.cpp:302:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_sink->revoke_memory(_state));
^
```
**be/src/common/status.h:570:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/pipeline_task.cpp:302:** +4, including nesting penalty of
3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_sink->revoke_memory(_state));
^
```
**be/src/common/status.h:572:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/pipeline_task.cpp:308:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (!_dry_run) {
^
```
**be/src/pipeline/pipeline_task.cpp:312:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
^
```
**be/src/common/status.h:570:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/pipeline_task.cpp:312:** +4, including nesting penalty of
3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
^
```
**be/src/common/status.h:572:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/pipeline_task.cpp:313:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
} catch (const Exception& e) {
^
```
**be/src/pipeline/pipeline_task.cpp:317:** +1, nesting level increased to 2
```cpp
} else {
^
```
**be/src/pipeline/pipeline_task.cpp:322:** +2, including nesting penalty of
1, nesting level increased to 2
```cpp
if (_block->rows() != 0 || *eos) {
^
```
**be/src/pipeline/pipeline_task.cpp:326:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
if (!status.is<ErrorCode::END_OF_FILE>()) {
^
```
**be/src/pipeline/pipeline_task.cpp:327:** +4, including nesting penalty of
3, nesting level increased to 4
```cpp
RETURN_IF_ERROR(status);
^
```
**be/src/common/status.h:570:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/pipeline/pipeline_task.cpp:327:** +5, including nesting penalty of
4, nesting level increased to 5
```cpp
RETURN_IF_ERROR(status);
^
```
**be/src/common/status.h:572:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/pipeline/pipeline_task.cpp:329:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
^
```
**be/src/pipeline/pipeline_task.cpp:330:** +3, including nesting penalty of
2, nesting level increased to 3
```cpp
if (*eos) { // just return, the scheduler will do finish work
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]