This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch show-mapped-task-in-tree-view in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c646171b7bcbfde1e45dfdc88087b0e9113df2f2 Author: Brent Bovenzi <[email protected]> AuthorDate: Tue Feb 22 12:17:29 2022 -0500 add graph tooltip and map count --- airflow/models/dagrun.py | 48 ++++++------ airflow/www/static/js/graph.js | 42 ++++++++-- airflow/www/static/js/task_instances.js | 32 ++++++++ airflow/www/utils.py | 135 +++++++++++++++++--------------- airflow/www/views.py | 12 ++- 5 files changed, 176 insertions(+), 93 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 69b003f..5170ad3 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -43,6 +43,7 @@ from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import false, select, true from airflow import settings +from airflow.callbacks.callback_requests import DagCallbackRequest from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound from airflow.models.base import COLLATION_ARGS, ID_LEN, Base @@ -52,7 +53,7 @@ from airflow.models.tasklog import LogTemplate from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES -from airflow.utils import callback_requests, timezone +from airflow.utils import timezone from airflow.utils.helpers import is_container from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -487,7 +488,7 @@ class DagRun(Base, LoggingMixin): @provide_session def update_state( self, session: Session = NEW_SESSION, execute_callbacks: bool = True - ) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]: + ) -> Tuple[List[TI], Optional[DagCallbackRequest]]: """ Determines the overall state of the DagRun based on the state of its TaskInstances. @@ -499,7 +500,7 @@ class DagRun(Base, LoggingMixin): needs to be executed """ # Callback to execute in case of Task Failures - callback: Optional[callback_requests.DagCallbackRequest] = None + callback: Optional[DagCallbackRequest] = None start_dttm = timezone.utcnow() self.last_scheduling_decision = start_dttm @@ -535,7 +536,7 @@ class DagRun(Base, LoggingMixin): if execute_callbacks: dag.handle_callback(self, success=False, reason='task_failure', session=session) elif dag.has_on_failure_callback: - callback = callback_requests.DagCallbackRequest( + callback = DagCallbackRequest( full_filepath=dag.fileloc, dag_id=self.dag_id, run_id=self.run_id, @@ -550,7 +551,7 @@ class DagRun(Base, LoggingMixin): if execute_callbacks: dag.handle_callback(self, success=True, reason='success', session=session) elif dag.has_on_success_callback: - callback = callback_requests.DagCallbackRequest( + callback = DagCallbackRequest( full_filepath=dag.fileloc, dag_id=self.dag_id, run_id=self.run_id, @@ -571,7 +572,7 @@ class DagRun(Base, LoggingMixin): if execute_callbacks: dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session) elif dag.has_on_failure_callback: - callback = callback_requests.DagCallbackRequest( + callback = DagCallbackRequest( full_filepath=dag.fileloc, dag_id=self.dag_id, run_id=self.run_id, @@ -651,7 +652,7 @@ class DagRun(Base, LoggingMixin): def _get_ready_tis( self, - scheduleable_tis: List[TI], + schedulable_tis: List[TI], finished_tis: List[TI], session: Session, ) -> Tuple[List[TI], bool]: @@ -659,41 +660,40 @@ class DagRun(Base, LoggingMixin): ready_tis: List[TI] = [] changed_tis = False - if not scheduleable_tis: + if not schedulable_tis: return ready_tis, changed_tis # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter - # `scheduleable_tis` in place and have the `for` loop pick them up + # `schedulable_tis` in place and have the `for` loop pick them up expanded_tis: List[TI] = [] # Check dependencies - for st in itertools.chain(scheduleable_tis, expanded_tis): + for schedulable in itertools.chain(schedulable_tis, expanded_tis): # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if # for any reason it wasn't, we need to expand it now - if st.map_index < 0 and st.task.is_mapped: + if schedulable.map_index < 0 and schedulable.task.is_mapped: # HACK. This needs a better way, one that copes with multiple upstreams! for ti in finished_tis: - if st.task_id in ti.task.downstream_task_ids: - upstream = ti - - assert isinstance(st.task, MappedOperator) - new_tis = st.task.expand_mapped_task(upstream, session=session) - assert new_tis[0] is st - # Add the new TIs to the list to be checked - for new_ti in new_tis[1:]: - new_ti.task = st.task + if schedulable.task_id in ti.task.downstream_task_ids: + + assert isinstance(schedulable.task, MappedOperator) + new_tis = schedulable.task.expand_mapped_task(self.run_id, session=session) + if schedulable.state == TaskInstanceState.SKIPPED: + # Task is now skipped (likely cos upstream returned 0 tasks + continue + assert new_tis[0] is schedulable expanded_tis.extend(new_tis[1:]) break - old_state = st.state - if st.are_dependencies_met( + old_state = schedulable.state + if schedulable.are_dependencies_met( dep_context=DepContext(flag_upstream_failed=True, finished_tis=finished_tis), session=session, ): - ready_tis.append(st) + ready_tis.append(schedulable) else: - old_states[st.key] = old_state + old_states[schedulable.key] = old_state # Check if any ti changed state tis_filter = TI.filter_for_tis(old_states.keys()) diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js index 254db5a..258004a 100644 --- a/airflow/www/static/js/graph.js +++ b/airflow/www/static/js/graph.js @@ -82,6 +82,33 @@ const render = dagreD3.render(); const svg = d3.select('#graph-svg'); let innerSvg = d3.select('#graph-svg g'); +const updateNodes = (node, instances) => { + const value = { + ...node.value, + label: tasks[node.id] && tasks[node.id].is_mapped + ? `${node.value.label} [${(instances[node.id].mapped_states && instances[node.id].mapped_states.length) || ' '}]` + : node.value.label, + }; + + if (g.node(node.id)) { + g.node(node.id).label = value.label; + } + + if (node.children) { + return { + ...node, + value, + children: node.children.map((n) => updateNodes(n, instances)), + }; + } + return { + ...node, + value, + }; +}; + +let updatedNodes = updateNodes(nodes, taskInstances); + // Remove the node with this nodeId from g. function removeNode(nodeId) { if (g.hasNode(nodeId)) { @@ -377,6 +404,9 @@ function handleRefresh() { if (isFinal) { $('#auto_refresh').prop('checked', false); clearInterval(refreshInterval); + if (JSON.stringify(nodes) !== JSON.stringify(updateNodes)) { + draw(); + } } } prevTis = tis; @@ -476,15 +506,17 @@ function groupTooltip(node, tis) { // Assigning css classes based on state to nodes // Initiating the tooltips function updateNodesStates(tis) { + updatedNodes = updateNodes(nodes, tis); g.nodes().forEach((nodeId) => { - const { elem } = g.node(nodeId); + const node = g.node(nodeId); + const { elem } = node; + const taskId = nodeId; + if (elem) { const classes = `node enter ${getNodeState(nodeId, tis)}`; elem.setAttribute('class', classes); elem.setAttribute('data-toggle', 'tooltip'); - const taskId = nodeId; - const node = g.node(nodeId); elem.onmouseover = (evt) => { let tt; if (taskId in tis) { @@ -713,10 +745,10 @@ const focusNodeId = localStorage.getItem(focusedGroupKey(dagId)); const expandedGroups = getSavedGroups(dagId); // Always expand the root node -expandGroup(null, nodes); +expandGroup(null, updatedNodes); // Expand the node that were previously expanded -expandSavedGroups(expandedGroups, nodes); +expandSavedGroups(expandedGroups, updatedNodes); // Draw once after all groups have been expanded draw(); diff --git a/airflow/www/static/js/task_instances.js b/airflow/www/static/js/task_instances.js index 13f3a9d..e123bd5 100644 --- a/airflow/www/static/js/task_instances.js +++ b/airflow/www/static/js/task_instances.js @@ -69,6 +69,34 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) { if (ti.state !== undefined) { tt += `<strong>Status:</strong> ${escapeHtml(ti.state)}<br><br>`; } + if (ti.mapped_states) { + const STATES = [ + ['success', 0], + ['failed', 0], + ['upstream_failed', 0], + ['up_for_retry', 0], + ['up_for_reschedule', 0], + ['running', 0], + ['deferred', 0], + ['sensing', 0], + ['queued', 0], + ['scheduled', 0], + ['skipped', 0], + ['no_status', 0], + ]; + const numMap = new Map(STATES); + ti.mapped_states.forEach((s) => { + const stateKey = s || 'no_status'; + if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1); + }); + tt += `<strong>${escapeHtml(ti.mapped_states.length)} Tasks Mapped</strong><br />`; + numMap.forEach((key, val) => { + if (key > 0) { + tt += `<span style="margin-left: 15px">${escapeHtml(val)}: ${escapeHtml(key)}</span><br />`; + } + }); + tt += '<br />'; + } if (ti.task_id !== undefined) { tt += `Task_id: ${escapeHtml(ti.task_id)}<br>`; } @@ -76,6 +104,10 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) { if (ti.run_id !== undefined) { tt += `Run Id: <nobr>${escapeHtml(ti.run_id)}</nobr><br>`; } + // Show mapped index for specific child instance, but not for a summary instance + if (ti.map_index >= 0 && !ti.mapped_states) { + tt += `Map Index: ${escapeHtml(ti.map_index)}<br>`; + } if (ti.operator !== undefined) { tt += `Operator: ${escapeHtml(ti.operator)}<br>`; } diff --git a/airflow/www/utils.py b/airflow/www/utils.py index ed765d4..ef08f8e 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -43,6 +43,7 @@ from airflow.models import errors from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.code_utils import get_python_source +from airflow.utils.helpers import alchemy_to_dict from airflow.utils.json import AirflowJsonEncoder from airflow.utils.state import State from airflow.www.forms import DateTimeWithTimezoneField @@ -55,13 +56,83 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]: return value.isoformat() +def get_mapped_instances(task_instance, session): + return ( + session.query(TaskInstance) + .filter( + TaskInstance.dag_id == task_instance.dag_id, + TaskInstance.run_id == task_instance.run_id, + TaskInstance.task_id == task_instance.task_id, + TaskInstance.map_index >= 0, + ) + .all() + ) + + +def get_instance_with_map(task_instance, session): + if task_instance.map_index == -1: + return alchemy_to_dict(task_instance) + mapped_instances = get_mapped_instances(task_instance, session) + return get_mapped_summary(task_instance, mapped_instances) + + +def get_mapped_summary(parent_instance, task_instances): + priority = [ + 'failed', + 'upstream_failed', + 'up_for_retry', + 'up_for_reschedule', + 'queued', + 'scheduled', + 'deferred', + 'sensing', + 'running', + 'shutdown', + 'restarting', + 'removed', + 'no_status', + 'success', + 'skipped', + ] + + mapped_states = [ti.state for ti in task_instances] + + group_state = None + for state in priority: + if state in mapped_states: + group_state = state + break + + group_start_date = datetime_to_string( + min((ti.start_date for ti in task_instances if ti.start_date), default=None) + ) + group_end_date = datetime_to_string( + max((ti.end_date for ti in task_instances if ti.end_date), default=None) + ) + + return { + 'task_id': parent_instance.task_id, + 'run_id': parent_instance.run_id, + 'state': group_state, + 'start_date': group_start_date, + 'end_date': group_end_date, + 'mapped_states': mapped_states, + 'operator': parent_instance.operator, + 'execution_date': datetime_to_string(parent_instance.execution_date), + 'try_number': parent_instance.try_number, + } + + def encode_ti( - task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session + task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session] ) -> Optional[Dict[str, Any]]: if not task_instance: return None - summary = { + if is_mapped: + return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session)) + + return { 'task_id': task_instance.task_id, 'dag_id': task_instance.dag_id, 'run_id': task_instance.run_id, @@ -74,66 +145,6 @@ def encode_ti( 'try_number': task_instance.try_number, } - def get_mapped_summary(task_instances): - priority = [ - 'failed', - 'upstream_failed', - 'up_for_retry', - 'up_for_reschedule', - 'queued', - 'scheduled', - 'deferred', - 'sensing', - 'running', - 'shutdown', - 'restarting', - 'removed', - 'no_status', - 'success', - 'skipped', - ] - - mapped_states = [ti.state for ti in task_instances] - - group_state = None - for state in priority: - if state in mapped_states: - group_state = state - break - - group_start_date = datetime_to_string( - min((ti.start_date for ti in task_instances if ti.start_date), default=None) - ) - group_end_date = datetime_to_string( - max((ti.end_date for ti in task_instances if ti.end_date), default=None) - ) - - return { - 'task_id': task_instance.task_id, - 'run_id': task_instance.run_id, - 'state': group_state, - 'start_date': group_start_date, - 'end_date': group_end_date, - 'mapped_states': mapped_states, - 'operator': task_instance.operator, - 'execution_date': datetime_to_string(task_instance.execution_date), - 'try_number': task_instance.try_number, - } - - if is_mapped: - return get_mapped_summary( - session.query(TaskInstance) - .filter( - TaskInstance.dag_id == task_instance.dag_id, - TaskInstance.run_id == task_instance.run_id, - TaskInstance.task_id == task_instance.task_id, - TaskInstance.map_index >= 0, - ) - .all() - ) - - return summary - def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]: if not dag_run: diff --git a/airflow/www/views.py b/airflow/www/views.py index 63c9fb1..a61727f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2648,12 +2648,16 @@ class Airflow(AirflowBaseView): form = GraphForm(data=dt_nr_dr_data) form.execution_date.choices = dt_nr_dr_data['dr_choices'] - task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)} + task_instances = { + ti.task_id: wwwutils.get_instance_with_map(ti, session) + for ti in dag.get_task_instances(dttm, dttm) + } tasks = { t.task_id: { 'dag_id': t.dag_id, 'task_type': t.task_type, 'extra_links': t.extra_links, + 'is_mapped': t.is_mapped, } for t in dag.tasks } @@ -3244,7 +3248,11 @@ class Airflow(AirflowBaseView): else: return "Error: Invalid execution_date" - task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)} + with create_session() as session: + task_instances = { + ti.task_id: wwwutils.get_instance_with_map(ti, session) + for ti in dag.get_task_instances(dttm, dttm) + } return json.dumps(task_instances, cls=utils_json.AirflowJsonEncoder)
