enrique-ayala opened a new issue, #32757: URL: https://github.com/apache/airflow/issues/32757
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

- version: 2.5.3
- executor: CeleryExecutor

When using Dynamic Task Mapping, I noticed unexpected behavior while rerunning/clearing a particular mapped index. In my DAG, a task ``submit_job`` is dynamically mapped via the ``expand`` method. Downstream of each ``submit_job`` task, a ``wait_and_get_report`` sensor is also dynamically mapped via ``expand`` (see the "How to reproduce" section for a sample DAG). When rerunning or clearing a specific mapped index, e.g. ``submit_job[0]``, I expected only the corresponding downstream task ``wait_and_get_report[0]`` to be rerun. However, I observed that all downstream mapped tasks, both ``wait_and_get_report[0]`` and ``wait_and_get_report[1]``, were rerun.

### What you think should happen instead

Rerunning or clearing a specific mapped index should be more granular. Specifically, when I rerun or clear a particular mapped index such as ``submit_job[0]``, I expect that:

- ``submit_job[0]`` should be rerun (initiating a new API call), and only the corresponding downstream ``wait_and_get_report[0]`` should be rerun, waiting for the completion of the new API call and retrieving the report results for that specific index.

Currently, the rerun or clearing operation triggers all downstream mapped tasks regardless of the mapped index, which causes additional processing and potentially unnecessary API calls.
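For clarity, the expected vs. observed cascade can be sketched as a toy model. This is plain Python, not the Airflow API; the function names are hypothetical and exist only to illustrate the index-aligned semantics I am asking for:

```python
# Toy model of clearing semantics for index-aligned mapped tasks.
# Not Airflow code -- purely illustrative.

def downstream_to_clear_expected(cleared_indices: set[int], num_mapped: int) -> list[int]:
    """Expected (granular) behavior: clearing an upstream mapped index
    cascades only to the downstream task with the same map index."""
    return sorted(i for i in cleared_indices if 0 <= i < num_mapped)


def downstream_cleared_observed(cleared_indices: set[int], num_mapped: int) -> list[int]:
    """Observed behavior in 2.5.3: clearing any upstream mapped index
    cascades to *all* downstream mapped indices."""
    return list(range(num_mapped)) if cleared_indices else []


# Clearing submit_job[0] with two mapped wait_and_get_report tasks:
print(downstream_to_clear_expected({0}, 2))   # -> [0]     (only the matching index)
print(downstream_cleared_observed({0}, 2))    # -> [0, 1]  (everything reruns)
```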
### How to reproduce

This is a sample DAG that reflects this use case:

```python
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from airflow.sensors.base import BaseSensorOperator

default_args = {
    'owner': 'dummy_owner',
    'start_date': days_ago(1),
}


class WaitAndGetReport(BaseSensorOperator):
    """Wait for job_id to finish and retrieve the report file."""

    template_fields = ('job_id',)

    def __init__(self, *, job_id: str, **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id

    def poke(self, context: dict) -> bool:
        print(f"Check until JobStatus==true and immediately call GetReportAPI for: {self.job_id}")
        return True


with DAG(
    'sample_dag_to_reproduce_issue',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    @task
    def prepare_report_request():
        # The API JSON request body is generated here.
        return ["request_1", "request_2"]

    @task
    def submit_job(job_request):
        # The API call is executed here, returning a corresponding job_id.
        print("Calling submitJobAPI ...")
        return f"a_returned_job_id_for_{job_request}_from_submitJobAPI"

    report_requests = prepare_report_request()
    job_ids = submit_job.expand(job_request=report_requests)
    get_reports = WaitAndGetReport.partial(
        task_id='wait_and_get_report',
        mode='reschedule',
        poke_interval=5,
    ).expand(job_id=job_ids)
```

### Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other 3rd-party Helm chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
