ranmx opened a new issue, #44474: URL: https://github.com/apache/airflow/issues/44474
### Apache Airflow version

2.10.3

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

I have the latest Airflow (2.10.3) installed in Kubernetes. The executor is CeleryExecutor, with two workers deployed:

- airflow-worker-0
- airflow-worker-1

I found that when a task fails and retries, the `host` part of the log address is overwritten by the last attempt. So if the task was retried on different workers, the logs on every machine other than the last worker become unavailable through the UI. If I go to the worker node itself, I can still find the log at the local path.

This attempt failed to fetch the log. Note that it actually ran on airflow-worker-1 and the logs are available at the local path:

![image](https://github.com/user-attachments/assets/e43a3ce8-6e8f-4690-9fda-aaba125f0ac8)

airflow-worker-1 local path:

![image](https://github.com/user-attachments/assets/be76ffb4-1afe-4983-a6e9-54fd88a66579)

This is because the host is overwritten by the last attempt:

![image](https://github.com/user-attachments/assets/c25469e9-7c8d-494e-a44f-424e619c8d9b)

I think the code here in `airflow/utils/log/file_task_handler.py:461` should be changed, as it uses the last task instance's hostname for all attempts:

```python
def read(self, task_instance, try_number=None, metadata=None):
    """
    Read logs of given task instance from local machine.

    :param task_instance: task instance object
    :param try_number: task instance try_number to read logs from. If None
        it returns all logs separated by try_number
    :param metadata: log metadata, can be used for streaming log reading and
        auto-tailing.
    :return: a list of listed tuples which order log string by host
    """
    # Task instance increments its try number when it starts to run.
    # So the log for a particular task try will only show up when
    # try number gets incremented in DB, i.e logs produced the time
    # after cli run and before try_number + 1 in DB will not be displayed.
    if try_number is None:
        next_try = task_instance.next_try_number
        try_numbers = list(range(1, next_try))
    elif try_number < 1:
        logs = [
            [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
        ]
        return logs, [{"end_of_log": True}]
    else:
        try_numbers = [try_number]

    logs = [""] * len(try_numbers)
    metadata_array = [{}] * len(try_numbers)

    # subclasses implement _read and may not have log_type, which was added recently
    for i, try_number_element in enumerate(try_numbers):
        log, out_metadata = self._read(task_instance, try_number_element, metadata)
        # es_task_handler return logs grouped by host. wrap other handler returning log string
        # with default/ empty host so that UI can render the response in the same way
        logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
        metadata_array[i] = out_metadata

    return logs, metadata_array
```

### What you think should happen instead?

Each attempt should keep its own host (a sketch of one possible approach follows at the end of this issue).

### How to reproduce

- Latest (2.10.3) Airflow installed in Kubernetes.
- Executor is CeleryExecutor.
- Multiple workers deployed.
- A task fails and is retried on different workers.

### Operating System

Debian GNU/Linux 12 (bookworm)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
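For illustration, here is a minimal sketch of one possible direction for the fix referenced above. This is not the actual Airflow implementation: it assumes the `TaskInstanceHistory` model (added in Airflow 2.10) retains the `hostname` each finished attempt ran on, and the helper name `hostname_for_try` is hypothetical.

```python
# Sketch only: resolve the worker hostname per attempt instead of reusing
# task_instance.hostname, which only reflects the most recent try.
# Assumes Airflow 2.10+, where TaskInstanceHistory records finished
# attempts, including the hostname they ran on. Helper name is made up.
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.session import provide_session


@provide_session
def hostname_for_try(task_instance, try_number, session=None):
    """Return the hostname that ran the given attempt of a task instance."""
    # The live TaskInstance row only holds the hostname of the latest attempt.
    if try_number == task_instance.try_number:
        return task_instance.hostname
    # Earlier attempts have to be looked up in the history table.
    history = (
        session.query(TaskInstanceHistory)
        .filter(
            TaskInstanceHistory.dag_id == task_instance.dag_id,
            TaskInstanceHistory.task_id == task_instance.task_id,
            TaskInstanceHistory.run_id == task_instance.run_id,
            TaskInstanceHistory.map_index == task_instance.map_index,
            TaskInstanceHistory.try_number == try_number,
        )
        .one_or_none()
    )
    # Fall back to the live hostname if the attempt was never recorded.
    return history.hostname if history else task_instance.hostname
```

With such a helper, the last assignment in the loop in `read` would become `logs[i] = log if self._read_grouped_logs() else [(hostname_for_try(task_instance, try_number_element), log)]`, so each attempt's log is fetched from the worker that actually produced it.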