github-actions[bot] commented on PR #61768: URL: https://github.com/apache/doris/pull/61768#issuecomment-4152238319
## Code Review Summary

**Overall assessment: This PR is a well-reasoned refactor with a genuine bug fix. No blocking issues found.**

### Critical Checkpoint Conclusions

**1. Goal and correctness:** The PR has two goals:

- (a) Rename `terminate()` to `unblock_all_dependencies()`, to make clear that operator-level termination must happen on the worker thread.
- (b) Fix a bug in `set_thread_id()` where `_core_change_times` was never incremented.

Both goals are achieved correctly, and the test is updated accordingly.

**2. Modification scope:** The change is focused and minimal. It consists of the rename, documentation, and two small fixes (`set_thread_id` ordering and `_wake_by` atomicity). There are no unnecessary changes.

**3. Concurrency analysis:**
- `_wake_by` changed to `std::atomic<int>`: correct. It is written in `set_wake_up_early()`, potentially from any thread, and read in `debug_string()`. Making it atomic eliminates a data race on a diagnostic variable.
- The loop merge in `Pipeline::make_all_runnable()` (two loops → one) is **safe**. The seq_cst ordering argument in the comments is per-task, not cross-task. Each task's execution thread observes only its own `_wake_up_early` and `_always_ready`. The only required ordering is therefore that `set_wake_up_early()` precedes `unblock_all_dependencies()` for the same task, and the merged loop preserves this.
- The compound write in `set_wake_up_early()` is benign. `_wake_up_early` and `_wake_by` are not updated atomically together, but `_wake_by` is only used for debug logging.

**4. `set_thread_id` bug fix:** The old code was `_thread_id = thread_id; if (thread_id != _thread_id)`. After the assignment the comparison is always false, so `_core_change_times` was never incremented. The fix correctly moves the assignment inside the `if` block. Good catch.

**5. Lifecycle management:** No new lifecycle concerns are introduced. `unblock_all_dependencies()` has the same locking and lifecycle checks as the old `terminate()`.

**6. Parallel code paths:** All call sites of the old `terminate()` are updated: `pipeline.cpp`, `pipeline_fragment_context.cpp`, `pipeline_task.cpp` (4 locations), and the test. No call sites were missed.

**7. Test coverage:** The existing test (`TEST_TERMINATE`) is updated to use `unblock_all_dependencies()`. It correctly validates that operator-level termination still happens via the `execute()` path, since the `_terminated` flags are checked on both the operator and the sink.

**8. Incompatible changes:** None. This is purely internal refactoring with no ABI or protocol impact.

**9. Configuration / Observability:** No new configs or metrics. The `_core_change_times` counter fix improves observability accuracy.

**10. Performance:** The loop merge eliminates one redundant iteration over `_tasks`. The gain is minor but positive.

### Minor observations (non-blocking, pre-existing):

- `THROW_IF_ERROR` inside `Defer` blocks in `execute()` and `do_revoke_memory()` (for operator terminate calls) is a pre-existing pattern, not introduced by this PR. Per `be/src/common/AGENTS.md`, `WARN_IF_ERROR` is preferred in `Defer`/destructor contexts. The current code is partially safe thanks to the `Defer` class's `noexcept(false)` + `uncaught_exceptions()` guard, but it could still lose error information during stack unwinding. This is outside the scope of this PR.
- `_thread_id` remains a plain `int` rather than `std::atomic<int>`. It is not technically a data race, because access is indirectly synchronized through `Dependency::_task_lock`. Making it atomic would still be more self-documenting. This is also outside the scope of this PR.
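To make checkpoints 3 and 4 concrete, here is a minimal C++ sketch built on stated assumptions. It is not the Doris implementation. `TaskSketch`, `set_thread_id_old`, the `int wake_by` parameter, and the free function `make_all_runnable` are hypothetical names invented for illustration. Only the member names `_thread_id`, `_core_change_times`, `_wake_up_early`, `_always_ready`, and `_wake_by` come from the review, and the method bodies are assumptions.

The sketch does three things:

- It contrasts the old and fixed `set_thread_id()` orderings.
- It keeps `_wake_by` in a `std::atomic<int>`.
- It uses a single merged loop that calls `set_wake_up_early()` before `unblock_all_dependencies()` for each task.

```cpp
#include <atomic>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, heavily simplified stand-in for a pipeline task. Member names
// follow the review; the method bodies are illustrative assumptions.
class TaskSketch {
public:
    // Fixed ordering: compare first, then assign, so a core change is counted.
    void set_thread_id(int thread_id) {
        if (thread_id != _thread_id) {
            _thread_id = thread_id;
            ++_core_change_times;
        }
    }

    // Old ordering, kept only for contrast: the comparison runs after the
    // assignment, so it is always false and the counter never moves.
    void set_thread_id_old(int thread_id) {
        _thread_id = thread_id;
        if (thread_id != _thread_id) {
            ++_core_change_times;
        }
    }

    // May be called from any thread. Each store is atomic (seq_cst by default),
    // but the two stores are not one atomic unit; that is acceptable here
    // because _wake_by is only read for diagnostics.
    void set_wake_up_early(int wake_by) {
        _wake_up_early = true;
        _wake_by = wake_by;
    }

    // Stand-in for the renamed method: it only marks the task runnable and
    // does not terminate operators (that stays on the worker thread).
    void unblock_all_dependencies() { _always_ready = true; }

    bool wake_up_early() const { return _wake_up_early; }
    bool always_ready() const { return _always_ready; }
    int core_change_times() const { return _core_change_times; }

    std::string debug_string() const {
        std::ostringstream os;
        os << "thread_id=" << _thread_id << " core_change_times=" << _core_change_times
           << " wake_by=" << _wake_by.load();
        return os.str();
    }

private:
    int _thread_id = -1;  // plain int, as noted in the review
    int _core_change_times = 0;
    std::atomic<bool> _wake_up_early {false};
    std::atomic<bool> _always_ready {false};
    std::atomic<int> _wake_by {-1};  // atomic: written and read on different threads
};

// Merged single loop: for each task, the early-wake flag is published before
// that task's dependencies are unblocked. No cross-task ordering is required.
void make_all_runnable(std::vector<std::shared_ptr<TaskSketch>>& tasks, int pipeline_id) {
    for (auto& task : tasks) {
        task->set_wake_up_early(pipeline_id);
        task->unblock_all_dependencies();
    }
}
```

With the fixed ordering, calling `set_thread_id(1)`, `set_thread_id(1)`, then `set_thread_id(2)` on a fresh task leaves `core_change_times()` at 2. The sketch counts the first assignment from its initial `-1` as a change. The old ordering leaves the counter at 0. The sketch does not model operator termination, which per the review stays on the worker thread's `execute()` path.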
