hinnefe2 opened a new issue, #24487: URL: https://github.com/apache/airflow/issues/24487
### Apache Airflow version

2.3.2 (latest released)

### What happened

Attempting to use [dynamic task mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#mapping-over-result-of-classic-operators) on the results of a `KubernetesPodOperator` (or `GKEStartPodOperator`) produces 3x as many downstream task instances as it should. Two-thirds of the downstream tasks fail more or less instantly.

### What you think should happen instead

The downstream task should be expanded into one task instance per upstream output (N in the example under "How to reproduce"), not one per XCom row.

The problem is that the number of downstream tasks is calculated by counting the XComs associated with the upstream task, assuming that each mapped task instance of a given `task_id` has exactly one XCom:

https://github.com/apache/airflow/blob/fe5e689adfe3b2f9bcc37d3975ae1aea9b55e28a/airflow/models/mappedoperator.py#L606-L615

However, the `KubernetesPodOperator` pushes two additional XComs, under keys other than `return_value`, in its `.execute()` method:

https://github.com/apache/airflow/blob/fe5e689adfe3b2f9bcc37d3975ae1aea9b55e28a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L425-L426

Together with the `return_value` XCom, each upstream task instance therefore has three XCom rows, so the number of downstream tasks ends up being 3x what it should be.

### How to reproduce

Reproducing the behavior requires access to a Kubernetes cluster, but in pseudo-code, a DAG like this should demonstrate the behavior:

```
from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

with DAG(...) as dag:
    # produces an output list with N elements
    first_pod = GKEStartPodOperator(..., do_xcom_push=True)
    # produces 1 output per input, so N task instances are created, each with a single output
    second_pod = GKEStartPodOperator.partial(..., do_xcom_push=True).expand(id=XComArg(first_pod))
    # should have N task instances created, but actually gets 3N task instances created
    third_pod = GKEStartPodOperator.partial(..., do_xcom_push=True).expand(id=XComArg(second_pod))
```

### Operating System

macOS 12.4

### Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==4.1.0
apache-airflow-providers-google==8.0.0

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else

When I edit `mappedoperator.py` in my local deployment to filter on the XCom key, things behave as expected:

```
# Collect lengths from mapped upstreams.
xcom_query = (
    session.query(XCom.task_id, func.count(XCom.map_index))
    .group_by(XCom.task_id)
    .filter(
        XCom.dag_id == self.dag_id,
        XCom.run_id == run_id,
        XCom.key == 'return_value',  # <------- added this line
        XCom.task_id.in_(mapped_dep_keys),
        XCom.map_index >= 0,
    )
)
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
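The miscount in the mapped-length query can be illustrated without a Kubernetes cluster. The sketch below is a simplified, hypothetical model, not Airflow's actual schema or query: an in-memory SQLite table keeps only the `dag_id`, `run_id`, `task_id`, `map_index` and `key` columns, each of N simulated upstream task instances pushes a `return_value` plus two extra XComs (the key names `pod_name` and `pod_namespace` are an assumption about what `KubernetesPodOperator` pushes), and the `task_id IN (...)` condition is reduced to a single `task_id = ?`. The helper names `build_db` and `mapped_length` are illustrative only.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's XCom table: only the
# columns referenced by the mapped-length query are modelled.
SCHEMA = """
CREATE TABLE xcom (
    dag_id    TEXT,
    run_id    TEXT,
    task_id   TEXT,
    map_index INTEGER,
    key       TEXT
)
"""


def build_db(n_upstream: int) -> sqlite3.Connection:
    """Simulate n mapped upstream task instances, each pushing three XComs."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    rows = []
    for map_index in range(n_upstream):
        # One return_value plus two extra keys per task instance
        # (extra key names are assumed for illustration).
        for key in ("return_value", "pod_name", "pod_namespace"):
            rows.append(("my_dag", "run_1", "second_pod", map_index, key))
    conn.executemany("INSERT INTO xcom VALUES (?, ?, ?, ?, ?)", rows)
    return conn


def mapped_length(conn: sqlite3.Connection, filter_key: bool) -> int:
    """Count XCom rows for the upstream task, optionally only return_value."""
    sql = (
        "SELECT task_id, COUNT(map_index) FROM xcom "
        "WHERE dag_id = ? AND run_id = ? AND task_id = ? AND map_index >= 0"
    )
    params = ["my_dag", "run_1", "second_pod"]
    if filter_key:
        # The extra condition proposed in the issue.
        sql += " AND key = ?"
        params.append("return_value")
    sql += " GROUP BY task_id"
    row = conn.execute(sql, params).fetchone()
    return row[1]


conn = build_db(n_upstream=4)
print(mapped_length(conn, filter_key=False))  # 12, i.e. 3N rows counted
print(mapped_length(conn, filter_key=True))   # 4, i.e. N task instances
```

With four simulated upstream instances, the unfiltered count is 12 (3N), matching the reported 3x fan-out, while filtering on `key = 'return_value'` yields 4 (N).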
