This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d5150eef00 Add incremental refresh support to `TableauOperator` (#67340)
1d5150eef00 is described below

commit 1d5150eef007c0ab8779ceab68e70dd361426b94
Author: Subham <[email protected]>
AuthorDate: Tue May 26 16:58:54 2026 +0530

    Add incremental refresh support to `TableauOperator` (#67340)
    
    * Add incremental refresh support to TableauOperator
    
    * Add warning when incremental_refresh used with non-refresh method
    
    * Fix incremental refresh version compatibility for tableauserverclient
    
    * Raise AirflowOptionalProviderFeatureException on unsupported TS client version
    
    * Address review: inspect TypeError message and chain exception
    
    * Fix trailing whitespace in test_tableau.py
---
 .../airflow/providers/tableau/operators/tableau.py |  31 ++-
 .../tests/unit/tableau/operators/test_tableau.py   | 212 ++++++++++++++++++++-
 2 files changed, 241 insertions(+), 2 deletions(-)

diff --git a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
index d176e3ea2ff..9cb58657645 100644
--- a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
@@ -21,7 +21,11 @@ from typing import TYPE_CHECKING
 
 from tableauserverclient import JobItem
 
-from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
+from airflow.providers.common.compat.sdk import (
+    AirflowException,
+    AirflowOptionalProviderFeatureException,
+    BaseOperator,
+)
 from airflow.providers.tableau.hooks.tableau import (
     TableauHook,
     TableauJobFailedException,
@@ -62,6 +66,8 @@ class TableauOperator(BaseOperator):
     :param blocking_refresh: By default will be blocking means it will wait until it has finished.
     :param check_interval: time in seconds that the job should wait in
         between each instance state checks until operation is completed
+    :param incremental_refresh: Whether to perform an incremental refresh instead of a full refresh.
+        Only applies to datasource and workbook refresh operations. Defaults to False (full refresh).
     :param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
         containing the credentials to authenticate to the Tableau Server.
     """
@@ -81,6 +87,7 @@ class TableauOperator(BaseOperator):
         site_id: str | None = None,
         blocking_refresh: bool = True,
         check_interval: float = 20,
+        incremental_refresh: bool = False,
         tableau_conn_id: str = "tableau_default",
         **kwargs,
     ) -> None:
@@ -92,6 +99,7 @@ class TableauOperator(BaseOperator):
         self.check_interval = check_interval
         self.site_id = site_id
         self.blocking_refresh = blocking_refresh
+        self.incremental_refresh = incremental_refresh
         self.tableau_conn_id = tableau_conn_id
 
     def execute(self, context: Context) -> str:
@@ -111,6 +119,13 @@ class TableauOperator(BaseOperator):
             error_message = f"Method not found! Available methods for {self.resource}: {available_methods}"
             raise AirflowException(error_message)
 
+        if self.incremental_refresh and self.method != "refresh":
+            self.log.warning(
+                "incremental_refresh parameter is set to True but method is '%s'. "
+                "This parameter only applies to 'refresh' operations and will be ignored.",
+                self.method,
+            )
+
         with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
             resource = getattr(tableau_hook.server, self.resource)
             method = getattr(resource, self.method)
@@ -124,6 +139,20 @@ class TableauOperator(BaseOperator):
                 if not job_items:
                     raise ValueError("Tableau tasks.run returned no JobItem in response")
                 job_id = job_items[0].id
+            elif self.method == "refresh":
+                if self.incremental_refresh:
+                    try:
+                        response = method(resource_id, incremental=True)
+                    except TypeError as e:
+                        if "incremental" in str(e):
+                            raise AirflowOptionalProviderFeatureException(
+                                "Incremental refresh requires tableauserverclient>=0.35. "
+                                "Please upgrade: pip install 'tableauserverclient>=0.35'"
+                            ) from e
+                        raise
+                else:
+                    response = method(resource_id)
+                job_id = response.id
             else:
                 response = method(resource_id)
                 job_id = response.id
diff --git a/providers/tableau/tests/unit/tableau/operators/test_tableau.py b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
index ff22d128c43..c2e9e5157fc 100644
--- a/providers/tableau/tests/unit/tableau/operators/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
@@ -20,7 +20,7 @@ from unittest.mock import Mock, patch
 
 import pytest
 
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, AirflowOptionalProviderFeatureException
 from airflow.providers.tableau.hooks.tableau import TableauJobFinishCode
 from airflow.providers.tableau.operators.tableau import TableauOperator
 
@@ -277,3 +277,213 @@ class TestTableauOperator:
         resource_id = "res_id"
         operator = TableauOperator(resource="tasks", find=resource_id, 
method="run", task_id="t", dag=None)
         assert operator._get_resource_id(Mock()) == resource_id
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_datasources_incremental_refresh(self, mock_tableau_hook):
+        """
+        Test execute datasources with incremental refresh
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=True,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2, 
incremental=True)
+        assert mock_tableau_hook.server.datasources.refresh.return_value.id == 
job_id
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_datasources_full_refresh(self, mock_tableau_hook):
+        """
+        Test execute datasources with full refresh (default behavior)
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=False,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+        assert mock_tableau_hook.server.datasources.refresh.return_value.id == 
job_id
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_workbooks_incremental_refresh(self, mock_tableau_hook):
+        """
+        Test execute workbooks with incremental refresh
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="wb_2",
+            resource="workbooks",
+            incremental_refresh=True,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2, 
incremental=True)
+        assert mock_tableau_hook.server.workbooks.refresh.return_value.id == 
job_id
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_workbooks_full_refresh(self, mock_tableau_hook):
+        """
+        Test execute workbooks with full refresh (default behavior)
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="wb_2",
+            resource="workbooks",
+            incremental_refresh=False,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
+        assert mock_tableau_hook.server.workbooks.refresh.return_value.id == 
job_id
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_datasources_incremental_refresh_blocking(self, 
mock_tableau_hook):
+        """
+        Test execute datasources with incremental refresh blocking
+        """
+        mock_signed_in = [False]
+
+        def mock_hook_enter():
+            mock_signed_in[0] = True
+            return mock_tableau_hook
+
+        def mock_hook_exit(exc_type, exc_val, exc_tb):
+            mock_signed_in[0] = False
+
+        def mock_wait_for_state(job_id, target_state, check_interval):
+            if not mock_signed_in[0]:
+                raise Exception("Not signed in")
+            return True
+
+        mock_tableau_hook.return_value.__enter__ = 
Mock(side_effect=mock_hook_enter)
+        mock_tableau_hook.return_value.__exit__ = 
Mock(side_effect=mock_hook_exit)
+        mock_tableau_hook.wait_for_state = 
Mock(side_effect=mock_wait_for_state)
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+
+        operator = TableauOperator(
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=True,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2, 
incremental=True)
+        assert mock_tableau_hook.server.datasources.refresh.return_value.id == 
job_id
+        mock_tableau_hook.wait_for_state.assert_called_once_with(
+            job_id=job_id, check_interval=20, 
target_state=TableauJobFinishCode.SUCCESS
+        )
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_incremental_refresh_warning_on_non_refresh_method(self, 
mock_tableau_hook, caplog):
+        """
+        Test that a warning is logged when incremental_refresh is set but 
method is not 'refresh'
+        """
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+
+        operator = TableauOperator(
+            find="ds_2",
+            resource="datasources",
+            method="delete",
+            incremental_refresh=True,
+            dag=None,
+            task_id="test",
+        )
+
+        operator.execute(context={})
+
+        assert "incremental_refresh parameter is set to True but method is 
'delete'" in caplog.text
+        assert "This parameter only applies to 'refresh' operations" in 
caplog.text
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_incremental_refresh_unsupported_version_raises(self, 
mock_tableau_hook):
+        """
+        Test that AirflowOptionalProviderFeatureException is raised when 
incremental_refresh=True
+        but the installed tableauserverclient does not support the incremental 
parameter.
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+        # Simulate older tableauserverclient that doesn't accept incremental 
kwarg
+        mock_tableau_hook.server.datasources.refresh.side_effect = TypeError(
+            "refresh() got an unexpected keyword argument 'incremental'"
+        )
+
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=True,
+            **self.kwargs,
+        )
+
+        with pytest.raises(AirflowOptionalProviderFeatureException, 
match="tableauserverclient>=0.35"):
+            operator.execute(context={})
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_incremental_refresh_unrelated_type_error_is_raised(self, 
mock_tableau_hook):
+        """
+        Test that an unrelated TypeError during refresh is re-raised.
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+
+        # Simulate a different TypeError that does NOT contain the string 
"incremental"
+        mock_tableau_hook.server.datasources.refresh.side_effect = 
TypeError("Some other type error")
+
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=True,
+            **self.kwargs,
+        )
+
+        with pytest.raises(TypeError, match="Some other type error"):
+            operator.execute(context={})
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_full_refresh_works_on_older_versions(self, mock_tableau_hook):
+        """
+        Test that full refresh (incremental_refresh=False) works fine on older
+        tableauserverclient versions since the incremental kwarg is not passed.
+        """
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+        mock_tableau_hook.return_value.__enter__ = 
Mock(return_value=mock_tableau_hook)
+
+        operator = TableauOperator(
+            blocking_refresh=False,
+            find="ds_2",
+            resource="datasources",
+            incremental_refresh=False,
+            **self.kwargs,
+        )
+
+        job_id = operator.execute(context={})
+
+        # Verify that refresh was called WITHOUT the incremental parameter
+        mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+        assert job_id == 
mock_tableau_hook.server.datasources.refresh.return_value.id

Reply via email to