GitHub user vba added a comment to the discussion: Unable to see logs in the 
web UI when the job is running

Hi @potiuk,

Thanks for your reply.

>  It simply optimizes for time of those who try to help you to solve your 
> problem.

No problem; as the subject is quite complex, I wasn't sure how to present the key 
elements.

> it seems that somewhere the k8s reads logs from remote pod and something does 
> not let it read it.

I've actually researched this problem quite a bit. First, if I roll back to 
Airflow 2.4.3, the problem disappears. Second, I patched the Airflow code with 
`icecream` to trace the problem step by step. In the log below, I call the API 
for a task instance that is running; you will notice that the remote log is 
fetched successfully, and the problem starts afterwards:

```
10.*.*.* - - [09/Jan/2025:10:23:02 +0000] "GET 
/api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8?full_content=false
 HTTP/1.1" 500 1589 
"https://airflow1-sandbox-dv.numberly.dev/dags/code-python-airflow-sample/grid?dag_run_id=manual__2024-12-27T20%3A29%3A34.637180%2B00%3A00&tab=logs&task_id=python_task";
 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 
Firefox/133.0"
[2025-01-09 10:23:02 +0000] [70] [INFO] Parent changed, shutting down: <Worker 
61>
[2025-01-09 10:23:02 +0000] [70] [INFO] Worker exiting (pid: 61)
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" 
"kube-probe/1.30"
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" 
"kube-probe/1.30"
LOG ISSUE DEBUG -> log_endpoint.py:60 in get_log()
                   "Starting #get_log": 'Starting #get_log'
LOG ISSUE DEBUG -> log_endpoint.py:62 in get_log()
                   key: '****************'
LOG ISSUE DEBUG -> log_endpoint.py:63 in get_log()- token: None
LOG ISSUE DEBUG -> log_endpoint.py:71 in get_log()- metadata: {}
LOG ISSUE DEBUG -> log_endpoint.py:80 in get_log()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_endpoint.py:82 in get_log()
                   task_log_reader: <airflow.utils.log.log_reader.TaskLogReader 
object at 0x7f539c1ed510>
LOG ISSUE DEBUG -> log_reader.py:119 in log_handler()
                   task_log_reader: 'task'
LOG ISSUE DEBUG -> log_reader.py:120 in log_handler()
                   logging.getLogger("airflow.task").handlers: [<S3TaskHandler 
(NOTSET)>]
LOG ISSUE DEBUG -> log_reader.py:121 in log_handler()
                   logging.getLogger().handlers: [<RedirectStdHandler <stdout> 
(NOTSET)>]
LOG ISSUE DEBUG -> log_endpoint.py:96 in get_log()
                   ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_endpoint.py:103 in get_log()
                   dag: <DAG: code-python-airflow-sample>
LOG ISSUE DEBUG -> log_endpoint.py:107 in get_log()
                   ti.task: <Task(PythonOperator): python_task>
LOG ISSUE DEBUG -> log_endpoint.py:112 in get_log()
                   return_type: 'text/plain'
LOG ISSUE DEBUG -> log_endpoint.py:128 in get_log()
                   logs: <generator object TaskLogReader.read_log_stream at 
0x7f5396d165e0>
LOG ISSUE DEBUG -> log_endpoint.py:130 in get_log()- 'Ending'
LOG ISSUE DEBUG -> log_reader.py:78 in read_log_stream()
                   "Starting #read_log_stream": 'Starting #read_log_stream'
LOG ISSUE DEBUG -> log_reader.py:84 in read_log_stream()
                   try_numbers: [8]
LOG ISSUE DEBUG -> log_reader.py:85 in read_log_stream()
                   ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_reader.py:86 in read_log_stream()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:92 in read_log_stream()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:65 in read_log_chunks()
                   self.log_handler: <S3TaskHandler (NOTSET)>
LOG ISSUE DEBUG -> s3_task_handler.py:122 in _read_remote_logs()
                   ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> s3_task_handler.py:123 in _read_remote_logs()
                   metadata: {'download_logs': False}
LOG ISSUE DEBUG -> s3_task_handler.py:125 in _read_remote_logs()
                   worker_log_rel_path: 
'dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:130 in _read_remote_logs()
                   bucket: 'my-bucket-logs'
LOG ISSUE DEBUG -> s3_task_handler.py:131 in _read_remote_logs()
                   prefix: 
'airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:133 in _read_remote_logs()
                   keys: 
['airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:142 in _read_remote_logs()
                   messages: ['Found logs in s3:',
                              '  * '
                              
's3://my-bucket-logs/airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:143 in _read_remote_logs()
                   logs: ['[2025-01-08T19:29:02.537+0000] 
{local_task_job_runner.py:123} INFO - '
                          '::group::Pre task execution logs
                         '
                          '[2025-01-08T19:29:02.589+0000] 
{taskinstance.py:2613} INFO - Dependencies '
                          'all met for dep_context=non-requeueable deps 
ti=<TaskInstance: '
                          'code-python-airflow-sample.python_task '
                          'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                         '
                          '[2025-01-08T19:29:02.602+0000] 
{taskinstance.py:2613} INFO - Dependencies '
                          'all met for dep_context=requeueable deps 
ti=<TaskInstance: '
                          'code-python-airflow-sample.python_task '
                          'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                         '
                          '[2025-01-08T19:29:02.602+0000] 
{taskinstance.py:2866} INFO - Starting '
                          'attempt 8 of 9
                         '
                          '[2025-01-08T19:29:02.627+0000] 
{taskinstance.py:2889} INFO - Executing '
                          '<Task(PythonOperator): python_task> on 2024-12-27 
20:29:34.637180+00:00
                         '
                          '[2025-01-08T19:29:02.633+0000] 
{standard_task_runner.py:72} INFO - Started '
                          'process 9 to run task
                         '
                          '[2025-01-08T19:29:02.641+0000] 
{standard_task_runner.py:104} INFO - Running: '
                          "['airflow', 'tasks', 'run', 
'code-python-airflow-sample', 'python_task', "
                          "'manual__2024-12-27T20:29:34.637180+00:00', 
'--job-id', '133', '--raw', "
                          "'--subdir', 
'DAGS_FOLDER/code_python_airflow_sample/airflow_dag.py', "
                          "'--cfg-path', '/tmp/tmp38klyd4c']
                         "
                          '[2025-01-08T19:29:02.646+0000] 
{standard_task_runner.py:105} INFO - Job 133: '
                          'Subtask python_task
                          '[2025-01-08T19:59:03.705+0000] {taskinstance.py:352} 
INFO - Marking task as '
                          'SUCCESS. dag_id=code-python-airflow-sample, 
task_id=python_task, '
                          'run_id=manual__2024-12-27T20:29:34.637180+00:00, '
                          'execution_date=20241227T202934, 
start_date=20250108T192902, '
                          'end_date=20250108T195903
                         '
                          '[2025-01-08T19:59:03.763+0000] 
{local_task_job_runner.py:266} INFO - Task '
                          'exited with return code 0
                         '
                          '[2025-01-08T19:59:03.857+0000] 
{taskinstance.py:3895} INFO - 0 downstream '
                          'tasks scheduled from follow-on schedule check
                         '
                          '[2025-01-08T19:59:03.860+0000] 
{local_task_job_runner.py:245} INFO - '
                          '::endgroup::
                         ']
[2025-01-09T10:23:03.662+0000] {app.py:1744} ERROR - Exception on 
/api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8
 [GET]
Traceback (most recent call last):
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 2529, in 
wsgi_app
    response = self.full_dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1825, in 
full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1823, in 
full_dispatch_request
    rv = self.dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1799, in 
dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/decorator.py", line 
68, in wrapper
    response = function(request)
  File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py", 
line 149, in wrapper
    response = function(request)
  File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/validation.py", line 
399, in wrapper
    return function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", 
line 113, in wrapper
    return _wrapper(request, response)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", 
line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/abstract.py", line 
366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, 
mimetype=mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/flask_api.py", line 
165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 314, in get_data
    self._ensure_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 376, in _ensure_sequence
    self.make_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 50, in _iter_encoded
    for item in iterable:
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", 
line 94, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", 
line 66, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 491, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 389, in _read
    response = self._executor_get_task_log(ti, try_number)
  File "/my-loc/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 346, in _executor_get_task_log
    executor = ExecutorLoader.get_default_executor()
  File 
"/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", 
line 165, in get_default_executor
    default_executor = cls.load_executor(cls.get_default_executor_name())
  File 
"/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", 
line 246, in load_executor
    executor = executor_cls()
  File 
"/my-loc/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 135, in __init__
    self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 723, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 606, in 
_create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 508, in 
Client
    answer_challenge(c, authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 752, in 
answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 216, in 
recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 414, in 
_recv_bytes
    buf = self._recv(4)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 379, in 
_recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
```

As you can see, the line numbers in the traceback are slightly offset because of 
the `icecream` patches.
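For context, the innermost frames of the traceback are not Airflow code at all: 
`KubernetesExecutor.__init__` calls `self._manager.Queue()`, and the 
`BlockingIOError` is raised during the standard `multiprocessing` manager 
handshake (`answer_challenge` → `recv_bytes` → `_recv`). A minimal stdlib 
sketch of that same call path, which succeeds in a healthy process (the 
explanation in the comments is my assumption, not something the traceback 
proves):

```python
import multiprocessing


def demo() -> str:
    # KubernetesExecutor.__init__ does `self.task_queue = self._manager.Queue()`
    # (kubernetes_executor.py:135 in the traceback). Creating the queue connects
    # to the manager process and reads its auth challenge over a pipe -- the
    # exact frames where the BlockingIOError [Errno 11] surfaces in the report.
    # One plausible (unconfirmed) cause is that something in the gunicorn web
    # worker leaves that file descriptor in non-blocking mode.
    manager = multiprocessing.Manager()
    q = manager.Queue()
    q.put("ok")
    result = q.get()
    manager.shutdown()
    return result


if __name__ == "__main__":
    print(demo())  # prints "ok" when the handshake works normally
```

That this plain stdlib sequence fails only inside the webserver suggests the 
regression is in *where* the executor is instantiated (the web worker, via 
`_executor_get_task_log`), not in the queue code itself.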

GitHub link: 
https://github.com/apache/airflow/discussions/45624#discussioncomment-11823675
