This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95169d1  Add a link to Databricks Job Run (#22541)
95169d1 is described below

commit 95169d1d07e66a8c7647e5b0f6a14cea57d515fc
Author: Alex Ott <alex...@gmail.com>
AuthorDate: Sun Mar 27 22:29:04 2022 +0200

    Add a link to Databricks Job Run (#22541)
    
    It will be easier for users/admins to go to the specific run of
    a Databricks Job
---
 .../providers/databricks/operators/databricks.py   | 24 +++++++++++++++++-----
 airflow/providers/databricks/provider.yaml         |  3 +++
 .../providers/databricks/hooks/test_databricks.py  |  2 +-
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index ec0a9a0..7636842 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -22,7 +22,7 @@ import time
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, BaseOperatorLink, TaskInstance
 from airflow.providers.databricks.hooks.databricks import DatabricksHook
 
 if TYPE_CHECKING:
@@ -70,11 +70,11 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
     :param operator: Databricks operator being handled
     :param context: Airflow context
     """
-    if operator.do_xcom_push:
+    if operator.do_xcom_push and context is not None:
         context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
     log.info('Run submitted with run_id: %s', operator.run_id)
     run_page_url = hook.get_run_page_url(operator.run_id)
-    if operator.do_xcom_push:
+    if operator.do_xcom_push and context is not None:
         context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
 
     if operator.wait_for_termination:
@@ -102,6 +102,18 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
         log.info('View run status, Spark UI, and logs at %s', run_page_url)
 
 
+class DatabricksJobRunLink(BaseOperatorLink):
+    """Constructs a link to monitor a Databricks Job Run."""
+
+    name = "See Databricks Job Run"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        run_page_url = ti.xcom_pull(task_ids=operator.task_id, key=XCOM_RUN_PAGE_URL_KEY)
+
+        return run_page_url
+
+
 class DatabricksSubmitRunOperator(BaseOperator):
     """
     Submits a Spark job run to Databricks using the
@@ -255,6 +267,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
     # Databricks brand color (blue) under white text
     ui_color = '#1CB1C2'
     ui_fgcolor = '#fff'
+    operator_extra_links = (DatabricksJobRunLink(),)
 
     def __init__(
         self,
@@ -276,7 +289,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         databricks_retry_limit: int = 3,
         databricks_retry_delay: int = 1,
         databricks_retry_args: Optional[Dict[Any, Any]] = None,
-        do_xcom_push: bool = False,
+        do_xcom_push: bool = True,
         idempotency_token: Optional[str] = None,
         access_control_list: Optional[List[Dict[str, str]]] = None,
         wait_for_termination: bool = True,
@@ -498,6 +511,7 @@ class DatabricksRunNowOperator(BaseOperator):
     # Databricks brand color (blue) under white text
     ui_color = '#1CB1C2'
     ui_fgcolor = '#fff'
+    operator_extra_links = (DatabricksJobRunLink(),)
 
     def __init__(
         self,
@@ -514,7 +528,7 @@ class DatabricksRunNowOperator(BaseOperator):
         databricks_retry_limit: int = 3,
         databricks_retry_delay: int = 1,
         databricks_retry_args: Optional[Dict[Any, Any]] = None,
-        do_xcom_push: bool = False,
+        do_xcom_push: bool = True,
         wait_for_termination: bool = True,
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/databricks/provider.yaml b/airflow/providers/databricks/provider.yaml
index ba9b3f0..77ced24 100644
--- a/airflow/providers/databricks/provider.yaml
+++ b/airflow/providers/databricks/provider.yaml
@@ -91,3 +91,6 @@ connection-types:
     connection-type: databricks
  - hook-class-name: airflow.providers.databricks.hooks.databricks_sql.DatabricksSqlHook
     connection-type: databricks
+
+extra-links:
+  - airflow.providers.databricks.operators.databricks.DatabricksJobRunLink
diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py
index d1adba0..5a93ed7 100644
--- a/tests/providers/databricks/hooks/test_databricks.py
+++ b/tests/providers/databricks/hooks/test_databricks.py
@@ -779,7 +779,7 @@ class TestRunState(unittest.TestCase):
     def test_is_terminal_with_nonexistent_life_cycle_state(self):
         run_state = RunState('blah', '', '')
         with pytest.raises(AirflowException):
-            run_state.is_terminal
+            assert run_state.is_terminal
 
     def test_is_successful(self):
         run_state = RunState('TERMINATED', 'SUCCESS', '')

Reply via email to