BiteTheDDDDt opened a new pull request, #61130:
URL: https://github.com/apache/doris/pull/61130
This pull request introduces significant improvements and refactoring to pipeline fragment lifecycle management and runtime filter handling, especially for recursive CTEs (Common Table Expressions) and runtime filter stages. The changes streamline fragment closure and notification, improve runtime filter correctness by tracking stages, and simplify some internal APIs. They also clean up resource management and make it safer.

**Pipeline fragment lifecycle and notification improvements:**

- Refactored fragment closure logic to better handle notification and resource cleanup. This includes managing a `ClosureGuard` for RPC response synchronization and ensuring fragments are correctly removed from the fragment manager after cancellation or completion. (`be/src/exec/pipeline/pipeline_fragment_context.cpp`, `be/src/exec/pipeline/pipeline_fragment_context.h`) [[1]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R182-R191) [[2]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L1805-R1808) [[3]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R1852-R1854) [[4]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L1907-R1910) [[5]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548R1920-R1923) [[6]](diffhunk://#diff-8be24ee0b9e88161663cf3538d9d0a9e42e9a5b232da8829c49d582c6fdfbacdL131-R150) [[7]](diffhunk://#diff-8be24ee0b9e88161663cf3538d9d0a9e42e9a5b232da8829c49d582c6fdfbacdR360)
- Removed the `wait_close`, `set_to_rerun`, and `rebuild` methods. They are replaced by a new `listen_wait_close` API and a method that collects deregistered runtime filters. (`be/src/exec/pipeline/pipeline_fragment_context.cpp`, `be/src/exec/pipeline/pipeline_fragment_context.h`) [[1]](diffhunk://#diff-8be24ee0b9e88161663cf3538d9d0a9e42e9a5b232da8829c49d582c6fdfbacdL131-R150) [[2]](diffhunk://#diff-3643eb9cad1657c99d794887ca26a78d9641b1d1f5dde3c86444370b7ffc9548L2070-R2088)

**Runtime filter stage tracking and correctness:**

- Added a `stage` field to runtime filter merge contexts. All remote filter requests and merges now carry and check the stage, so lower-stage filters cannot affect higher-stage consumers. (`be/src/exec/runtime_filter/runtime_filter_mgr.cpp`, `be/src/exec/runtime_filter/runtime_filter_mgr.h`, `be/src/exec/runtime_filter/runtime_filter.cpp`, `be/src/exec/runtime_filter/runtime_filter_producer.cpp`) [[1]](diffhunk://#diff-12f3db49e096facb2547d57939880d204cea247d6d1c489d5335fc0dedec895eR38) [[2]](diffhunk://#diff-11530eb41cf13c12e3a7ba0e79bdfe559c1e57d35151ba87e1e931df650f5a7cR240-R243) [[3]](diffhunk://#diff-11530eb41cf13c12e3a7ba0e79bdfe559c1e57d35151ba87e1e931df650f5a7cR261-R266) [[4]](diffhunk://#diff-11530eb41cf13c12e3a7ba0e79bdfe559c1e57d35151ba87e1e931df650f5a7cR337-R344) [[5]](diffhunk://#diff-11530eb41cf13c12e3a7ba0e79bdfe559c1e57d35151ba87e1e931df650f5a7cR391-R396) [[6]](diffhunk://#diff-11530eb41cf13c12e3a7ba0e79bdfe559c1e57d35151ba87e1e931df650f5a7cR466-R473) [[7]](diffhunk://#diff-0b279ceec090ef3902ff5e4387dbb12737d2175f8266a1a59f9c8bed78ca4178R75-R78) [[8]](diffhunk://#diff-0b279ceec090ef3902ff5e4387dbb12737d2175f8266a1a59f9c8bed78ca4178L107-R115) [[9]](diffhunk://#diff-fec0c472eed51495c0aeffd572e5c4979a9d4235088948ade61034142cbebd5dR202-R203)

**Recursive CTE and fragment rerun adjustments:**

- Updated the recursive CTE operator rerun logic to use more precise rerun fragment parameters, and improved the merging of deregistered runtime filters. (`be/src/exec/operator/rec_cte_source_operator.h`, `be/src/exec/operator/partitioned_hash_join_sink_operator.cpp`) [[1]](diffhunk://#diff-d810d9878df7f7792df0b8018e52717264beb0dc1d5c03616f7de1941f7e8978L141-L149) [[2]](diffhunk://#diff-71e403ca9f968c4129c7073f832b0467d8c3543acb3fc87a1f15dd97aaf8199eR568-R569)

**Resource and memory management cleanup:**

- Removed unnecessary reference counting for task execution contexts in `ScannerContext`, simplifying memory management. (`be/src/exec/scan/scanner_context.cpp`) [[1]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5L101-L103) [[2]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5L198-L200)
- Ensured proper cleanup of rerunnable parameters in `FragmentMgr::stop`. (`be/src/runtime/fragment_mgr.cpp`)

**Debugging and logging improvements:**

- Enhanced debug output for pipeline fragment contexts to include fragment IDs, and clarified the output fields. (`be/src/exec/pipeline/pipeline_fragment_context.cpp`)

Together, these changes improve the robustness, correctness, and maintainability of pipeline execution and runtime filter propagation, particularly for recursive queries and distributed runtime filters.
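The move from a blocking `wait_close` to a `listen_wait_close` registration, combined with a guard that completes an RPC response when it leaves scope, can be sketched in C++. This is a hedged illustration under stated assumptions, not the Doris code. `FragmentContext`, `ResponseGuard`, `release()`, and `close()` are hypothetical names. The description does not give the real signature of `listen_wait_close`, so a `std::function<void()>` listener is assumed. brpc ships a `brpc::ClosureGuard` with similar run-on-destruction behaviour, but the description does not say whether the PR's `ClosureGuard` is exactly that type.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical RAII guard in the spirit of brpc::ClosureGuard: runs the
// completion callback once when it leaves scope, unless it was released.
class ResponseGuard {
public:
    explicit ResponseGuard(std::function<void()> done) : _done(std::move(done)) {}
    ~ResponseGuard() {
        if (_done) {
            _done(); // complete the RPC response on every exit path
        }
    }
    ResponseGuard(const ResponseGuard&) = delete;
    ResponseGuard& operator=(const ResponseGuard&) = delete;

    // Hands the callback to the caller, who becomes responsible for running it.
    std::function<void()> release() { return std::exchange(_done, nullptr); }

private:
    std::function<void()> _done;
};

// Hypothetical fragment context: callers register a listener instead of
// blocking in a wait_close-style call. Assumed signature, not the Doris API.
class FragmentContext {
public:
    // Stores `listener`, or runs it right away if the fragment is already closed.
    void listen_wait_close(std::function<void()> listener) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (!_closed) {
                _listeners.push_back(std::move(listener));
                return;
            }
        }
        listener(); // already closed: notify immediately, outside the lock
    }

    // Marks the fragment closed and notifies each registered listener once.
    void close() {
        std::vector<std::function<void()>> to_notify;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_closed) {
                return; // repeated close is a no-op
            }
            _closed = true;
            to_notify.swap(_listeners);
        }
        for (auto& fn : to_notify) {
            fn(); // run outside the lock so listeners may re-enter the context
        }
    }

private:
    std::mutex _mutex;
    bool _closed = false;
    std::vector<std::function<void()>> _listeners;
};
```

In this sketch, an RPC handler releases the guard and hands the completion callback to `listen_wait_close`, so the response is sent when the fragment closes instead of by a thread blocked in a wait. If the handler returns early without releasing, the guard still completes the response.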
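The stage check described under **Runtime filter stage tracking and correctness** amounts to a monotonic comparison on every merge. The sketch below makes three assumptions. First, a larger stage number means a later round, for example a later recursive CTE iteration. Second, requests from an older stage are dropped. Third, the first request from a newer stage discards previously merged state. `MergeContext`, `FilterRequest`, and their members are hypothetical stand-ins; the real merge context in `runtime_filter_mgr.h` is not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical remote runtime-filter request; not the Doris protobuf message.
struct FilterRequest {
    int filter_id = 0;
    std::uint32_t stage = 0;  // stage of the producer that built this filter
    std::vector<int> values;  // stand-in for the real filter payload
};

// Hypothetical merge context that records the stage it is currently merging.
class MergeContext {
public:
    // Merges `req` and returns true, or drops it and returns false when it
    // comes from an older stage than the one already being merged.
    bool merge(const FilterRequest& req) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (req.stage < _stage) {
            return false; // stale: a lower stage must not reach higher-stage consumers
        }
        if (req.stage > _stage) {
            // Assumption: a newer stage starts from an empty merge state.
            _stage = req.stage;
            _values.clear();
            _merged_count = 0;
        }
        _values.insert(_values.end(), req.values.begin(), req.values.end());
        ++_merged_count;
        return true;
    }

    std::uint32_t stage() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _stage;
    }

    // Number of requests merged in the current stage.
    std::size_t merged_count() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _merged_count;
    }

    // Number of payload values accumulated in the current stage.
    std::size_t value_count() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _values.size();
    }

private:
    mutable std::mutex _mutex;
    std::uint32_t _stage = 0;
    std::size_t _merged_count = 0;
    std::vector<int> _values;
};
```

Keeping the stage comparison and the reset under the same lock means a late lower-stage request can never be merged into state that already belongs to a higher stage.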
