GitHub user apmechev edited a discussion: "XCom not found" error thrown by EMR Sensors after Airflow 3.0

We get `XCom not found` errors from the `EmrJobFlowSensor`:
```
{
  "run_id": "scheduled__2025-06-11T11:00:00+00:00",
  "status_code": 404,
  "level": "error",
  "map_index": -1,
  "task_id": "clean-hdfs.check_steps",
  "dag_id": "ycp-reporting-pipeline-dev",
  "detail": {
    "detail": {
      "reason": "not_found",
      "message": "XCom with key='emr_cluster' map_index=-1 not found for task 'clean-hdfs.check_steps' in DAG run 'scheduled__2025-06-11T11:00:00+00:00' of '*-dev'"
    }
  },
  "event": "XCom not found",
  "key": "emr_cluster",
  "timestamp": "2025-06-11T12:19:49.552298Z",
  "logger": "airflow.sdk.api.client"
}
```
Two such errors appear every time an EMR sensor executes, one for each of two XCom keys that don't exist: `emr_cluster` and `emr_logs`.
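To confirm that the errors come in pairs (one per missing key), a small script can tally the structured log events. This is a hypothetical helper, `missing_xcom_keys`, not part of Airflow; it assumes one JSON object per line and reuses the field names from the error above (`event`, `task_id`, `key`).

```python
import json
from collections import Counter
from typing import Iterable


def missing_xcom_keys(log_lines: Iterable[str]) -> Counter:
    """Tally (task_id, key) pairs from 'XCom not found' log events.

    Hypothetical helper: assumes one JSON object per line with the
    "event", "task_id" and "key" fields seen in the error above.
    """
    tally: Counter = Counter()
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip plain-text (non-JSON) log lines
        if isinstance(record, dict) and record.get("event") == "XCom not found":
            tally[(record.get("task_id"), record.get("key"))] += 1
    return tally


# Sample lines shaped like the logged event (trimmed to the relevant fields).
sample = [
    '{"event": "XCom not found", "task_id": "clean-hdfs.check_steps", "key": "emr_cluster"}',
    '{"event": "XCom not found", "task_id": "clean-hdfs.check_steps", "key": "emr_logs"}',
    "plain text line",
]
print(missing_xcom_keys(sample))
```

Run against a real task log, equal counts for `emr_cluster` and `emr_logs` per sensor task would be consistent with one error per extra link.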

I traced the issue to the [operator extra links](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py#L441-L444) in the EMR sensor. These links read their [keys](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/emr.py#L36) from the [task instance](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L85), but those keys don't seem to exist in the sensor's XComs.
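As a toy model of the lookup described above, each extra link resolves its URL by reading one XCom key, so a sensor with two links and nothing pushed under those keys yields two "not found" events each time the links are resolved. `ToyXComStore` and `ToyExtraLink` are hypothetical, simplified stand-ins, not the provider's link classes or Airflow's XCom backend.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-ins for illustration only; NOT Airflow's real classes.
XComKey = Tuple[str, str, str]  # (run_id, task_id, key)


@dataclass
class ToyXComStore:
    values: Dict[XComKey, str] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)

    def get(self, run_id: str, task_id: str, key: str) -> Optional[str]:
        value = self.values.get((run_id, task_id, key))
        if value is None:
            # Record a "not found" event, loosely mirroring the logged error.
            self.errors.append(f"XCom not found: key={key!r} task={task_id!r}")
        return value


@dataclass
class ToyExtraLink:
    name: str
    key: str  # the XCom key this link reads its URL parameters from

    def get_link(self, store: ToyXComStore, run_id: str, task_id: str) -> str:
        params = store.get(run_id, task_id, self.key)
        return params or ""  # empty link when nothing was pushed


links = [ToyExtraLink("EMR Cluster", "emr_cluster"), ToyExtraLink("EMR Logs", "emr_logs")]
store = ToyXComStore()
run_id, task_id = "scheduled__2025-06-11T11:00:00+00:00", "clean-hdfs.check_steps"

# Resolving both links with an empty store produces one error per link.
for link in links:
    link.get_link(store, run_id, task_id)
print(store.errors)
```

In this model the errors disappear once values exist under both keys, which is why the question below is whether they are expected to be pushed for the sensor.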

I've included our code below. Is this something we've misconfigured, or is it an issue in Airflow? It started appearing in our logs after we upgraded to Airflow 3.0.

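```python
# Imports assumed for Airflow 3.0 (not shown in the original snippet)
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import TaskGroup


def create_steps_task_group(
    start_date, stack_name, aws_region, app_name, steps, parent_dag, xcoms
):
...
    with TaskGroup(group_id=group_id, dag=parent_dag) as tg:
        # Look up cluster details from the CloudFormation stack
        emr_cluster_info_operator = PythonOperator(
            task_id="get_cluster_info",
            python_callable=emr_cluster_info,
            op_kwargs={"stack_name": stack_name, "cf_region_name": aws_region},
            dag=parent_dag,
        )
        # Submit the EMR steps to the cluster
        emr_add_steps_op = EmrAddStepsOperator(
            task_id="add_cluster_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            steps=steps,
            dag=parent_dag,
        )
        # Poll the job flow; this is the task that logs the XCom errors
        check_cluster_sensor = EmrJobFlowSensor(
            task_id="check_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            poke_interval=SENSOR_POKE_INTERVAL,
            dag=parent_dag,
            on_failure_callback=check_steps_failure,
            do_xcom_push=True,
        )
        emr_cluster_info_operator >> emr_add_steps_op >> check_cluster_sensor
        return tg
```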

Relevant package versions:

```
        "apache-airflow-providers-amazon": {
...
            "markers": "python_version ~= '3.9'",
            "version": "==9.8.0"
        "apache-airflow-core": {
...
            "markers": "python_version < '3.13' and python_version ~= '3.9'",
            "version": "==3.0.2"
```


GitHub link: https://github.com/apache/airflow/discussions/51652

----
This is an automatically sent email for commits@airflow.apache.org.