enrique-ayala opened a new issue, #32757:
URL: https://github.com/apache/airflow/issues/32757

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   version | 2.5.3                                                
   executor | CeleryExecutor
   
   When using Dynamic Task Mapping, I noticed unexpected behavior when
   rerunning or clearing a particular mapped index.
   
   In my DAG, the task ``submit_job`` is dynamically mapped using the
   ``expand`` method. Downstream of each ``submit_job`` task instance, a
   ``wait_and_get_report`` sensor is also dynamically mapped using ``expand``
   (see the sample DAG in the "How to reproduce" section).
   
   When rerunning or clearing a specific mapped index, for example
   ``submit_job[0]``, I expected only the corresponding downstream task
   ``wait_and_get_report[0]`` to be rerun. However, I observed that all mapped
   downstream tasks, ``wait_and_get_report[0]`` and ``wait_and_get_report[1]``,
   were rerun:
   
![image](https://github.com/apache/airflow/assets/10963531/da456407-40fe-4f0d-ab3c-891ff10dc671)
   
   
   
   
   ### What you think should happen instead
   
   When rerunning or clearing a specific mapped index, I expect the behavior
   to be more granular. Specifically, when I rerun or clear a particular
   mapped index such as ``submit_job[0]``:

   - ``submit_job[0]`` should be rerun (initiating a new API call), and only
     the corresponding downstream ``wait_and_get_report[0]`` should be rerun,
     waiting for the new API call to complete and retrieving the report
     results for that specific index.
   
   Currently, clearing a single mapped index reruns all mapped downstream
   task instances, regardless of their map index, which causes additional
   processing and potentially unnecessary API calls.
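
To make the difference concrete, here is a minimal sketch in plain Python that models the two behaviors as sets of `(task_id, map_index)` pairs. It does not call any Airflow API; the helper names `expected_clear_set` and `observed_clear_set` are hypothetical and exist only for illustration, and the one-to-one pairing of upstream and downstream indexes is an assumption taken from the sample DAG in this report.

```python
from typing import Set, Tuple

# (task_id, map_index); a hypothetical stand-in for a task instance key.
TaskInstanceKey = Tuple[str, int]


def expected_clear_set(upstream: str, downstream: str, map_index: int) -> Set[TaskInstanceKey]:
    """Granular behavior requested in this issue: only the matching index."""
    return {(upstream, map_index), (downstream, map_index)}


def observed_clear_set(
    upstream: str, downstream: str, map_index: int, n_mapped: int
) -> Set[TaskInstanceKey]:
    """Behavior reported in this issue: every mapped downstream instance is cleared."""
    return {(upstream, map_index)} | {(downstream, i) for i in range(n_mapped)}


expected = expected_clear_set("submit_job", "wait_and_get_report", 0)
observed = observed_clear_set("submit_job", "wait_and_get_report", 0, n_mapped=2)

# Task instances that are rerun although they were not asked for.
extra = sorted(observed - expected)
print(extra)
```

With two mapped requests, as in the sample DAG, the only extra instance is `wait_and_get_report[1]`; with N mapped requests there would be N - 1 unnecessary reruns.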
   
   ### How to reproduce
   
   The following sample DAG reproduces this use case:
   
   ```
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.dates import days_ago
   from airflow.sensors.base import BaseSensorOperator
   
   default_args = {
       'owner': 'dummy_owner',
       'start_date': days_ago(1),
   }
   
   
   class WaitAndGetReport(BaseSensorOperator):
       """
       Wait for job_id to finish and retrieve report file
       """
   
       template_fields = ('job_id',)
   
       def __init__(self, *, job_id: str, **kwargs):
           # Accept keyword arguments only; job_id is the templated field.
           super().__init__(**kwargs)
           self.job_id = job_id
   
       def poke(self, context: dict) -> bool:
           # Placeholder for polling the job status and fetching the report.
           print(f"Check until JobStatus==true and immediately call GetReportAPI for: {self.job_id}")
           return True
   
   
   with DAG(
       'sample_dag_to_reproduce_issue',
       default_args=default_args,
       schedule_interval=None, 
       catchup=False,
   ) as dag:
   
       @task
       def prepare_report_request():
           # API json request body is generated here
           return ["request_1", "request_2"]
   
       @task
       def submit_job(job_request):
           # The API call is executed here and returns a corresponding job_id.
           print("Calling submitJobAPI ...")
           return f"a_returned_job_id_for_{job_request}_from_submitJobAPI"
   
   
       report_requests = prepare_report_request()
       job_ids = submit_job.expand(job_request=report_requests)
       get_reports = WaitAndGetReport.partial(
           task_id='wait_and_get_report',
           mode='reschedule',
           poke_interval=5,
       ).expand(job_id=job_ids)
   ```
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

