peloyeje commented on code in PR #34529:
URL: https://github.com/apache/airflow/pull/34529#discussion_r1334603444


##########
airflow/www/views.py:
##########
@@ -5803,21 +5859,42 @@ def duration_f(self):
     @provide_session
     @action_logging
     def action_clear(self, task_instances, session: Session = NEW_SESSION):
-        """Clears the action."""
+        """Clears an arbitrary number of task instances."""
         try:
-            dag_to_tis = collections.defaultdict(list)
-
-            for ti in task_instances:
-                dag = get_airflow_app().dag_bag.get_dag(ti.dag_id)
-                dag_to_tis[dag].append(ti)
+            count = self._clear_task_instances(
+                task_instances=task_instances, session=session, 
clear_downstream=False
+            )
+            session.commit()
+            flash(f"{count} task instances have been cleared")
+        except Exception as e:
+            flash(f'Failed to clear task instances: "{e}"', "error")
 
-            for dag, task_instances_list in dag_to_tis.items():
-                models.clear_task_instances(task_instances_list, session, 
dag=dag)
+        self.update_redirect()
+        return redirect(self.get_redirect())
 
+    @action(
+        "clear_downstream",
+        lazy_gettext("Clear (including downstream tasks)"),
+        lazy_gettext(
+            "Are you sure you want to clear the state of the selected task"
+            " instance(s) and all their downstream dependencie(s), and set 
their dagruns to the QUEUED state?"

Review Comment:
   Agreed, this was not great! Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to