GitHub user apmechev edited a discussion: "XCom not found" error thrown by EMR Sensors after Airflow 3.0
We get "XCom not found" errors from the `EmrJobFlowSensor`:
```json
{
    "run_id": "scheduled__2025-06-11T11:00:00+00:00",
    "status_code": 404,
    "level": "error",
    "map_index": -1,
    "task_id": "clean-hdfs.check_steps",
    "dag_id": "ycp-reporting-pipeline-dev",
    "detail": {
        "detail": {
            "reason": "not_found",
            "message": "XCom with key='emr_cluster' map_index=-1 not found for task 'clean-hdfs.check_steps' in DAG run 'scheduled__2025-06-11T11:00:00+00:00' of '*-dev'"
        }
    },
    "event": "XCom not found",
    "key": "emr_cluster",
    "timestamp": "2025-06-11T12:19:49.552298Z",
    "logger": "airflow.sdk.api.client"
}
```
Two such errors appear every time an EMR sensor executes; they are caused by two XCom keys that do not exist: `emr_cluster` and `emr_logs`.
I traced the issue to the [operator extra links](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py#L441-L444) in the EMR sensor. These links fetch [keys](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/emr.py#L36) from the [task instance](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L85)'s XCom, but those keys do not exist on the sensor's XCom.
I've included our code below. Is this something we've misconfigured, or is it an issue in Airflow? The errors started appearing in our logs after we upgraded to Airflow 3.0.
```python
def create_steps_task_group(
    start_date, stack_name, aws_region, app_name, steps, parent_dag, xcoms
):
    ...
    with TaskGroup(group_id=group_id, dag=parent_dag) as tg:
        emr_cluster_info_operator = PythonOperator(
            task_id="get_cluster_info",
            python_callable=emr_cluster_info,
            op_kwargs={"stack_name": stack_name, "cf_region_name": aws_region},
            dag=parent_dag,
        )
        emr_add_steps_op = EmrAddStepsOperator(
            task_id="add_cluster_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            steps=steps,
            dag=parent_dag,
        )
        check_cluster_sensor = EmrJobFlowSensor(
            task_id="check_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            poke_interval=SENSOR_POKE_INTERVAL,
            dag=parent_dag,
            on_failure_callback=check_steps_failure,
            do_xcom_push=True,
        )
        emr_cluster_info_operator >> emr_add_steps_op >> check_cluster_sensor
    return tg
```
```json
"apache-airflow-providers-amazon": {
    ...
    "markers": "python_version ~= '3.9'",
    "version": "==9.8.0"
},
"apache-airflow-core": {
    ...
    "markers": "python_version < '3.13' and python_version ~= '3.9'",
    "version": "==3.0.2"
}
```
GitHub link: https://github.com/apache/airflow/discussions/51652