jason810496 opened a new issue, #45079:
URL: https://github.com/apache/airflow/issues/45079

   ### Description
   
   Related context: https://github.com/apache/airflow/issues/44753#issuecomment-2526209568
   
   TL;DR
   
   After some research and a proof of concept (POC), I would like to propose a potential solution. However, it requires changes to `airflow.utils.log.file_task_handler.FileTaskHandler`, so if the solution is accepted, the 10 providers that extend the `FileTaskHandler` class will also need to be modified.
   
   ## Main Concept for Refactoring
   
   The proposed solution focuses on:
   1. Returning a generator instead of loading the entire file content at once.
   2. Leveraging a heap to merge logs incrementally, rather than sorting entire 
chunks.
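A minimal sketch of the first point, assuming a plain local log file. `read_log_lines` is a hypothetical helper (not an existing Airflow function) that yields lines lazily, so memory use is bounded by the longest line rather than by the file size.

```python
from collections.abc import Iterator
from pathlib import Path


def read_log_lines(path: Path) -> Iterator[str]:
    """Yield one log line at a time instead of returning the whole file.

    Hypothetical helper for illustration only.
    """
    with path.open(encoding="utf-8", errors="replace") as f:
        # File objects iterate lazily: only the current line is held in memory.
        for line in f:
            yield line.rstrip("\n")
```

A caller iterates over `read_log_lines(Path("attempt=1.log"))` and never holds more than one line; the file is closed once the generator is exhausted.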
   
   The POC for this refactoring shows a **90% reduction in average memory usage** with **similar processing times**!
   
   
   ## Experiment Details
   
   
   - **Log Size**: 830 MB
   - **Lines**: approximately 8,670,000
   
   ## Main Root Causes of OOM
   
   1. `_interleave_logs` function in `airflow.utils.log.file_task_handler`:
      - Extends the `records` list with every log string.
      - Sorts the entire `records` list.
      - Yields lines with deduplication.
   2. `_read` method in `airflow.utils.log.file_task_handler.FileTaskHandler`:
      - Joins all aggregated logs into a single string using:
        ```python
        "\n".join(_interleave_logs(all_log_sources))
        ```
   3. Methods called by `_read`:
      These methods read the entire log content and return it as a string instead of a generator:
      - `_read_from_local`
      - `_read_from_logs_server`
      - `_read_remote_logs` (implemented by providers)
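To make the root causes concrete, here is a simplified approximation of the eager pattern described above; it is not the actual Airflow source. It assumes every line starts with a fixed-width `[timestamp]` so that string order equals time order, and `sort_key`, `interleave_logs_eager`, and `read_eager` are hypothetical names.

```python
from collections.abc import Iterator


def sort_key(line: str) -> str:
    # Assumption: lines start with a fixed-width "[timestamp]" using one UTC
    # offset, so comparing the timestamp text compares the times.
    end = line.find("]")
    return line[1:end] if line.startswith("[") and end > 0 else ""


def interleave_logs_eager(*logs: str) -> Iterator[str]:
    records: list[str] = []
    for log in logs:
        # Every line of every source ends up in one list.
        records.extend(log.splitlines())
    last = None
    # sorted() builds a second list the size of `records`.
    for line in sorted(records, key=sort_key):
        if line != last:  # skip consecutive duplicates
            yield line
        last = line


def read_eager(*logs: str) -> str:
    # Mirrors the "\n".join(...) in `_read`: the result is one more full-size string.
    return "\n".join(interleave_logs_eager(*logs))
```

With a very large log, the input strings, the `records` list, its sorted copy, and the final joined string are all alive at the same time, which helps explain the multi-GB usage in the benchmark.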
   
   ## Proposed Refactoring Solution
   
   The main concept includes:
   
   - Return a **generator** when reading log sources (local or external) instead of the whole file content as a string.
   - Merge logs using a **K-way merge instead of sorting**:
      - Since each log source is already sorted, merge the streams incrementally using `heapq`.
      - Return a stream of the merged result.
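The K-way merge can be sketched with the standard library's `heapq.merge`, which accepts a `key` function and pulls lazily from each input. This assumes each source is already sorted (the premise stated above) and that every line starts with a fixed-width `[timestamp]`; `sort_key` and `interleave_logs_streaming` are hypothetical names, not the POC's code.

```python
import heapq
from collections.abc import Iterable, Iterator


def sort_key(line: str) -> str:
    # Assumption: lines start with a fixed-width "[timestamp]", so the
    # timestamp text sorts in time order.
    end = line.find("]")
    return line[1:end] if line.startswith("[") and end > 0 else ""


def interleave_logs_streaming(*sources: Iterable[str]) -> Iterator[str]:
    """K-way merge of already-sorted log streams using a heap.

    heapq.merge keeps one pending line per source on its heap, so memory
    grows with the number of sources, not with the number of lines.
    """
    last = None
    for line in heapq.merge(*sources, key=sort_key):
        if line != last:  # skip consecutive duplicates, like `_interleave_logs`
            yield line
        last = line
```

If a source is not actually sorted (for example, it contains lines without a timestamp), the merged output will not be globally ordered, so a real implementation would likely need a rule for such lines.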
   
   ### Breaking Changes in This Solution
   
   1. **Interface of the `read` method** in `FileTaskHandler`:
      - Will return a generator instead of a string.

   2. **Interfaces of `read_log_chunks` and `read_log_stream`** in `TaskLogReader`:
      - Need adjustments to support the generator-based approach.

   3. **Methods called by `_read`**:
      - `_read_from_local`
      - `_read_from_logs_server`
      - `_read_remote_logs` (implemented by 10 providers)
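A transition path for these breaking changes could look roughly like the following. Both helpers are hypothetical: `lines_from_legacy` adapts an implementation that still returns whole-file strings, and `chunked` shows how a streaming consumer (the role `read_log_stream` plays in `TaskLogReader`) might emit bounded chunks instead of one joined string.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def lines_from_legacy(contents: Iterable[str]) -> Iterator[str]:
    """Hypothetical shim: turn whole-file strings (old style) into a line stream."""
    for content in contents:
        yield from content.splitlines()


def chunked(lines: Iterable[str], max_lines: int) -> Iterator[str]:
    """Hypothetical consumer: group a line stream into text chunks of at most
    `max_lines` lines, so the full log is never joined into one string."""
    it = iter(lines)
    while batch := list(islice(it, max_lines)):
        yield "\n".join(batch) + "\n"
```

The shim does not reduce memory for content a provider has already loaded; it only keeps unmigrated providers working while they move to generators.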
   
   
   
   ## Experimental Environment
   
   - **Setup**: Docker Compose without memory limits.
   - **Memory Profiling**: [memray](https://github.com/bloomberg/memray)
   - **Log Size**: 830 MB, about 8,670,000 lines
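The issue profiles with memray; as a dependency-free stand-in, the standard library's `tracemalloc` can compare the peak allocations of sorting versus a heap merge on synthetic data. This is a toy illustration with made-up lines, not a reproduction of the 830 MB benchmark, and all function names are hypothetical.

```python
import heapq
import tracemalloc
from collections.abc import Callable, Iterator


def synthetic_source(offset: int, n: int, step: int) -> Iterator[str]:
    # Sorted, zero-padded fake timestamps, so string order == time order.
    for i in range(n):
        yield f"[{offset + i * step:012d}] line {i} from source {offset}"


def eager(n: int) -> int:
    # Original pattern: collect every line from 3 sources, then sort them all.
    records: list[str] = []
    for s in range(3):
        records.extend(synthetic_source(s, n, 3))
    return sum(1 for _ in sorted(records))


def streaming(n: int) -> int:
    # Proposed pattern: K-way merge of 3 lazy sources.
    sources = [synthetic_source(s, n, 3) for s in range(3)]
    return sum(1 for _ in heapq.merge(*sources))


def peak_bytes(fn: Callable[[int], int], n: int) -> int:
    tracemalloc.start()
    try:
        fn(n)
        return tracemalloc.get_traced_memory()[1]  # (current, peak)
    finally:
        tracemalloc.stop()


if __name__ == "__main__":
    print(f"eager peak:     {peak_bytes(eager, 100_000):,} bytes")
    print(f"streaming peak: {peak_bytes(streaming, 100_000):,} bytes")
```

Both functions process the same lines; only the eager version's peak grows with the line count.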
   
   ## Benchmark Metrics
   
   - **Original Implementation**:
     - **Memory Usage**: Average 3 GB, peaking at 4 GB when returning the final stream.
       - <img width="1198" alt="Original-CPU-Memory" src="https://github.com/user-attachments/assets/bb15261c-9bf2-44c2-af8e-0d05b2c51260" />
     - **Processing Time**: ~60 seconds.
     - **Memory Flame Graph**: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-memray_logs.py.html
   
   - **POC (Refactored Implementation)**:
     - **Memory Usage**: Average 300 MB.
       - <img width="1201" alt="POC-CPU-Memory" src="https://github.com/user-attachments/assets/8e450c12-898b-4218-82f4-6107c432963b" />
     - **Processing Time**: ~60 seconds.
     - **Memory Flame Graph**: https://www.zhu424.dev/Airflow-Webserver-Resolving-OOM-for-Large-Log-Reads/memray-flamegraph-read_large_logs-k-way-merge-heap-optimize.py.html
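The headline 90% figure follows from the average memory usage reported above, taking 3 GB as 3000 MB:

```math
\text{reduction} = 1 - \frac{\text{avg}_{\text{POC}}}{\text{avg}_{\text{original}}} = 1 - \frac{300\ \text{MB}}{3000\ \text{MB}} = 0.9 = 90\%
```

Taking 3 GB as 3072 MB instead gives about 90.2%, so the rounding does not change the conclusion.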
   
   
   ### Use case/motivation
   
   _No response_
   
   ### Related issues
   
   #44753
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

