GitHub user vba added a comment to the discussion: Unable to see logs in the web UI when the job is running
Hi @potiuk, thanks for your reply.

> It simply optimizes for time of those who try to help you to solve your problem.

No problem; the subject is quite complex, and I didn't know how to present the key elements.

> it seems that somewhere the k8s reads logs from remote pod and something does not let it read it.

I've actually researched this problem quite a bit. First, if I roll back to Airflow 2.4.3, the problem disappears. I've also patched the Airflow code with `icecream`, trying to understand the problem step by step. In the log below, I call the API for the task instance that is running; you will notice that the remote log is fetched successfully, and the problem starts right after:

```
10.*.*.* - - [09/Jan/2025:10:23:02 +0000] "GET /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8?full_content=false HTTP/1.1" 500 1589 "https://airflow1-sandbox-dv.numberly.dev/dags/code-python-airflow-sample/grid?dag_run_id=manual__2024-12-27T20%3A29%3A34.637180%2B00%3A00&tab=logs&task_id=python_task" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 Firefox/133.0"
[2025-01-09 10:23:02 +0000] [70] [INFO] Parent changed, shutting down: <Worker 61>
[2025-01-09 10:23:02 +0000] [70] [INFO] Worker exiting (pid: 61)
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
LOG ISSUE DEBUG -> log_endpoint.py:60 in get_log() "Starting #get_log": 'Starting #get_log'
LOG ISSUE DEBUG -> log_endpoint.py:62 in get_log() key: '****************'
LOG ISSUE DEBUG -> log_endpoint.py:63 in get_log()- token: None
LOG ISSUE DEBUG -> log_endpoint.py:71 in get_log()- metadata: {}
LOG ISSUE DEBUG -> log_endpoint.py:80 in get_log() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_endpoint.py:82 in get_log() task_log_reader: <airflow.utils.log.log_reader.TaskLogReader object at 0x7f539c1ed510>
LOG ISSUE DEBUG -> log_reader.py:119 in log_handler() task_log_reader: 'task'
LOG ISSUE DEBUG -> log_reader.py:120 in log_handler() logging.getLogger("airflow.task").handlers: [<S3TaskHandler (NOTSET)>]
LOG ISSUE DEBUG -> log_reader.py:121 in log_handler() logging.getLogger().handlers: [<RedirectStdHandler <stdout> (NOTSET)>]
LOG ISSUE DEBUG -> log_endpoint.py:96 in get_log() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_endpoint.py:103 in get_log() dag: <DAG: code-python-airflow-sample>
LOG ISSUE DEBUG -> log_endpoint.py:107 in get_log() ti.task: <Task(PythonOperator): python_task>
LOG ISSUE DEBUG -> log_endpoint.py:112 in get_log() return_type: 'text/plain'
LOG ISSUE DEBUG -> log_endpoint.py:128 in get_log() logs: <generator object TaskLogReader.read_log_stream at 0x7f5396d165e0>
LOG ISSUE DEBUG -> log_endpoint.py:130 in get_log()- 'Ending'
LOG ISSUE DEBUG -> log_reader.py:78 in read_log_stream() "Starting #read_log_stream": 'Starting #read_log_stream'
LOG ISSUE DEBUG -> log_reader.py:84 in read_log_stream() try_numbers: [8]
LOG ISSUE DEBUG -> log_reader.py:85 in read_log_stream() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_reader.py:86 in read_log_stream() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:92 in read_log_stream() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:65 in read_log_chunks() self.log_handler: <S3TaskHandler (NOTSET)>
LOG ISSUE DEBUG -> s3_task_handler.py:122 in _read_remote_logs() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> s3_task_handler.py:123 in _read_remote_logs() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> s3_task_handler.py:125 in _read_remote_logs() worker_log_rel_path: 'dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:130 in _read_remote_logs() bucket: 'my-bucket-logs'
LOG ISSUE DEBUG -> s3_task_handler.py:131 in _read_remote_logs() prefix: 'airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:133 in _read_remote_logs() keys: ['airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:142 in _read_remote_logs() messages: ['Found logs in s3:', ' * ' 's3://my-bucket-logs/airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:143 in _read_remote_logs() logs:
    ['[2025-01-08T19:29:02.537+0000] {local_task_job_runner.py:123} INFO - '
     '::group::Pre task execution logs '
     '[2025-01-08T19:29:02.589+0000] {taskinstance.py:2613} INFO - Dependencies '
     'all met for dep_context=non-requeueable deps ti=<TaskInstance: '
     'code-python-airflow-sample.python_task '
     'manual__2024-12-27T20:29:34.637180+00:00 [queued]> '
     '[2025-01-08T19:29:02.602+0000] {taskinstance.py:2613} INFO - Dependencies '
     'all met for dep_context=requeueable deps ti=<TaskInstance: '
     'code-python-airflow-sample.python_task '
     'manual__2024-12-27T20:29:34.637180+00:00 [queued]> '
     '[2025-01-08T19:29:02.602+0000] {taskinstance.py:2866} INFO - Starting '
     'attempt 8 of 9 '
     '[2025-01-08T19:29:02.627+0000] {taskinstance.py:2889} INFO - Executing '
     '<Task(PythonOperator): python_task> on 2024-12-27 20:29:34.637180+00:00 '
     '[2025-01-08T19:29:02.633+0000] {standard_task_runner.py:72} INFO - Started '
     'process 9 to run task '
     '[2025-01-08T19:29:02.641+0000] {standard_task_runner.py:104} INFO - Running: '
     "['airflow', 'tasks', 'run', 'code-python-airflow-sample', 'python_task', "
     "'manual__2024-12-27T20:29:34.637180+00:00', '--job-id', '133', '--raw', "
     "'--subdir', 'DAGS_FOLDER/code_python_airflow_sample/airflow_dag.py', "
     "'--cfg-path', '/tmp/tmp38klyd4c'] "
     '[2025-01-08T19:29:02.646+0000] {standard_task_runner.py:105} INFO - Job 133: '
     'Subtask python_task '
     '[2025-01-08T19:59:03.705+0000] {taskinstance.py:352} INFO - Marking task as '
     'SUCCESS. dag_id=code-python-airflow-sample, task_id=python_task, '
     'run_id=manual__2024-12-27T20:29:34.637180+00:00, '
     'execution_date=20241227T202934, start_date=20250108T192902, '
     'end_date=20250108T195903 '
     '[2025-01-08T19:59:03.763+0000] {local_task_job_runner.py:266} INFO - Task '
     'exited with return code 0 '
     '[2025-01-08T19:59:03.857+0000] {taskinstance.py:3895} INFO - 0 downstream '
     'tasks scheduled from follow-on schedule check '
     '[2025-01-08T19:59:03.860+0000] {local_task_job_runner.py:245} INFO - '
     '::endgroup:: ']
[2025-01-09T10:23:03.662+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8 [GET]
Traceback (most recent call last):
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 113, in wrapper
    return _wrapper(request, response)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
    self._ensure_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
    self.make_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
    for item in iterable:
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 94, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 66, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 491, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 389, in _read
    response = self._executor_get_task_log(ti, try_number)
  File "/my-loc/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 346, in _executor_get_task_log
    executor = ExecutorLoader.get_default_executor()
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 165, in get_default_executor
    default_executor = cls.load_executor(cls.get_default_executor_name())
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 246, in load_executor
    executor = executor_cls()
  File "/my-loc/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 135, in __init__
    self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 723, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 508, in Client
    answer_challenge(c, authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 752, in answer_challenge
    message = connection.recv_bytes(256)  # reject large message
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
```

As you can see, all the line numbers in the traceback are slightly offset, since the code is patched with `icecream`.

GitHub link: https://github.com/apache/airflow/discussions/45624#discussioncomment-11823675
