optiluca opened a new issue, #63173:
URL: https://github.com/apache/airflow/issues/63173

   ### Apache Airflow version
   
   3.1.7
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I'm running a DAG in which one task is a KubernetesPodOperator (KPO) running in deferrable mode. Because the task can take a couple of hours and I want to see its logs while it runs, I've set `logging_interval=600`.
   
   However, since doing this I've been hitting sporadic task failures due to an `AirflowException`:
   
   ```
   File /usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py, line 1068 in run
   File /usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py, line 1477 in _execute_task
   File /usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py, line 1633 in resume_execution
   File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 993 in trigger_reentry
   File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1019 in _clean
   File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1057 in post_complete_action
   File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1129 in cleanup
   ,AirflowException: Traceback (most recent call last):
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 183, in run
       event = await self._wait_for_container_completion()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 307, in _wait_for_container_completion
       self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec(
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 1076, in fetch_container_logs_before_current_sec
       logs = await self._hook.read_logs(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
       return await copy(fn, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
       do = await self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
       result = await action(retry_state)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/_utils.py", line 99, in inner
       return call(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
       result = await fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 997, in read_logs
       logs = await v1_api.read_namespaced_pod_log(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 98, in call_api
       return await super().call_api(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/kubernetes_asyncio/client/api_client.py", line 209, in __call_api
       response_data.data = response_data.data.decode(encoding)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   UnicodeDecodeError: 'utf-8' codec can't decode bytes in position 16382-16383: unexpected end of data
   ```
   
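   Judging by the symptoms, something in my logs occasionally produces invalid UTF-8 data, and this brings down the entire task. The "unexpected end of data" error at bytes 16382-16383 suggests that a multi-byte character was cut off at a chunk boundary.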
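The failure mode can be reproduced without Airflow or Kubernetes. This sketch rests on an assumption inferred from the error position, not confirmed from the actual logs: the fetched log payload was cut at 16,384 bytes, partway through a 3-byte character such as the `█` block used by Unicode progress bars. `truncated_payload` and `strict_decode` are hypothetical helpers. `strict_decode` only mirrors the strict `data.decode(encoding)` call at the bottom of the traceback.

```python
# Hypothetical reproduction of the UnicodeDecodeError from the traceback.
# Assumption: the log payload ends partway through a multi-byte UTF-8 character.

FULL_BLOCK = "\u2588"  # progress-bar block character; encodes to 3 bytes in UTF-8


def truncated_payload(limit: int = 16384) -> bytes:
    """Return UTF-8 bytes cut at `limit`.

    The two-character ASCII prefix offsets the 3-byte characters so that, for
    the default limit, the cut keeps only 2 of the final character's 3 bytes.
    """
    text = "xx" + FULL_BLOCK * (limit // 3 + 1)
    return text.encode("utf-8")[:limit]


def strict_decode(data: bytes) -> str:
    # errors="strict" is the default, so a split character raises UnicodeDecodeError.
    return data.decode("utf-8")
```

Calling `strict_decode(truncated_payload())` raises `'utf-8' codec can't decode bytes in position 16382-16383: unexpected end of data`, which is the same message as in the traceback.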
   
   I'm mitigating the issue by setting the following environment variables in my KPO task:
   
   ```
   # Force UTF-8 stdout so the K8s pod trigger's log fetch never hits UnicodeDecodeError
   # (the provider decodes as UTF-8; truncation/invalid bytes cause intermittent failures).
   "PYTHONIOENCODING": "utf-8",
   "LANG": "C.UTF-8",
   "LC_ALL": "C.UTF-8",
   # Use a dumb terminal so progress bars (e.g. tqdm) use ASCII-only output,
   # avoiding multi-byte chars that can be truncated at chunk boundaries.
   "TERM": "dumb",
   ```
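You can check that the first of these variables reaches a Python process by starting a child interpreter with them and asking for its stdout encoding. This is a stdlib-only sketch; `MITIGATION_ENV` and `child_stdout_encoding` are hypothetical names. It verifies only the encoding setting and does not show how the provider reads or decodes pod logs.

```python
import codecs
import os
import subprocess
import sys

# Environment variables from the mitigation above (values copied from the issue).
MITIGATION_ENV = {
    "PYTHONIOENCODING": "utf-8",
    "LANG": "C.UTF-8",
    "LC_ALL": "C.UTF-8",
    "TERM": "dumb",
}


def child_stdout_encoding(extra_env: dict[str, str]) -> str:
    """Return the normalized stdout encoding seen by a child Python process."""
    env = {**os.environ, **extra_env}
    out = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.stdout.encoding)"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    ).stdout.strip()
    # codecs.lookup normalizes aliases such as "UTF-8" or "utf8" to "utf-8".
    return codecs.lookup(out).name
```

Inside the pod, `child_stdout_encoding(MITIGATION_ENV)` should return `"utf-8"`.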
   
   ### What you think should happen instead?
   
   I think that "invalid" UTF-8 data should be handled more gracefully, so that it does not bring down the entire task execution.
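Here is a sketch of what more graceful handling could look like. It is not the provider's actual code, and `decode_log_chunks` is a hypothetical helper. Python's incremental UTF-8 decoder carries a character that is split across two chunks over to the next chunk. With `errors="replace"`, bytes that are genuinely invalid become U+FFFD instead of raising.

```python
import codecs
from collections.abc import Iterable, Iterator


def decode_log_chunks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode a stream of UTF-8 log chunks without raising UnicodeDecodeError."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    for chunk in chunks:
        # Incomplete trailing bytes are buffered until the next chunk arrives.
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush: an incomplete sequence left at the very end becomes U+FFFD.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
```

For a single, already-truncated payload, `data.decode("utf-8", errors="replace")` is also non-fatal. The incremental decoder additionally avoids replacing a character that is merely split across two chunks.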
   
   ### How to reproduce
   
   - Prepare a DAG with a KPO step configured in deferrable mode, with `logging_interval` set
   - The KPO step should produce output such as tqdm progress bars
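To reproduce without installing tqdm, a container entrypoint along these lines could stand in for a tqdm workload. This is a hypothetical sketch: `render_bar` and `main` are illustrative names. The bar uses Unicode partial-block characters similar to those tqdm draws when the output supports Unicode, and each takes 3 bytes in UTF-8.

```python
import sys
import time

# Partial cells from 0/8 to 7/8 (U+258F down to U+2589); the full cell is U+2588.
PARTIAL = " \u258f\u258e\u258d\u258c\u258b\u258a\u2589"
FULL = "\u2588"


def render_bar(fraction: float, width: int = 40) -> str:
    """Render a percentage plus a bar with 1/8-cell resolution."""
    fraction = min(max(fraction, 0.0), 1.0)
    full, rem = divmod(int(fraction * width * 8), 8)
    bar = FULL * full
    if full < width:
        # One partial cell, then pad with spaces to the full width.
        bar += PARTIAL[rem] + " " * (width - full - 1)
    return f"{fraction:4.0%}|{bar}|"


def main(steps: int = 7200, delay: float = 1.0) -> None:
    """Redraw the bar in place using carriage returns, as progress bars do."""
    for i in range(steps + 1):
        sys.stdout.write("\r" + render_bar(i / steps))
        sys.stdout.flush()
        time.sleep(delay)
    sys.stdout.write("\n")
```

With the defaults, calling `main()` as the container command produces about two hours (7200 steps × 1 s) of in-place progress output, roughly matching the task runtime described below.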
   
   Run the DAG. In my case, fetching logs every 10 minutes for a task that runs for around 2 hours results in a >50% failure rate due to the error above.
   
   ### Operating System
   
   SLES 15-SP7 (this is where the KPO k8s pods are being run)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes, in whichever version is pinned by astro/runtime:3.1-12
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

