JWDobken opened a new issue, #53711:
URL: https://github.com/apache/airflow/issues/53711
### Apache Airflow version
3.0.3
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
When I run KubernetesPodOperator tasks with XCom stored in object storage, using this configuration:
```yaml
# Store XCOM in the xcom folder
- AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
- AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=abfs://wasb_default@<container-name>/xcom
- AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=0
- AIRFLOW_CONN_WASB_DEFAULT=wasb://:@?extra__wasb__account_name=<account-name>&extra__wasb__account_key=<account-key>
```
The task runs and the XCom is written to Azure Blob Storage, but the **task
then fails with `RuntimeError: This event loop is already running`**:
```logs
RuntimeError: This event loop is already running
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 680 in execute_sync
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 611 in await_pod_start
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 667 in run_until_complete
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 626 in _check_running
```
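For context, the last two frames are plain `asyncio` behaviour: `run_until_complete` refuses to start when the loop it is called on is already running. Below is a minimal sketch, independent of Airflow, that triggers the same message. The names `inner` and `outer` are made up for illustration, and this is not a claim about where the already-running loop comes from in the operator.

```python
import asyncio


async def inner() -> str:
    # Stand-in for any coroutine that some synchronous code wants to run.
    return "done"


async def outer() -> str:
    # We are already inside a running event loop here. Asking that same
    # loop to synchronously run another coroutine is rejected by asyncio.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```

With the default asyncio event loop, the printed message matches the first line of the traceback above.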
### What you think should happen instead?
_No response_
### How to reproduce
```py
from typing import List

from airflow.decorators import dag, task

# In the real project the two dicts below are presumably imported:
#   from defaults_args import default_args, default_pod_args
# They are inlined here so the example can be read on its own.
# IMAGE_CONTAINER_REGISTRY and IMAGE_TAG are not defined in this report
# and must be set for your environment.

# Common configuration for Kubernetes pods
default_pod_args = {
    "image": f"{IMAGE_CONTAINER_REGISTRY}:{IMAGE_TAG}",
    "image_pull_policy": "IfNotPresent",
    "get_logs": True,
    "do_xcom_push": True,
}

# Common configuration for Airflow DAGs
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}


@dag(
    dag_id="test",
    schedule=None,
    default_args=default_args,
)
def test():
    # Returns a list that is pushed to XCom and used for dynamic task mapping
    @task.kubernetes(
        task_id="test_list",
        name="test_list",
        **default_pod_args,
    )
    def make_index_list(count: int) -> List[int]:
        return list(range(count))

    # Mapped task: one pod per element of the list above
    @task.kubernetes(
        task_id="test_expand",
        name="test_expand",
        **default_pod_args,
    )
    def hello(index):
        print(f"hello: {index}")

    indexes = make_index_list(3)
    hello.expand(index=indexes)


pipeline = test()
```
### Operating System
Mac OS
### Versions of Apache Airflow Providers
I run the airflow container: `apache/airflow:3.0.3`
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)