Repository: incubator-airflow Updated Branches: refs/heads/master 3ff5abee3 -> 0f7ddbbed
[AIRFLOW-970] Load latest_runs on homepage async The latest_runs column on the homepage loads synchronously with an n+1 query. Homepage loads will be significantly faster if this happens asynchronously and as a batch. Closes #2144 from saguziel/aguziel-latest-run-async Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0f7ddbbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0f7ddbbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0f7ddbbe Branch: refs/heads/master Commit: 0f7ddbbedb05f2f11500250db4989edcb27bc164 Parents: 3ff5abe Author: Alex Guziel <[email protected]> Authored: Wed Apr 5 10:02:42 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Apr 5 10:02:42 2017 +0200 ---------------------------------------------------------------------- airflow/models.py | 23 ++++++++++++++++++++ airflow/www/api/experimental/endpoints.py | 23 +++++++++++++++++++- airflow/www/templates/airflow/dags.html | 29 ++++++++++++++------------ tests/dags/test_latest_runs.py | 27 ++++++++++++++++++++++++ tests/models.py | 21 +++++++++++++++++-- 5 files changed, 107 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 8a91cc2..95e2255 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4218,6 +4218,29 @@ class DagRun(Base): return False + @classmethod + @provide_session + def get_latest_runs(cls, session): + """Returns the latest running DagRun for each DAG. 
""" + subquery = ( + session + .query( + cls.dag_id, + func.max(cls.execution_date).label('execution_date')) + .filter(cls.state == State.RUNNING) + .group_by(cls.dag_id) + .subquery() + ) + dagruns = ( + session + .query(cls) + .join(subquery, + and_(cls.dag_id == subquery.c.dag_id, + cls.execution_date == subquery.c.execution_date)) + .all() + ) + return dagruns + class Pool(Base): __tablename__ = "slot_pool" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/api/experimental/endpoints.py ---------------------------------------------------------------------- diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 56b9d79..63355c7 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -20,7 +20,8 @@ from airflow.exceptions import AirflowException from airflow.www.app import csrf from flask import ( - g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file + g, Markup, Blueprint, redirect, jsonify, abort, + request, current_app, send_file, url_for ) from datetime import datetime @@ -110,3 +111,23 @@ def task_info(dag_id, task_id): task = dag.get_task(task_id) fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')} return jsonify(fields) + + +@api_experimental.route('/latest_runs', methods=['GET']) +@requires_authentication +def latest_dag_runs(): + """Returns the latest running DagRun for each DAG formatted for the UI. 
""" + from airflow.models import DagRun + dagruns = DagRun.get_latest_runs() + payload = [] + for dagrun in dagruns: + if dagrun.execution_date: + payload.append({ + 'dag_id': dagrun.dag_id, + 'execution_date': dagrun.execution_date.strftime("%Y-%m-%d %H:%M"), + 'start_date': ((dagrun.start_date or '') and + dagrun.start_date.strftime("%Y-%m-%d %H:%M")), + 'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id, + execution_date=dagrun.execution_date) + }) + return jsonify(payload) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/airflow/www/templates/airflow/dags.html ---------------------------------------------------------------------- diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 8a5a346..5abbc4b 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -105,19 +105,7 @@ </td> <!-- Column 7: Last Run --> - <td class="text-nowrap"> - {% if dag %} - {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %} - {% if last_run and last_run.start_date %} - <a href="{{ url_for('airflow.graph', dag_id=last_run.dag_id, execution_date=last_run.execution_date ) }}"> - {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }} - </a> <span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Start Date: {{last_run.start_date.strftime('%Y-%m-%d %H:%M')}}"></span> - {% else %} - <!--No DAG Runs--> - {% endif %} - {% else %} - <!--No DAG Runs--> - {% endif %} + <td class="text-nowrap latest_dag_run {{ dag.dag_id }}"> </td> <!-- Column 8: Dag Runs --> @@ -237,6 +225,21 @@ } }); }); + $.getJSON("{{ url_for('api_experimental.latest_dag_runs') }}", function(data) { + $.each(data, function() { + var link = $("<a>", { + href: this.dag_run_url, + text: this.execution_date + }); + var info_icon = $('<span>', { + "aria-hidden": "true", + id: "statuses_info", + title: "Start Date: " + this.start_date, + "class": 
"glyphicon glyphicon-info-sign" + }); + $('.latest_dag_run.' + this.dag_id).append(link).append(info_icon); + }); + }); d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) { for(var dag_id in json) { states = json[dag_id]; http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/dags/test_latest_runs.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_latest_runs.py b/tests/dags/test_latest_runs.py new file mode 100644 index 0000000..dd04c0e --- /dev/null +++ b/tests/dags/test_latest_runs.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from datetime import datetime + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator + +for i in range(1, 2): + dag = DAG(dag_id='test_latest_runs_{}'.format(i)) + task = DummyOperator( + task_id='dummy_task', + dag=dag, + owner='airflow', + start_date=datetime(2016, 2, 1)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0f7ddbbe/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 15450dd..20da4d4 100644 --- a/tests/models.py +++ b/tests/models.py @@ -236,11 +236,13 @@ class DagTest(unittest.TestCase): class DagRunTest(unittest.TestCase): - def create_dag_run(self, dag, state=State.RUNNING, task_states=None): + def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None): now = datetime.datetime.now() + if execution_date is None: + execution_date = now dag_run = dag.create_dagrun( run_id='manual__' + now.isoformat(), - execution_date=now, + execution_date=execution_date, start_date=now, state=state, external_trigger=False, @@ -412,6 +414,21 @@ class DagRunTest(unittest.TestCase): ti = dag_run.get_task_instance('test_short_circuit_false') self.assertEqual(None, ti) + def test_get_latest_runs(self): + session = settings.Session() + dag = DAG( + dag_id='test_latest_runs_1', + start_date=DEFAULT_DATE) + dag_1_run_1 = self.create_dag_run(dag, + execution_date=datetime.datetime(2015, 1, 1)) + dag_1_run_2 = self.create_dag_run(dag, + execution_date=datetime.datetime(2015, 1, 2)) + dagruns = models.DagRun.get_latest_runs(session) + session.close() + for dagrun in dagruns: + if dagrun.dag_id == 'test_latest_runs_1': + self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2)) + class DagBagTest(unittest.TestCase):
