JWDobken opened a new issue, #53711:
URL: https://github.com/apache/airflow/issues/53711

   ### Apache Airflow version
   
   3.0.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When I run KubernetesPodOperator tasks with XCom stored through the object storage backend, configured as follows:
   
   ```yaml
   # Store XCOM in the xcom folder
   - AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
   - AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=abfs://wasb_default@<container-name>/xcom
   - AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=0
   - AIRFLOW_CONN_WASB_DEFAULT=wasb://:@?extra__wasb__account_name=<account-name>&extra__wasb__account_key=<account-key>
   ```
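
The connection above is passed as a URI whose extras travel as query parameters. As a rough sketch (not part of the original setup), such a string could be assembled with the standard library so that the parameters are joined correctly; the `extras` dictionary and the `uri` variable are illustrative names, and the angle-bracket values are the same placeholders used above.

```python
from urllib.parse import urlencode

# Placeholder values, exactly as redacted in the report.
extras = {
    "extra__wasb__account_name": "<account-name>",
    "extra__wasb__account_key": "<account-key>",
}

# urlencode joins key=value pairs with "&"; safe="<>" only keeps the
# placeholder brackets readable here. Real keys should be percent-encoded.
uri = "wasb://:@?" + urlencode(extras, safe="<>")
print(uri)
```

With a real account key, dropping the `safe` argument lets `urlencode` escape characters such as `+`, `/` and `=` that would otherwise be misread in a URI.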
   
   The task runs and the XCom is written to Azure Blob Storage, but the **task then fails with `RuntimeError: This event loop is already running`**:
   
   
   ```logs
   RuntimeError: This event loop is already running
   File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 680 in execute_sync
   File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 611 in await_pod_start
   File "/usr/local/lib/python3.12/asyncio/base_events.py", line 667 in run_until_complete
   File "/usr/local/lib/python3.12/asyncio/base_events.py", line 626 in _check_running
   ```
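
For context, the same exception can be reproduced in plain asyncio, independent of Airflow. The sketch below only illustrates the general mechanism the traceback points at (`run_until_complete` being called on a loop that is already running); it is not a diagnosis of the provider code, and the function names `wait_for_start` and `blocking_helper` are made up for the example.

```python
import asyncio


async def wait_for_start() -> str:
    # Stand-in for any coroutine a synchronous helper wants to wait on.
    await asyncio.sleep(0)
    return "started"


def blocking_helper(loop: asyncio.AbstractEventLoop) -> str:
    # Synchronous code that tries to drive a coroutine to completion.
    coro = wait_for_start()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)


async def main() -> str:
    # We are already inside a running loop here, so the helper's
    # run_until_complete call is rejected by the loop's running check.
    return blocking_helper(asyncio.get_running_loop())


message = asyncio.run(main())
print(message)
```

Called from ordinary synchronous code with a loop that is not running, the same helper would return `"started"`; the error appears only when some other code has already started the loop in the same thread.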
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   ```py
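   from typing import List

   from airflow.decorators import dag, task

   # Placeholders (values not given in the report): point these at your own image.
   IMAGE_CONTAINER_REGISTRY = "<container-registry>/<image-name>"
   IMAGE_TAG = "<image-tag>"

   # Common configuration for Kubernetes pods
   # (inlined here; normally imported from a local defaults_args module)
   default_pod_args = {
       "image": f"{IMAGE_CONTAINER_REGISTRY}:{IMAGE_TAG}",
       "image_pull_policy": "IfNotPresent",
       "get_logs": True,
       "do_xcom_push": True,
   }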
   
   # Common configuration for Airflow DAGs
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 0,
   }
   
   
   @dag(
       dag_id="test",
       schedule=None,
       default_args=default_args,
   )
   def test():
       @task.kubernetes(
           task_id="test_list",
           name="test_list",
           **default_pod_args,
       )
       def make_index_list(count: int) -> List[int]:
           return list(range(count))
   
       @task.kubernetes(
           task_id="test_expand",
           name="test_expand",
           **default_pod_args,
       )
       def hello(index):
           print(f"hello: {index}")
   
   
       indexes = make_index_list(3)
       hello.expand(index=indexes)
   
   pipeline = test()
   ```
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   I run the Airflow container image `apache/airflow:3.0.3` with the providers bundled in it.
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

