Ohashiro commented on code in PR #45513:
URL: https://github.com/apache/airflow/pull/45513#discussion_r1910548907


##########
providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py:
##########
@@ -98,19 +108,53 @@ def api_version(self) -> APIVersion | str:
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to the PowerBI and polls for the dataset 
refresh status."""
-        self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
-            dataset_id=self.dataset_id,
-            group_id=self.group_id,
-        )
-
-        async def fetch_refresh_status_and_error() -> tuple[str, str]:
-            """Fetch the current status and error of the dataset refresh."""
-            refresh_details = await self.hook.get_refresh_details_by_refresh_id(
+        if not self.dataset_refresh_id:
+            # Trigger the dataset refresh
+            dataset_refresh_id = await self.hook.trigger_dataset_refresh(
                 dataset_id=self.dataset_id,
                 group_id=self.group_id,
-                refresh_id=self.dataset_refresh_id,
             )
-            return refresh_details["status"], refresh_details["error"]
+
+            if dataset_refresh_id:
+                self.log.info("Triggered dataset refresh %s", dataset_refresh_id)
+                yield TriggerEvent(
+                    {
+                        "status": "success",
+                        "dataset_refresh_status": None,
+                        "message": f"The dataset refresh {dataset_refresh_id} has been triggered.",
+                        "dataset_refresh_id": dataset_refresh_id,
+                    }
+                )
+                return
+            else:
+                yield TriggerEvent(
+                    {
+                        "status": "error",
+                        "dataset_refresh_status": None,
+                        "message": "Failed to trigger the dataset refresh.",
+                        "dataset_refresh_id": None,
+                    }
+                )
+                return
+
+        # The dataset refresh is already triggered. Poll for the dataset refresh status.
+        @tenacity.retry(
+            stop=tenacity.stop_after_attempt(3),
+            wait=tenacity.wait_exponential(min=5, multiplier=2),
+            reraise=True,
+            retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException),
+        )
+        async def fetch_refresh_status_and_error() -> tuple[str, str]:
+            """Fetch the current status and error of the dataset refresh."""
+            if self.dataset_refresh_id:
+                refresh_details = await self.hook.get_refresh_details_by_refresh_id(
+                    dataset_id=self.dataset_id,
+                    group_id=self.group_id,
+                    refresh_id=self.dataset_refresh_id,
+                )
+                return refresh_details["status"], refresh_details["error"]
+            else:

Review Comment:
   Sure! I made the change!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to