Ferdinanddb commented on issue #55747:
URL: https://github.com/apache/airflow/issues/55747#issuecomment-3335613676

   For info I have the same problem with the [`SparkKubernetesOperator` from 
`cncf.kubernetes` 
provider.](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#sparkkubernetesoperator).
 Since I migrated to Airflow 3.x.x, I was not able to run any task in the 
deferrable mode, and everything was working fine on Airflow 2.11 / 2.10.
   
   I suspect that something is off in the way the triggerer is behaving, 
because my task (Spark job in my case) is succeeding, but the Airflow task 
fails with the following log (similar to the one mentioned in this issue, hence 
my comment):
   ```
   AttributeError: 'SparkKubernetesOperator' object has no attribute 'launcher'
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 920 in run
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1215 in _execute_task
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py",
 line 1606 in resume_execution
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 956 in trigger_reentry
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 978 in _clean
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 1013 in post_complete_action
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 1056 in cleanup
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py",
 line 265 in process_pod_deletion
   
   AirflowException: Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 148, in run
       state = await self._wait_for_pod_start()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 213, in _wait_for_pod_start
       pod = await self._get_pod()
             ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 189, in async_wrapped
       return await copy(fn, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 111, in __call__
       do = await self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 153, in iter
       result = await action(retry_state)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py", line 
99, in inner
       return call(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
420, in exc_check
       raise retry_exc.reraise()
             ^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
187, in reraise
       raise self.last_attempt.result()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in 
result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 114, in __call__
       result = await fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 276, in _get_pod
       pod = await self.hook.get_pod(name=self.pod_name, 
namespace=self.pod_namespace)
             
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 851, in get_pod
       async with self.get_conn() as connection:
                  ^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
       return await anext(self.gen)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 838, in get_conn
       kube_client = await self._load_config() or async_client.ApiClient()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 757, in _load_config
       in_cluster = self._coalesce_param(self.in_cluster, await 
self._get_field("in_cluster"))
                                                          
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 828, in _get_field
       extras = await self.get_conn_extras()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 817, in get_conn_extras
       self._extras = connection.extra_dejson
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py",
 line 449, in extra_dejson
       return self.get_extra_dejson()
              ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py",
 line 442, in get_extra_dejson
       mask_secret(extra)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py",
 line 134, in mask_secret
       comms.send(MaskSecret(value=secret, name=name))
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 740, in send
       return async_to_sync(self.asend)(msg)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", 
line 186, in __call__
       raise RuntimeError(
   RuntimeError: You cannot use AsyncToSync in the same thread as an async 
event loop - just await the async function directly.
   ```
   
   Hope that helps!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to