GitHub user apmechev edited a discussion: "XCom not found" error thrown by EMR Sensors after Airflow 3.0
We get "XCom not found" errors from the `EmrJobFlowSensor`:

```
{
  "run_id": "scheduled__2025-06-11T11:00:00+00:00",
  "status_code": 404,
  "level": "error",
  "map_index": -1,
  "task_id": "clean-hdfs.check_steps",
  "dag_id": "ycp-reporting-pipeline-dev",
  "detail": {
    "detail": {
      "reason": "not_found",
      "message": "XCom with key='emr_cluster' map_index=-1 not found for task 'clean-hdfs.check_steps' in DAG run 'scheduled__2025-06-11T11:00:00+00:00' of '*-dev'"
    }
  },
  "event": "XCom not found",
  "key": "emr_cluster",
  "timestamp": "2025-06-11T12:19:49.552298Z",
  "logger": "airflow.sdk.api.client"
}
```

Two such errors appear every time an EMR sensor executes, because two XCom keys do not exist: `emr_cluster` and `emr_logs`.

I traced the issue to the [operator extra links](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py#L441-L444) in the EMR sensor. These links read their [keys](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/emr.py#L36) from the [task instance](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L85), but the keys don't seem to exist in the sensor's XCom.

I've added our code below. Is this something we've misconfigured, or is it an issue in Airflow? The errors started appearing in our logs after we upgraded to Airflow 3.0.

```
def create_steps_task_group(
    start_date, stack_name, aws_region, app_name, steps, parent_dag, xcoms
):
    ...
    with TaskGroup(group_id=group_id, dag=parent_dag) as tg:
        emr_cluster_info_operator = PythonOperator(
            task_id="get_cluster_info",
            python_callable=emr_cluster_info,
            op_kwargs={"stack_name": stack_name, "cf_region_name": aws_region},
            dag=parent_dag,
        )
        emr_add_steps_op = EmrAddStepsOperator(
            task_id="add_cluster_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            steps=steps,
            dag=parent_dag,
        )
        check_cluster_sensor = EmrJobFlowSensor(
            task_id="check_steps",
            job_flow_id=xcoms["add_cluster_steps"],
            poke_interval=SENSOR_POKE_INTERVAL,
            dag=parent_dag,
            on_failure_callback=check_steps_failure,
            do_xcom_push=True,
        )
        emr_cluster_info_operator >> emr_add_steps_op >> check_cluster_sensor
    return tg
```

Installed versions:

```
"apache-airflow-providers-amazon": {
    ...
    "markers": "python_version ~= '3.9'",
    "version": "==9.8.0"
},
"apache-airflow-core": {
    ...
    "markers": "python_version < '3.13' and python_version ~= '3.9'",
    "version": "==3.0.2"
}
```

GitHub link: https://github.com/apache/airflow/discussions/51652
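
To make the suspected mechanism concrete, here is a minimal, self-contained sketch of how I understand the extra-link lookup to behave. It does not import Airflow: `FakeXComStore`, `FakeLink`, and the `example.invalid` URL templates are hypothetical stand-ins, and only the two key names and the task id come from the logs above. The assumption is that each link reads a dict stored under its own XCom key and falls back to an empty link when nothing was pushed.

```python
from __future__ import annotations


class FakeXComStore:
    """In-memory stand-in for the XCom backend (hypothetical, not an Airflow class)."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], dict] = {}
        self.misses: list[str] = []

    def push(self, task_id: str, key: str, value: dict) -> None:
        self._data[(task_id, key)] = value

    def get(self, task_id: str, key: str) -> dict | None:
        value = self._data.get((task_id, key))
        if value is None:
            # Stands in for the "XCom not found" 404 logged by the API client.
            self.misses.append(key)
        return value


class FakeLink:
    """Simplified extra link: builds a URL from the dict stored under `key`."""

    def __init__(self, key: str, template: str) -> None:
        self.key = key
        self.template = template

    def get_link(self, store: FakeXComStore, task_id: str) -> str:
        conf = store.get(task_id, self.key)
        # With nothing stored under the key, the link is simply empty.
        return self.template.format(**conf) if conf else ""


store = FakeXComStore()
links = [
    FakeLink("emr_cluster", "https://example.invalid/emr/{job_flow_id}"),
    FakeLink("emr_logs", "https://example.invalid/logs/{job_flow_id}"),
]

# Nothing has been pushed under either key, so both lookups miss.
urls = [link.get_link(store, "clean-hdfs.check_steps") for link in links]
```

In this model each sensor run produces one miss per link, which would match the two errors per execution that we see. Whether the real sensor is expected to push those keys before the links are resolved is exactly the open question.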