This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 57abbac689472fd04db036e0dc06794971a0a68e
Author: Aleksey Kirilishin <54231417+avkirilis...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300

    Show task status only for running dags or only for the last finished dag (#21352)

    * Show task status only for running dags or only for the last finished dag

    * Brought the logic of getting task statistics into a separate function

    (cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
 airflow/www/views.py          | 64 ++++++++++++++++++++++++++++++++++---------
 tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
     return result
 
 
+def get_task_stats_from_query(qry):
+    """
+    Return a dict of the task quantity, grouped by dag id and task status.
+
+    :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+        ordered by <dag id> and <is dag running>
+    """
+    data = {}
+    last_dag_id = None
+    has_running_dags = False
+    for dag_id, state, is_dag_running, count in qry:
+        if last_dag_id != dag_id:
+            last_dag_id = dag_id
+            has_running_dags = False
+        elif not is_dag_running and has_running_dags:
+            continue
+
+        if is_dag_running:
+            has_running_dags = True
+        if dag_id not in data:
+            data[dag_id] = {}
+        data[dag_id][state] = count
+    return data
+
+
 ######################################################################################
 # Error handlers
 ######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
 
         # Select all task_instances from active dag_runs.
         running_task_instance_query_result = session.query(
-            TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+            TaskInstance.dag_id.label('dag_id'),
+            TaskInstance.state.label('state'),
+            sqla.literal(True).label('is_dag_running'),
         ).join(
             running_dag_run_query_result,
             and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
             # Select all task_instances from active dag_runs.
             # If no dag_run is active, return task instances from most recent dag_run.
             last_task_instance_query_result = (
-                session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+                session.query(
+                    TaskInstance.dag_id.label('dag_id'),
+                    TaskInstance.state.label('state'),
+                    sqla.literal(False).label('is_dag_running'),
+                )
                 .join(TaskInstance.dag_run)
                 .join(
                     last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
         else:
             final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
 
-        qry = session.query(
-            final_task_instance_query_result.c.dag_id,
-            final_task_instance_query_result.c.state,
-            sqla.func.count(),
-        ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
-        data = {}
-        for dag_id, state, count in qry:
-            if dag_id not in data:
-                data[dag_id] = {}
-            data[dag_id][state] = count
+        qry = (
+            session.query(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+                sqla.func.count(),
+            )
+            .group_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+            )
+            .order_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.is_dag_running.desc(),
+            )
+        )
 
+        data = get_task_stats_from_query(qry)
         payload = {}
         for dag_id in filter_dag_ids:
             payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
 from airflow.configuration import initialize_config
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+    get_key_paths,
+    get_safe_url,
+    get_task_stats_from_query,
+    get_value_from_path,
+    truncate_task_duration,
+)
 from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_plugins import mock_plugin_manager
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
     action_funcs = action_funcs - {"action_post"}
     for action_function in action_funcs:
         assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+    query_data = [
+        ['dag1', 'queued', True, 1],
+        ['dag1', 'running', True, 2],
+        ['dag1', 'success', False, 3],
+        ['dag2', 'running', True, 4],
+        ['dag2', 'success', True, 5],
+        ['dag3', 'success', False, 6],
+    ]
+    expected_data = {
+        'dag1': {
+            'queued': 1,
+            'running': 2,
+        },
+        'dag2': {
+            'running': 4,
+            'success': 5,
+        },
+        'dag3': {
+            'success': 6,
+        },
+    }
+
+    data = get_task_stats_from_query(query_data)
+    assert data == expected_data