This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2748374852d Fix PowerBIDatasetRefreshOperator to properly respect
wait_for_completion flag (#62842)
2748374852d is described below
commit 2748374852dc4ed37e815d8d8944933c1c9e7fda
Author: Henry Chen <[email protected]>
AuthorDate: Mon Mar 9 22:48:30 2026 +0800
Fix PowerBIDatasetRefreshOperator to properly respect wait_for_completion
flag (#62842)
---
.../providers/microsoft/azure/triggers/powerbi.py | 29 +++++++-----
.../unit/microsoft/azure/triggers/test_powerbi.py | 55 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 12 deletions(-)
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py
index c9198474cbb..354e84b8553 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py
@@ -152,8 +152,23 @@ class PowerBITrigger(BasePowerBITrigger):
request_body=self.request_body,
)
- if dataset_refresh_id:
- self.log.info("Triggered dataset refresh %s",
dataset_refresh_id)
+ if not dataset_refresh_id:
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "dataset_refresh_status": None,
+ "message": "Failed to trigger the dataset refresh.",
+ "dataset_refresh_id": None,
+ }
+ )
+ return
+
+ self.log.info("Triggered dataset refresh %s", dataset_refresh_id)
+ # Set the dataset_refresh_id for polling
+ self.dataset_refresh_id = dataset_refresh_id
+
+ # If wait_for_termination is False, return immediately after
triggering
+ if not self.wait_for_termination:
yield TriggerEvent(
{
"status": "success",
@@ -164,16 +179,6 @@ class PowerBITrigger(BasePowerBITrigger):
)
return
- yield TriggerEvent(
- {
- "status": "error",
- "dataset_refresh_status": None,
- "message": "Failed to trigger the dataset refresh.",
- "dataset_refresh_id": None,
- }
- )
- return
-
# The dataset refresh is already triggered. Poll for the dataset
refresh status.
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py
index 9648fa47387..4b6b4f9c988 100644
---
a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py
@@ -178,6 +178,7 @@ class TestPowerBITrigger:
async def test_powerbi_trigger_run_trigger_refresh(self,
mock_trigger_dataset_refresh, powerbi_trigger):
"""Assert event is triggered upon successful new refresh trigger."""
powerbi_trigger.dataset_refresh_id = None
+ powerbi_trigger.wait_for_termination = False
mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
task = [i async for i in powerbi_trigger.run()]
@@ -355,3 +356,57 @@ class TestPowerBITrigger:
)
assert expected == actual
+
+ @pytest.mark.asyncio
+ @mock.patch.object(PowerBIHook, "get_refresh_details_by_refresh_id")
+ @mock.patch.object(PowerBIHook, "trigger_dataset_refresh")
+ async def
test_powerbi_trigger_waits_for_completion_when_wait_for_termination_true(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id
+ ):
+ # Create trigger without dataset_refresh_id (simulating first-time
trigger)
+ trigger = PowerBITrigger(
+ conn_id=POWERBI_CONN_ID,
+ proxies=None,
+ api_version=API_VERSION,
+ dataset_id=DATASET_ID,
+ dataset_refresh_id=None, # No refresh ID - will trigger new
refresh
+ group_id=GROUP_ID,
+ check_interval=CHECK_INTERVAL,
+ wait_for_termination=True,
+ timeout=TIMEOUT,
+ request_body=REQUEST_BODY,
+ )
+
+ # Mock trigger_dataset_refresh to return a new refresh ID
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ # Mock get_refresh_details to simulate the refresh completing
+ mock_get_refresh_details_by_refresh_id.return_value = {
+ "status": PowerBIDatasetRefreshStatus.COMPLETED,
+ "error": None,
+ }
+
+ # Run the trigger
+ generator = trigger.run()
+ actual = await generator.asend(None)
+
+ # Verify trigger was called
+ mock_trigger_dataset_refresh.assert_called_once_with(
+ dataset_id=DATASET_ID,
+ group_id=GROUP_ID,
+ request_body=REQUEST_BODY,
+ )
+
+ # Verify get_refresh_details was called (proving it's polling, not
returning immediately)
+ assert mock_get_refresh_details_by_refresh_id.call_count >= 1
+
+ # Verify the final event shows COMPLETED status (not None)
+ expected = TriggerEvent(
+ {
+ "status": "success",
+ "dataset_refresh_status":
PowerBIDatasetRefreshStatus.COMPLETED,
+ "message": f"The dataset refresh {DATASET_REFRESH_ID} has
{PowerBIDatasetRefreshStatus.COMPLETED}.",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
+ assert expected == actual