vba commented on issue #45516:
URL: https://github.com/apache/airflow/issues/45516#issuecomment-2587728230

   Hi @potiuk,
   
   Thanks for your reply.
   
   > It simply optimizes for time of those who try to help you to solve your problem.
   
   No problem; the subject is quite complex and I wasn't sure how best to present the key elements.
   
   > it seems that somewhere the k8s reads logs from remote pod and something does not let it read it.
   
   I've actually researched this problem quite a bit. First, if I roll back to Airflow 2.4.3, the problem disappears. Second, I patched the Airflow code with `icecream` to trace the request step by step. In the log below, I call the API for a task instance that is still running; you will notice that the remote log is fetched successfully, and the problem only starts afterwards:
   
   ```
   10.*.*.* - - [09/Jan/2025:10:23:02 +0000] "GET 
/api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8?full_content=false
 HTTP/1.1" 500 1589 
"https://airflow1-sandbox-dv.numberly.dev/dags/code-python-airflow-sample/grid?dag_run_id=manual__2024-12-27T20%3A29%3A34.637180%2B00%3A00&tab=logs&task_id=python_task";
 "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 
Firefox/133.0"
   [2025-01-09 10:23:02 +0000] [70] [INFO] Parent changed, shutting down: 
<Worker 61>
   [2025-01-09 10:23:02 +0000] [70] [INFO] Worker exiting (pid: 61)
   10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" 
"kube-probe/1.30"
   10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" 
"kube-probe/1.30"
   LOG ISSUE DEBUG -> log_endpoint.py:60 in get_log()
                      "Starting #get_log": 'Starting #get_log'
   LOG ISSUE DEBUG -> log_endpoint.py:62 in get_log()
                      key: '****************'
   LOG ISSUE DEBUG -> log_endpoint.py:63 in get_log()- token: None
   LOG ISSUE DEBUG -> log_endpoint.py:71 in get_log()- metadata: {}
   LOG ISSUE DEBUG -> log_endpoint.py:80 in get_log()
                      metadata: {'download_logs': False}
   LOG ISSUE DEBUG -> log_endpoint.py:82 in get_log()
                      task_log_reader: 
<airflow.utils.log.log_reader.TaskLogReader object at 0x7f539c1ed510>
   LOG ISSUE DEBUG -> log_reader.py:119 in log_handler()
                      task_log_reader: 'task'
   LOG ISSUE DEBUG -> log_reader.py:120 in log_handler()
                      logging.getLogger("airflow.task").handlers: 
[<S3TaskHandler (NOTSET)>]
   LOG ISSUE DEBUG -> log_reader.py:121 in log_handler()
                      logging.getLogger().handlers: [<RedirectStdHandler 
<stdout> (NOTSET)>]
   LOG ISSUE DEBUG -> log_endpoint.py:96 in get_log()
                      ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
   LOG ISSUE DEBUG -> log_endpoint.py:103 in get_log()
                      dag: <DAG: code-python-airflow-sample>
   LOG ISSUE DEBUG -> log_endpoint.py:107 in get_log()
                      ti.task: <Task(PythonOperator): python_task>
   LOG ISSUE DEBUG -> log_endpoint.py:112 in get_log()
                      return_type: 'text/plain'
   LOG ISSUE DEBUG -> log_endpoint.py:128 in get_log()
                      logs: <generator object TaskLogReader.read_log_stream at 
0x7f5396d165e0>
   LOG ISSUE DEBUG -> log_endpoint.py:130 in get_log()- 'Ending'
   LOG ISSUE DEBUG -> log_reader.py:78 in read_log_stream()
                      "Starting #read_log_stream": 'Starting #read_log_stream'
   LOG ISSUE DEBUG -> log_reader.py:84 in read_log_stream()
                      try_numbers: [8]
   LOG ISSUE DEBUG -> log_reader.py:85 in read_log_stream()
                      ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
   LOG ISSUE DEBUG -> log_reader.py:86 in read_log_stream()
                      metadata: {'download_logs': False}
   LOG ISSUE DEBUG -> log_reader.py:92 in read_log_stream()
                      metadata: {'download_logs': False}
   LOG ISSUE DEBUG -> log_reader.py:65 in read_log_chunks()
                      self.log_handler: <S3TaskHandler (NOTSET)>
   LOG ISSUE DEBUG -> s3_task_handler.py:122 in _read_remote_logs()
                      ti: <TaskInstance: code-python-airflow-sample.python_task 
manual__2024-12-27T20:29:34.637180+00:00 [running]>
   LOG ISSUE DEBUG -> s3_task_handler.py:123 in _read_remote_logs()
                      metadata: {'download_logs': False}
   LOG ISSUE DEBUG -> s3_task_handler.py:125 in _read_remote_logs()
                      worker_log_rel_path: 
'dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
   LOG ISSUE DEBUG -> s3_task_handler.py:130 in _read_remote_logs()
                      bucket: 'my-bucket-logs'
   LOG ISSUE DEBUG -> s3_task_handler.py:131 in _read_remote_logs()
                      prefix: 
'airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
   LOG ISSUE DEBUG -> s3_task_handler.py:133 in _read_remote_logs()
                      keys: 
['airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
   LOG ISSUE DEBUG -> s3_task_handler.py:142 in _read_remote_logs()
                      messages: ['Found logs in s3:',
                                 '  * '
                                 
's3://my-bucket-logs/airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
   LOG ISSUE DEBUG -> s3_task_handler.py:143 in _read_remote_logs()
                      logs: ['[2025-01-08T19:29:02.537+0000] 
{local_task_job_runner.py:123} INFO - '
                             '::group::Pre task execution logs
                            '
                             '[2025-01-08T19:29:02.589+0000] 
{taskinstance.py:2613} INFO - Dependencies '
                             'all met for dep_context=non-requeueable deps 
ti=<TaskInstance: '
                             'code-python-airflow-sample.python_task '
                             'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                            '
                             '[2025-01-08T19:29:02.602+0000] 
{taskinstance.py:2613} INFO - Dependencies '
                             'all met for dep_context=requeueable deps 
ti=<TaskInstance: '
                             'code-python-airflow-sample.python_task '
                             'manual__2024-12-27T20:29:34.637180+00:00 [queued]>
                            '
                             '[2025-01-08T19:29:02.602+0000] 
{taskinstance.py:2866} INFO - Starting '
                             'attempt 8 of 9
                            '
                             '[2025-01-08T19:29:02.627+0000] 
{taskinstance.py:2889} INFO - Executing '
                             '<Task(PythonOperator): python_task> on 2024-12-27 
20:29:34.637180+00:00
                            '
                             '[2025-01-08T19:29:02.633+0000] 
{standard_task_runner.py:72} INFO - Started '
                             'process 9 to run task
                            '
                             '[2025-01-08T19:29:02.641+0000] 
{standard_task_runner.py:104} INFO - Running: '
                             "['airflow', 'tasks', 'run', 
'code-python-airflow-sample', 'python_task', "
                             "'manual__2024-12-27T20:29:34.637180+00:00', 
'--job-id', '133', '--raw', "
                             "'--subdir', 
'DAGS_FOLDER/code_python_airflow_sample/airflow_dag.py', "
                             "'--cfg-path', '/tmp/tmp38klyd4c']
                            "
                             '[2025-01-08T19:29:02.646+0000] 
{standard_task_runner.py:105} INFO - Job 133: '
                             'Subtask python_task
                             '[2025-01-08T19:59:03.705+0000] 
{taskinstance.py:352} INFO - Marking task as '
                             'SUCCESS. dag_id=code-python-airflow-sample, 
task_id=python_task, '
                             'run_id=manual__2024-12-27T20:29:34.637180+00:00, '
                             'execution_date=20241227T202934, 
start_date=20250108T192902, '
                             'end_date=20250108T195903
                            '
                             '[2025-01-08T19:59:03.763+0000] 
{local_task_job_runner.py:266} INFO - Task '
                             'exited with return code 0
                            '
                             '[2025-01-08T19:59:03.857+0000] 
{taskinstance.py:3895} INFO - 0 downstream '
                             'tasks scheduled from follow-on schedule check
                            '
                             '[2025-01-08T19:59:03.860+0000] 
{local_task_job_runner.py:245} INFO - '
                             '::endgroup::
                            ']
   [2025-01-09T10:23:03.662+0000] {app.py:1744} ERROR - Exception on 
/api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8
 [GET]
   Traceback (most recent call last):
     File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 2529, in 
wsgi_app
       response = self.full_dispatch_request()
     File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1825, in 
full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1823, in 
full_dispatch_request
       rv = self.dispatch_request()
     File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1799, in 
dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/decorator.py", line 
68, in wrapper
       response = function(request)
     File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py", 
line 149, in wrapper
       response = function(request)
     File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/validation.py", line 
399, in wrapper
       return function(request)
     File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 
113, in wrapper
       return _wrapper(request, response)
     File 
"/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 
90, in _wrapper
       self.operation.api.get_connexion_response(response, self.mimetype)
     File "/my-loc/lib/python3.10/site-packages/connexion/apis/abstract.py", 
line 366, in get_connexion_response
       return cls._framework_to_connexion_response(response=response, 
mimetype=mimetype)
     File "/my-loc/lib/python3.10/site-packages/connexion/apis/flask_api.py", 
line 165, in _framework_to_connexion_response
       body=response.get_data() if not response.direct_passthrough else None,
     File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 314, in get_data
       self._ensure_sequence()
     File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 376, in _ensure_sequence
       self.make_sequence()
     File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 391, in make_sequence
       self.response = list(self.iter_encoded())
     File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 50, in _iter_encoded
       for item in iterable:
     File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 
94, in read_log_stream
       logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
     File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 
66, in read_log_chunks
       logs, metadatas = self.log_handler.read(ti, try_number, 
metadata=metadata)
     File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 491, in read
       log, out_metadata = self._read(task_instance, try_number_element, 
metadata)
     File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 389, in _read
       response = self._executor_get_task_log(ti, try_number)
     File "/my-loc/lib/python3.10/functools.py", line 981, in __get__
       val = self.func(instance)
     File 
"/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 346, in _executor_get_task_log
       executor = ExecutorLoader.get_default_executor()
     File 
"/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", 
line 165, in get_default_executor
       default_executor = cls.load_executor(cls.get_default_executor_name())
     File 
"/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", 
line 246, in load_executor
       executor = executor_cls()
     File 
"/my-loc/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 135, in __init__
       self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
     File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 723, in 
temp
       token, exp = self._create(typeid, *args, **kwds)
     File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 606, in 
_create
       conn = self._Client(self._address, authkey=self._authkey)
     File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 508, in 
Client
       answer_challenge(c, authkey)
     File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 752, in 
answer_challenge
       message = connection.recv_bytes(256)         # reject large message
     File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 216, in 
recv_bytes
       buf = self._recv_bytes(maxlength)
     File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 414, in 
_recv_bytes
       buf = self._recv(4)
     File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 379, in 
_recv
       chunk = read(handle, remaining)
   BlockingIOError: [Errno 11] Resource temporarily unavailable
   ```
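The tail of the traceback is the suspicious part: merely rendering the log response makes `ExecutorLoader` instantiate a fresh `KubernetesExecutor`, whose `__init__` builds its task queue through a `multiprocessing` manager. A minimal sketch of that stdlib pattern (nothing Airflow-specific, just an illustration of what the last frames are doing):

```python
import multiprocessing


def main():
    # Sketch of the pattern visible in the traceback's last frames:
    # Manager() starts a SyncManager child process, and Queue() connects
    # to it over a local socket. The recv() inside answer_challenge() on
    # that socket is where the BlockingIOError surfaces in the webserver.
    manager = multiprocessing.Manager()   # spawns a manager child process
    task_queue = manager.Queue()          # proxy backed by a socket connection
    task_queue.put("demo")
    print(task_queue.get())               # demo
    manager.shutdown()


if __name__ == "__main__":
    main()
```

In a plain CPython process this round-trip succeeds, which suggests the failure is specific to the state of the gunicorn web worker at the time the executor is instantiated, not to the manager/queue mechanism itself.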
   
   As you can see, the line numbers in the traceback are slightly offset, because the files include my added `icecream` calls.
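For what it's worth, `BlockingIOError: [Errno 11]` raised from `connection._recv` is exactly what a read on an empty non-blocking file descriptor produces. A tiny repro of just that errno (the idea that something in the worker leaves the connection's descriptor non-blocking is my assumption, not something the traceback proves):

```python
import errno
import os

# Emulate the failing connection._recv(4): reading from a descriptor that
# was left in non-blocking mode raises BlockingIOError (EAGAIN, errno 11
# on Linux) when no data is available yet.
r, w = os.pipe()
os.set_blocking(r, False)
try:
    os.read(r, 4)
except BlockingIOError as exc:
    print(exc.errno == errno.EAGAIN)  # True
finally:
    os.close(r)
    os.close(w)
```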


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
