pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right: some large use cases might be problematic. Once I
   incorporate @uranusjr's changes so that the DagRun objects are not loaded
   entirely, do you think this could be enough for a 'first acceptable
   version'? I can then start working on b), which will require quite a few
   front-end changes. That said, we should already have what is needed to
   achieve what you are describing (especially thanks to React Query).
   
   What do you think, @potiuk?
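
   For reference, the date window that the diff above derives from the `past`
   and `future` flags can be sketched in plain Python, without SQLAlchemy. The
   helper names `clear_window` and `runs_in_window` and the sample dates are
   hypothetical. The sketch treats a missing bound as open-ended, which is an
   assumption and not necessarily what the query in the diff does when both
   bounds are None.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def clear_window(
    execution_date: datetime, past: bool, future: bool
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Mirror the diff: a bound is None when that direction is requested."""
    start_date = execution_date if not past else None
    end_date = execution_date if not future else None
    return start_date, end_date


def runs_in_window(
    dates: List[datetime],
    start_date: Optional[datetime],
    end_date: Optional[datetime],
) -> List[datetime]:
    """Keep dates inside [start_date, end_date]; None means unbounded (assumption)."""
    return [
        d
        for d in dates
        if (start_date is None or d >= start_date)
        and (end_date is None or d <= end_date)
    ]


# Hypothetical execution dates of three runs of the same DAG.
runs = [datetime(2022, 11, day) for day in (1, 2, 3)]
target = datetime(2022, 11, 2)

only_target = runs_in_window(runs, *clear_window(target, past=False, future=False))
target_and_future = runs_in_window(runs, *clear_window(target, past=False, future=True))
everything = runs_in_window(runs, *clear_window(target, past=True, future=True))
```

   With both flags set, the sketch selects every run of the DAG, which is the
   large case discussed above where locking all matching rows could become
   expensive.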


