This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 289c9b5 Use default view in TriggerDagRunLink (#11778) 289c9b5 is described below commit 289c9b5a994a3e26951ca23b6edd30b2329b3089 Author: Tomek Urbaszek <turbas...@gmail.com> AuthorDate: Wed Nov 11 23:11:53 2020 +0100 Use default view in TriggerDagRunLink (#11778) --- airflow/operators/dagrun_operator.py | 5 +++-- airflow/sensors/external_task_sensor.py | 5 +++-- airflow/utils/helpers.py | 12 ++++++++++++ tests/utils/test_helpers.py | 13 ++++++++++++- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index 0ca0cf5..7547a0f 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -18,13 +18,13 @@ import datetime from typing import Dict, Optional, Union -from urllib.parse import quote from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.exceptions import DagNotFound, DagRunAlreadyExists from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun from airflow.utils import timezone from airflow.utils.decorators import apply_defaults +from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.types import DagRunType @@ -37,7 +37,8 @@ class TriggerDagRunLink(BaseOperatorLink): name = 'Triggered DAG' def get_link(self, operator, dttm): - return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}" + query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()} + return build_airflow_url_with_query(query) class TriggerDagRunOperator(BaseOperator): diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py index 06137a4..c72c0b7e 100644 --- a/airflow/sensors/external_task_sensor.py +++ b/airflow/sensors/external_task_sensor.py @@ -19,7 +19,6 @@ import datetime import os from typing import FrozenSet, Optional, 
Union -from urllib.parse import quote from sqlalchemy import func @@ -28,6 +27,7 @@ from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInsta from airflow.operators.dummy_operator import DummyOperator from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults +from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session from airflow.utils.state import State @@ -41,7 +41,8 @@ class ExternalTaskSensorLink(BaseOperatorLink): name = 'External DAG' def get_link(self, operator, dttm): - return f"/graph?dag_id={operator.external_dag_id}&root=&execution_date={quote(dttm.isoformat())}" + query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()} + return build_airflow_url_with_query(query) class ExternalTaskSensor(BaseSensorOperator): diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 5ccb618..69ac5a0 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -22,9 +22,11 @@ from datetime import datetime from functools import reduce from itertools import filterfalse, tee from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar +from urllib import parse from jinja2 import Template +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.utils.module_loading import import_string @@ -202,3 +204,13 @@ def cross_downstream(*args, **kwargs): stacklevel=2, ) return import_string('airflow.models.baseoperator.cross_downstream')(*args, **kwargs) + + +def build_airflow_url_with_query(query: Dict[str, Any]) -> str: + """ + Build airflow url using base_url and default_view and provided query + For example: + 'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587 + """ + view = conf.get('webserver', 'dag_default_view').lower() + return f"/{view}?{parse.urlencode(query)}" diff --git 
a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 85c53c5..c53aa4f 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -23,7 +23,8 @@ from airflow.models import TaskInstance from airflow.models.dag import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.utils import helpers -from airflow.utils.helpers import merge_dicts +from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts +from tests.test_utils.config import conf_vars class TestHelpers(unittest.TestCase): @@ -136,3 +137,13 @@ class TestHelpers(unittest.TestCase): dict2 = {'a': 1, 'r': {'c': 3, 'b': 0}} merged = merge_dicts(dict1, dict2) self.assertDictEqual(merged, {'a': 1, 'r': {'b': 0, 'c': 3}}) + + @conf_vars( + { + ("webserver", "dag_default_view"): "custom", + } + ) + def test_build_airflow_url_with_query(self): + query = {"dag_id": "test_dag", "param": "key/to.encode"} + url = build_airflow_url_with_query(query) + assert url == "/custom?dag_id=test_dag&param=key%2Fto.encode"