This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 95169d1 Add a link to Databricks Job Run (#22541) 95169d1 is described below commit 95169d1d07e66a8c7647e5b0f6a14cea57d515fc Author: Alex Ott <alex...@gmail.com> AuthorDate: Sun Mar 27 22:29:04 2022 +0200 Add a link to Databricks Job Run (#22541) It will be easier for users/admins to go to the specific run of Databricks Job --- .../providers/databricks/operators/databricks.py | 24 +++++++++++++++++----- airflow/providers/databricks/provider.yaml | 3 +++ .../providers/databricks/hooks/test_databricks.py | 2 +- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index ec0a9a0..7636842 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -22,7 +22,7 @@ import time from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union from airflow.exceptions import AirflowException -from airflow.models import BaseOperator +from airflow.models import BaseOperator, BaseOperatorLink, TaskInstance from airflow.providers.databricks.hooks.databricks import DatabricksHook if TYPE_CHECKING: @@ -70,11 +70,11 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None: :param operator: Databricks operator being handled :param context: Airflow context """ - if operator.do_xcom_push: + if operator.do_xcom_push and context is not None: context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id) log.info('Run submitted with run_id: %s', operator.run_id) run_page_url = hook.get_run_page_url(operator.run_id) - if operator.do_xcom_push: + if operator.do_xcom_push and context is not None: context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url) if operator.wait_for_termination: @@ -102,6 +102,18 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> 
None: log.info('View run status, Spark UI, and logs at %s', run_page_url) +class DatabricksJobRunLink(BaseOperatorLink): + """Constructs a link to monitor a Databricks Job Run.""" + + name = "See Databricks Job Run" + + def get_link(self, operator, dttm): + ti = TaskInstance(task=operator, execution_date=dttm) + run_page_url = ti.xcom_pull(task_ids=operator.task_id, key=XCOM_RUN_PAGE_URL_KEY) + + return run_page_url + + class DatabricksSubmitRunOperator(BaseOperator): """ Submits a Spark job run to Databricks using the @@ -255,6 +267,7 @@ class DatabricksSubmitRunOperator(BaseOperator): # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' + operator_extra_links = (DatabricksJobRunLink(),) def __init__( self, @@ -276,7 +289,7 @@ class DatabricksSubmitRunOperator(BaseOperator): databricks_retry_limit: int = 3, databricks_retry_delay: int = 1, databricks_retry_args: Optional[Dict[Any, Any]] = None, - do_xcom_push: bool = False, + do_xcom_push: bool = True, idempotency_token: Optional[str] = None, access_control_list: Optional[List[Dict[str, str]]] = None, wait_for_termination: bool = True, @@ -498,6 +511,7 @@ class DatabricksRunNowOperator(BaseOperator): # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' + operator_extra_links = (DatabricksJobRunLink(),) def __init__( self, @@ -514,7 +528,7 @@ class DatabricksRunNowOperator(BaseOperator): databricks_retry_limit: int = 3, databricks_retry_delay: int = 1, databricks_retry_args: Optional[Dict[Any, Any]] = None, - do_xcom_push: bool = False, + do_xcom_push: bool = True, wait_for_termination: bool = True, **kwargs, ) -> None: diff --git a/airflow/providers/databricks/provider.yaml b/airflow/providers/databricks/provider.yaml index ba9b3f0..77ced24 100644 --- a/airflow/providers/databricks/provider.yaml +++ b/airflow/providers/databricks/provider.yaml @@ -91,3 +91,6 @@ connection-types: connection-type: databricks - hook-class-name: 
airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook connection-type: databricks + +extra-links: + - airflow.providers.databricks.operators.databricks.DatabricksJobRunLink diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index d1adba0..5a93ed7 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -779,7 +779,7 @@ class TestRunState(unittest.TestCase): def test_is_terminal_with_nonexistent_life_cycle_state(self): run_state = RunState('blah', '', '') with pytest.raises(AirflowException): - run_state.is_terminal + assert run_state.is_terminal def test_is_successful(self): run_state = RunState('TERMINATED', 'SUCCESS', '')