pierrejeambrun commented on code in PR #55670:
URL: https://github.com/apache/airflow/pull/55670#discussion_r2426513087


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -476,6 +477,19 @@ def get_task_instances(
         get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
         query = query.where(TI.dag_id == dag_id)
 
+    # Handle task group filtering: if task_display_name_pattern appears to be a group ID,
+    # find all tasks that belong to that group and filter by task_id
+    task_id_filter_from_group = None
+    if task_display_name_pattern.value is not None and dag_id != "~":
+        if dag_run:
+            dag = dag_bag.get_dag_for_run(dag_run, session=session)
+            if dag:
+                group_task_ids = resolve_task_group_pattern_to_task_ids(dag, task_display_name_pattern.value)
+                if group_task_ids:
+                    task_id_filter_from_group = FilterParam(
+                        TI.task_id, group_task_ids, FilterOptionEnum.ANY_EQUAL
+                    )

Review Comment:
   Also, this piece of code should probably live in the
`QueryTITaskDisplayNamePatternSearch` implementation. (Don't use the factory;
subclass it and implement custom logic in the `to_orm` method.) This way we're
sure that the 'group' filtering is always applied when using a
`QueryTITaskDisplayNamePatternSearch`.
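
A minimal sketch of the subclassing idea, not Airflow's actual implementation. `PatternSearch`, `GroupAwarePatternSearch`, and the `groups` mapping are hypothetical names, and a list of dicts stands in for the SQL query that the real `to_orm` would operate on. It only shows how overriding `to_orm` keeps the group lookup inside the parameter itself:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PatternSearch:
    """Hypothetical stand-in for a pattern search parameter (not Airflow's class)."""

    value: str | None = None

    def to_orm(self, rows: list[dict]) -> list[dict]:
        # Stand-in for applying the filter to a query:
        # substring match on the task display name.
        if self.value is None:
            return rows
        return [r for r in rows if self.value in r["task_display_name"]]


@dataclass
class GroupAwarePatternSearch(PatternSearch):
    """Subclass that first checks whether the pattern is a task group ID."""

    # Assumption: group ID -> task IDs, standing in for the DAG's task group dict.
    groups: dict[str, list[str]] = field(default_factory=dict)

    def to_orm(self, rows: list[dict]) -> list[dict]:
        if self.value is None:
            return rows
        group_task_ids = self.groups.get(self.value)
        if group_task_ids:
            # The pattern names a group: filter by membership in that group.
            wanted = set(group_task_ids)
            return [r for r in rows if r["task_id"] in wanted]
        # Otherwise fall back to the plain display-name search.
        return super().to_orm(rows)
```

With this shape, the route would pass the parameter through unchanged and would not need to swap filters itself.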



##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/task_group.py:
##########
@@ -112,3 +112,17 @@ def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False):
         "is_mapped": mapped or None,
         "children": children or None,
     }
+
+
+def resolve_task_group_pattern_to_task_ids(dag, pattern):
+    """Resolve a task group pattern to a list of task IDs."""
+    if not dag or not hasattr(dag, "task_group"):
+        return None
+
+    task_group_dict = dag.task_group.get_task_group_dict()
+    target_group = task_group_dict.get(pattern)
+
+    if target_group:
+        return [task.task_id for task in target_group.iter_tasks()]
+
+    return None

Review Comment:
   This is used by the public API endpoint, so it should probably not live in
a `ui` service, but in a common or public service.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -476,6 +477,19 @@ def get_task_instances(
         get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
         query = query.where(TI.dag_id == dag_id)
 
+    # Handle task group filtering: if task_display_name_pattern appears to be a group ID,
+    # find all tasks that belong to that group and filter by task_id
+    task_id_filter_from_group = None
+    if task_display_name_pattern.value is not None and dag_id != "~":

Review Comment:
   Why is there a condition on `dag_id`? We are filtering TIs by
`task_display_name_pattern`. That could be cross-DAG: return all tasks that
are part of the same group pattern.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -490,7 +504,7 @@ def get_task_instances(
             queue,
             executor,
             task_id,
-            task_display_name_pattern,
+            task_id_filter_from_group if task_id_filter_from_group else task_display_name_pattern,

Review Comment:
   You won't need that anymore.



##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##########
@@ -103,13 +103,28 @@ def _find_aggregates(
         for child in get_task_group_children_getter()(node):
             for child_node in _find_aggregates(node=child, parent_node=node, ti_details=ti_details):
                 if child_node["parent_id"] == node_id:
-                    children.append(
-                        {
-                            "state": child_node["state"],
-                            "start_date": child_node["min_start_date"],
-                            "end_date": child_node["max_end_date"],
-                        }
-                    )
+                    # If the child has child_states (like mapped tasks), expand them
+                    # to count all individual task instances rather than just one child node
+                    if child_node.get("child_states"):
+                        # For each state in child_states, add that many entries to children
+                        for state, count in child_node["child_states"].items():
+                            for _ in range(count):
+                                children.append(
+                                    {
+                                        "state": state if state != "None" else None,
+                                        "start_date": child_node["min_start_date"],
+                                        "end_date": child_node["max_end_date"],

Review Comment:
   This is kind of wrong. We are assigning false values to the `start_date`
and `end_date` of children: every expanded entry gets the child node's
aggregated `min_start_date` and `max_end_date` rather than its own dates.
   
   It feels weird to aggregate the children into `child_states` and then
reuse that same `child_states` to de-aggregate them back into `children`
based on its integer counts. We should maybe aggregate later in the recursion
so we still have access to `children` the whole time and don't need the
de-aggregate step.
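
A rough sketch of what "aggregate later" could look like, assuming a simplified tree in which group nodes carry `children` and leaf nodes carry an `instances` list. The node shape, `collect_instances`, and `summarize` are hypothetical and are not the actual `_find_aggregates` code:

```python
from __future__ import annotations

from collections import Counter


def collect_instances(node: dict) -> list[dict]:
    """Return every task instance under ``node``, each keeping its own dates."""
    if "children" not in node:
        # Leaf node: a mapped task simply contributes several instances.
        return list(node["instances"])
    instances: list[dict] = []
    for child in node["children"]:
        instances.extend(collect_instances(child))
    return instances


def summarize(node: dict) -> dict:
    """Aggregate once, after the full list of instances is available."""
    instances = collect_instances(node)
    starts = [i["start_date"] for i in instances if i["start_date"] is not None]
    ends = [i["end_date"] for i in instances if i["end_date"] is not None]
    return {
        "child_states": dict(Counter(i["state"] for i in instances)),
        "min_start_date": min(starts, default=None),
        "max_end_date": max(ends, default=None),
    }
```

Because the recursion passes real instances upward, the counts and the min/max dates are computed from actual values, and nothing has to be expanded back out of `child_states`.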


