ranmx opened a new issue, #44474:
URL: https://github.com/apache/airflow/issues/44474

   ### Apache Airflow version
   
   2.10.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I have the latest Airflow (2.10.3) installed in k8s.
   The executor is CeleryExecutor.
   I have two workers deployed:
   - airflow-worker-0
   - airflow-worker-1
   
   I found that if a task fails and is retried, the `host` part of the log address is overwritten by the last attempt.
   So if the task was retried on different workers, the logs of attempts that ran on a worker other than the last one are unavailable in the UI.
   But if I go to the worker node, I can find the log in the local path.
   
   This attempt failed to fetch its log. Note that it actually ran on airflow-worker-1, and the log is available there in the local path.
   
![image](https://github.com/user-attachments/assets/e43a3ce8-6e8f-4690-9fda-aaba125f0ac8)
   airflow-worker-1 local path
   
![image](https://github.com/user-attachments/assets/be76ffb4-1afe-4983-a6e9-54fd88a66579)
   
   This is because the host is overwritten by the last attempt:
   
![image](https://github.com/user-attachments/assets/c25469e9-7c8d-494e-a44f-424e619c8d9b)
   
   I think the code here in `airflow/utils/log/file_task_handler.py:461` should be changed, as it uses the hostname of the latest task_instance for all attempts.
   ```
       def read(self, task_instance, try_number=None, metadata=None):
           """
           Read logs of given task instance from local machine.
   
           :param task_instance: task instance object
           :param try_number: task instance try_number to read logs from. If None
                              it returns all logs separated by try_number
           :param metadata: log metadata, can be used for steaming log reading and auto-tailing.
           :return: a list of listed tuples which order log string by host
           """
           # Task instance increments its try number when it starts to run.
           # So the log for a particular task try will only show up when
           # try number gets incremented in DB, i.e logs produced the time
           # after cli run and before try_number + 1 in DB will not be displayed.
           if try_number is None:
               next_try = task_instance.next_try_number
               try_numbers = list(range(1, next_try))
           elif try_number < 1:
               logs = [
                   [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
               ]
               return logs, [{"end_of_log": True}]
           else:
               try_numbers = [try_number]
   
           logs = [""] * len(try_numbers)
           metadata_array = [{}] * len(try_numbers)
   
           # subclasses implement _read and may not have log_type, which was added recently
           for i, try_number_element in enumerate(try_numbers):
               log, out_metadata = self._read(task_instance, try_number_element, metadata)
               # es_task_handler return logs grouped by host. wrap other handler returning log string
               # with default/ empty host so that UI can render the response in the same way
               logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
               metadata_array[i] = out_metadata
   
           return logs, metadata_array
   ```
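
The effect can be shown in isolation. The following is a minimal sketch, not Airflow code: `FakeTaskInstance` and `label_logs` are hypothetical stand-ins that mimic only the final loop of `read`, under the assumption that the task instance row holds just the hostname of the most recent try.

```python
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance row: one hostname, last try wins."""

    hostname: str
    next_try_number: int


def label_logs(task_instance, logs_by_try):
    """Mimic the loop in read(): pair every try's log with task_instance.hostname."""
    try_numbers = range(1, task_instance.next_try_number)
    return [[(task_instance.hostname, logs_by_try[n])] for n in try_numbers]


# Assumed scenario: try 1 ran on airflow-worker-1 and the retry ran on
# airflow-worker-0, so the row now holds only the hostname of the retry.
ti = FakeTaskInstance(hostname="airflow-worker-0", next_try_number=3)
labelled = label_logs(ti, {1: "log of try 1", 2: "log of try 2"})

for try_number, entry in enumerate(labelled, start=1):
    host, _log = entry[0]
    print(f"try {try_number} -> {host}")
```

In this sketch both tries are labelled with the host of the last attempt, which matches the behaviour seen in the UI.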
   
   ### What you think should happen instead?
   
   Each attempt should have its own host.
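
One possible shape for this, as a sketch only: keep the hostname recorded for each try and fall back to the task instance's current hostname when none is known. `hostname_for_try`, `label_logs_per_try` and the `hosts_by_try` mapping are hypothetical names used for illustration; where Airflow would actually obtain per-try hostnames is not addressed here.

```python
def hostname_for_try(hosts_by_try, try_number, fallback):
    """Return the host that ran the given try, or the fallback if it is unknown."""
    return hosts_by_try.get(try_number) or fallback


def label_logs_per_try(hosts_by_try, logs_by_try, current_hostname):
    """Pair each try's log with the host that produced it, in try order."""
    return [
        [(hostname_for_try(hosts_by_try, n, current_hostname), log)]
        for n, log in sorted(logs_by_try.items())
    ]


# Assumed scenario: try 1 ran on airflow-worker-1, try 2 on airflow-worker-0.
hosts = {1: "airflow-worker-1", 2: "airflow-worker-0"}
logs = {1: "log of try 1", 2: "log of try 2"}
result = label_logs_per_try(hosts, logs, current_hostname="airflow-worker-0")

for try_number, entry in enumerate(result, start=1):
    host, _log = entry[0]
    print(f"try {try_number} -> {host}")
```

With a per-try lookup like this, the log of the first attempt would be requested from the worker that actually ran it, instead of from the worker of the last attempt.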
   
   ### How to reproduce
   
   - Latest Airflow (2.10.3) installed in k8s.
   - The executor is CeleryExecutor.
   - Multiple workers deployed.
   - A task fails and is retried on a different worker.
   
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

