vba opened a new issue, #45516:
URL: https://github.com/apache/airflow/issues/45516

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.3
   
   ### What happened?
   
   Since our migration from Airflow 2.4.3 to 2.9.3 and then to 2.10.3, we have noticed that it has become impossible to access logs via the web UI or the REST API for a running task instance.
   
   We run our Airflow instance on our in-house k8s infrastructure, using S3 as the remote logging backend.
   
   When the task instance completes its run, the remote log is visible through the web UI. In v2.4.3, with the same parameters, we never encountered similar issues. Here is our logging config section:
   
   ```ini
   [logging]
   base_log_folder = /opt/airflow/logs
   remote_logging = True
   remote_log_conn_id = s3_airflow_logs
   delete_local_logs = False
   google_key_path = 
   remote_base_log_folder = s3://the-bucket
   remote_task_handler_kwargs = 
   encrypt_s3_logs = False
   logging_level = INFO
   celery_logging_level = 
   fab_logging_level = WARNING
   logging_config_class = 
   colored_console_log = False
   colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] 
{%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} 
%%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
   colored_formatter_class = 
airflow.utils.log.colored_log.CustomTTYColoredFormatter
   log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - 
%%(message)s
   simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
   dag_processor_log_target = file
   dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] 
{%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
   log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
   secret_mask_adapter = 
   task_log_prefix_template = 
   log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id 
}}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ 
ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
   log_processor_filename_template = {{ filename }}.log
   dag_processor_manager_log_location = 
/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
   dag_processor_manager_log_stdout = False
   task_log_reader = task
   extra_logger_names = 
   worker_log_server_port = 8793
   trigger_log_server_port = 8794
   # interleave_timestamp_parser = 
   file_task_handler_new_folder_permissions = 0o775
   file_task_handler_new_file_permissions = 0o664
   celery_stdout_stderr_separation = False
   enable_task_context_logger = True
   color_log_error_keywords = error,exception
   color_log_warning_keywords = warn
   ```
   
   When we try to access the logs for the running task, we see the following page with no log content:
   
![image](https://github.com/user-attachments/assets/791f3c4b-76e1-454f-89ff-e5dad6b665d6)
   
   Same result for already finalized task attempts:
   
![image](https://github.com/user-attachments/assets/7bad069e-9536-4a6e-988d-4c4079a14b62)
   
   When we try to get the logs via the REST API (`/api/v1/dags/MY-DAG1/dagRuns/manual__DATE/taskInstances/MY-TASK/logs/8?full_content=false`), after a long wait we get a timeout exception and the following page:
   
![image](https://github.com/user-attachments/assets/055c9ee1-f485-43e2-ae30-1fec74d661b0)
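
   For reference, this is how we build the endpoint path (a hypothetical helper of our own, `task_log_url`; the base URL and names are placeholders, not an Airflow API):

   ```python
   def task_log_url(base: str, dag_id: str, run_id: str, task_id: str,
                    try_number: int, full_content: bool = False) -> str:
       """Build the stable REST API path for a task instance's logs."""
       return (
           f"{base}/api/v1/dags/{dag_id}/dagRuns/{run_id}"
           f"/taskInstances/{task_id}/logs/{try_number}"
           f"?full_content={str(full_content).lower()}"
       )

   # The request above, with a placeholder base URL:
   url = task_log_url("http://localhost:8080", "MY-DAG1", "manual__DATE",
                      "MY-TASK", 8)
   ```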
   
   
   
   ### What you think should happen instead?
   
   If we check the webserver logs, we notice the following exception:
   ```
   Traceback (most recent call last):
     File "/my-path/python3.10/site-packages/flask/app.py", line 2529, in 
wsgi_app
       response = self.full_dispatch_request()
     File "/my-path/python3.10/site-packages/flask/app.py", line 1825, in 
full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/my-path/python3.10/site-packages/flask/app.py", line 1823, in 
full_dispatch_request
       rv = self.dispatch_request()
     File "/my-path/python3.10/site-packages/flask/app.py", line 1799, in 
dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File 
"/my-path/python3.10/site-packages/connexion/decorators/decorator.py", line 68, 
in wrapper
       response = function(request)
     File 
"/my-path/python3.10/site-packages/connexion/decorators/uri_parsing.py", line 
149, in wrapper
       response = function(request)
     File 
"/my-path/python3.10/site-packages/connexion/decorators/validation.py", line 
399, in wrapper
       return function(request)
     File "/my-path/python3.10/site-packages/connexion/decorators/response.py", 
line 113, in wrapper
       return _wrapper(request, response)
     File "/my-path/python3.10/site-packages/connexion/decorators/response.py", 
line 90, in _wrapper
       self.operation.api.get_connexion_response(response, self.mimetype)
     File "/my-path/python3.10/site-packages/connexion/apis/abstract.py", line 
366, in get_connexion_response
       return cls._framework_to_connexion_response(response=response, 
mimetype=mimetype)
     File "/my-path/python3.10/site-packages/connexion/apis/flask_api.py", line 
165, in _framework_to_connexion_response
       body=response.get_data() if not response.direct_passthrough else None,
     File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 314, in get_data
       self._ensure_sequence()
     File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 376, in _ensure_sequence
       self.make_sequence()
     File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 391, in make_sequence
       self.response = list(self.iter_encoded())
     File "/my-path/python3.10/site-packages/werkzeug/wrappers/response.py", 
line 50, in _iter_encoded
       for item in iterable:
     File "/my-path/python3.10/site-packages/airflow/utils/log/log_reader.py", 
line 94, in read_log_stream
       logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
     File "/my-path/python3.10/site-packages/airflow/utils/log/log_reader.py", 
line 66, in read_log_chunks
       logs, metadatas = self.log_handler.read(ti, try_number, 
metadata=metadata)
     File 
"/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 491, in read
       log, out_metadata = self._read(task_instance, try_number_element, 
metadata)
     File 
"/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 389, in _read
       response = self._executor_get_task_log(ti, try_number)
     File "/my-path/python3.10/functools.py", line 981, in __get__
       val = self.func(instance)
     File 
"/my-path/python3.10/site-packages/airflow/utils/log/file_task_handler.py", 
line 346, in _executor_get_task_log
       executor = ExecutorLoader.get_default_executor()
     File 
"/my-path/python3.10/site-packages/airflow/executors/executor_loader.py", line 
165, in get_default_executor
       default_executor = cls.load_executor(cls.get_default_executor_name())
     File 
"/my-path/python3.10/site-packages/airflow/executors/executor_loader.py", line 
246, in load_executor
       executor = executor_cls()
     File 
"/my-path/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 135, in __init__
       self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
     File "/my-path/python3.10/multiprocessing/managers.py", line 723, in temp
       token, exp = self._create(typeid, *args, **kwds)
     File "/my-path/python3.10/multiprocessing/managers.py", line 606, in 
_create
       conn = self._Client(self._address, authkey=self._authkey)
     File "/my-path/python3.10/multiprocessing/connection.py", line 508, in 
Client
       answer_challenge(c, authkey)
     File "/my-path/python3.10/multiprocessing/connection.py", line 752, in 
answer_challenge
       message = connection.recv_bytes(256)         # reject large message
     File "/my-path/python3.10/multiprocessing/connection.py", line 216, in 
recv_bytes
       buf = self._recv_bytes(maxlength)
     File "/my-path/python3.10/multiprocessing/connection.py", line 414, in 
_recv_bytes
       buf = self._recv(4)
     File "/my-path/python3.10/multiprocessing/connection.py", line 379, in 
_recv
       chunk = read(handle, remaining)
   BlockingIOError: [Errno 11] Resource temporarily unavailable
   *.*.*.* - - [???? +0000] "GET 
/api/v1/dags/MY-DAG1/dagRuns/manual__DATE/taskInstances/MY-TASK/logs/8?full_content=false
 HTTP/1.1" 500 1589 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) 
Gecko/20100101 Firefox/133.0"
   ```
   What we notice is that the `s3_task_handler` does its part of the job correctly: for a running task it fetches the S3 content, and if there is no content it clearly reports `No logs found on s3 for ti=<TaskInstance: ...`. The problem starts when Airflow tries to get stdout for the running k8s pod; as shown above, that ends with `BlockingIOError - Resource temporarily unavailable`. It all fails in `file_task_handler`, within the `_read` method:
   
   ```
   log, out_metadata = self._read(task_instance, try_number_element, metadata)
   ```
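
   To illustrate what we would expect instead, here is a minimal sketch (hypothetical names, not actual Airflow code) of a `_read`-like function that still returns the remote (S3) logs even when the executor log fetch fails:

   ```python
   def read_task_logs(fetch_remote, fetch_executor, ti):
       """Sketch: combine remote (S3) logs with executor (pod stdout) logs,
       without letting an executor failure hide the remote logs."""
       messages, logs = [], []

       remote = fetch_remote(ti)
       if remote:
           logs.extend(remote)
       else:
           messages.append(f"No logs found on s3 for ti={ti}")

       try:
           logs.extend(fetch_executor(ti))
       except OSError as exc:  # e.g. BlockingIOError: [Errno 11]
           messages.append(f"Could not fetch executor logs: {exc}")

       return messages, logs
   ```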
   
   It looks like this problem has been around for several minor releases.
   
   ### How to reproduce
   
   Deploy an instance of Airflow within a k8s cluster with remote logging activated; that should be enough. While solving another issue related to remote logging, we set the following env vars (not sure if it's relevant):
   ```
   _AIRFLOW_PATCH_GEVENT=1
   AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES___AIRFLOW_PATCH_GEVENT=1
   ```
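
   We are not sure this is the root cause, but `_AIRFLOW_PATCH_GEVENT=1` makes gevent monkey-patch the webserver process, and gevent puts file descriptors into non-blocking mode; a plain `os.read` on an empty non-blocking pipe then fails exactly like the traceback above. A standalone demonstration of that error class (no Airflow or gevent involved, just a pipe switched to non-blocking by hand):

   ```python
   import errno
   import os

   r, w = os.pipe()
   os.set_blocking(r, False)  # emulate a gevent-style non-blocking fd

   try:
       os.read(r, 4)  # nothing written yet -> would normally block
   except BlockingIOError as exc:
       # Same failure mode as multiprocessing.connection._recv in the traceback
       assert exc.errno == errno.EAGAIN  # [Errno 11] Resource temporarily unavailable
   finally:
       os.close(r)
       os.close(w)
   ```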
   
   ### Operating System
   
   Debian GNU/Linux trixie/sid
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==9.0.0
   apache-airflow-providers-apache-cassandra==3.6.0
   apache-airflow-providers-apache-druid==3.12.0
   apache-airflow-providers-apache-hdfs==4.6.0
   apache-airflow-providers-apache-hive==8.2.0
   apache-airflow-providers-apache-kylin==3.7.0
   apache-airflow-providers-apache-livy==3.9.2
   apache-airflow-providers-apache-spark==5.0.0
   apache-airflow-providers-celery==3.8.3
   apache-airflow-providers-cncf-kubernetes==9.0.1
   apache-airflow-providers-common-compat==1.2.1
   apache-airflow-providers-common-io==1.4.2
   apache-airflow-providers-common-sql==1.19.0
   apache-airflow-providers-fab==1.5.0
   apache-airflow-providers-ftp==3.11.1
   apache-airflow-providers-google==10.25.0
   apache-airflow-providers-grpc==3.6.0
   apache-airflow-providers-hashicorp==3.8.0
   apache-airflow-providers-http==4.13.2
   apache-airflow-providers-imap==3.7.0
   apache-airflow-providers-microsoft-azure==11.0.0
   apache-airflow-providers-microsoft-mssql==3.9.1
   apache-airflow-providers-microsoft-winrm==3.6.0
   apache-airflow-providers-mongo==4.2.2
   apache-airflow-providers-papermill==3.8.1
   apache-airflow-providers-postgres==5.13.1
   apache-airflow-providers-sftp==4.11.1
   apache-airflow-providers-slack==8.9.1
   apache-airflow-providers-smtp==1.8.0
   apache-airflow-providers-sqlite==3.9.0
   apache-airflow-providers-ssh==3.14.0
   
   ```
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Kube version: `v1.30.4`
   Helm: `version.BuildInfo{Version:"v3.15.2", 
GitCommit:"1a500d5625419a524fdae4b33de351cc4f58ec35", GitTreeState:"clean", 
GoVersion:"go1.22.4"}`
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

