jason810496 opened a new issue, #45079: URL: https://github.com/apache/airflow/issues/45079
### Description

Related context: https://github.com/apache/airflow/issues/44753#issuecomment-2526209568

TL;DR: After conducting some research and implementing a POC, I would like to propose a potential solution. However, it requires changes to `airflow.utils.log.file_task_handler.FileTaskHandler`. If accepted, it will necessitate modifications to the 10 providers that extend the `FileTaskHandler` class.

## Main Concept for Refactoring

The proposed solution focuses on:

1. Returning a generator instead of loading the entire file content at once.
2. Leveraging a heap to merge logs incrementally, rather than sorting entire chunks.

The POC for this refactoring shows a **90% reduction in memory usage** with **similar processing times** on an **830 MB** log file (approximately **8,670,000 lines**)!

## Main Root Causes of OOM

1. The `_interleave_logs` function in `airflow.utils.log.file_task_handler`:
   - Extends all log strings into the `records` list.
   - Sorts the entire `records` list.
   - Yields lines with deduplication.
2. The `_read` method of `airflow.utils.log.file_task_handler.FileTaskHandler`:
   - Joins all aggregated logs into a single string using:
     ```python
     "\n".join(_interleave_logs(all_log_sources))
     ```
3. The methods used by `_read`, which read the entire log content and return it as a string instead of a generator:
   - `_read_from_local`
   - `_read_from_logs_server`
   - `_read_remote_logs` (implemented by providers)

## Proposed Refactoring Solution

The main concept includes:

- Returning a **generator** for reading log sources (local or external) instead of the whole file content as a string.
- **Merging logs with a k-way merge instead of sorting**: since each log source is already sorted, the streams can be merged incrementally using `heapq`.
- Returning a stream of the merged result.

### Breaking Changes in This Solution

1. **The interface of the `read` method** in `FileTaskHandler`: it will now return a generator instead of a string.
2. **The interfaces of `read_log_chunks` and `read_log_stream`** in `TaskLogReader`: adjustments to support the generator-based approach.
3. The methods used by `_read`:
   - `_read_from_local`
   - `_read_from_logs_server`
   - `_read_remote_logs` (10 providers implement this method)

## Experimental Environment

- **Setup**: Docker Compose without memory limits.
- **Memory profiling**: [memray](https://github.com/bloomberg/memray)
- **Log size**: 830 MB, about 8,670,000 lines

## Benchmark Metrics

- **Original implementation**:
  - **Memory usage**: 3 GB on average, peaking at 4 GB when returning the final stream.
  - <img width="1198" alt="Original-CPU-Memory" src="https://github.com/user-attachments/assets/bb15261c-9bf2-44c2-af8e-0d05b2c51260" />
  - **Processing time**: ~60 seconds.
  - Memory flame graph: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-memray_logs.py.html
- **POC (refactored implementation)**:
  - **Memory usage**: 300 MB on average.
  - <img width="1201" alt="POC-CPU-Memory" src="https://github.com/user-attachments/assets/8e450c12-898b-4218-82f4-6107c432963b" />
  - **Processing time**: ~60 seconds.
  - Memory flame graph: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-read_large_logs-k-way-merge-heap-optimize.py.html

### Use case/motivation

_No response_

### Related issues

#44753

### Are you willing to submit a PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
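The generator-plus-heap idea above can be sketched with Python's `heapq.merge`, which holds only one pending line per source in memory at a time. This is an illustrative sketch under assumptions, not the actual `FileTaskHandler` code: the helper names `read_log_source` and `interleave_logs` are hypothetical, and real task logs would be ordered by a timestamp key rather than lexicographically.

```python
import heapq
from typing import Iterator


def read_log_source(path: str) -> Iterator[str]:
    """Yield log lines one at a time instead of loading the whole file."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def interleave_logs(*sources: Iterator[str]) -> Iterator[str]:
    """K-way merge of already-sorted streams, deduplicating repeated lines."""
    last = None
    # heapq.merge assumes each input is individually sorted; for real task
    # logs, a key= extracting the timestamp would define the ordering.
    for line in heapq.merge(*sources):
        if line != last:
            yield line
        last = line
```

Joining the result with `"\n".join(...)` would rebuild the old all-in-memory behavior; the memory saving comes from streaming the merged generator to the caller chunk by chunk.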
