mvfc commented on code in PR #50001:
URL: https://github.com/apache/airflow/pull/50001#discussion_r2071252846


##########
providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py:
##########
@@ -167,3 +171,182 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
             )
             if event["status"] == "error":
                 raise AirflowException(event["message"])
+
+
+class PowerBIWorkspaceListOperator(BaseOperator):
+    """
+    Gets a list of workspaces where the service principal from the connection is assigned as admin.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PowerBIWorkspaceListOperator`
+
+    :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication.
+    """
+
+    def __init__(
+        self,
+        *,
+        conn_id: str = PowerBIHook.default_conn_name,
+        timeout: float = 60 * 60 * 24 * 7,
+        proxies: dict | None = None,
+        api_version: APIVersion | str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
+        self.conn_id = conn_id
+        self.timeout = timeout
+
+    @property
+    def proxies(self) -> dict | None:
+        return self.hook.proxies
+
+    @property
+    def api_version(self) -> str | None:
+        return self.hook.api_version
+
+    def execute(self, context: Context):
+        """List visible PowerBI Workspaces."""
+        self.defer(
+            trigger=PowerBIWorkspaceListTrigger(
+                conn_id=self.conn_id,
+                timeout=self.timeout,
+                proxies=self.proxies,
+                api_version=self.api_version,
+            ),
+            method_name=self.execute_complete.__name__,
+        )
+
+    def get_workspace_list(
+        self, context: Context, event: dict[str, str] | dict[str, list[str]] | None = None
+    ):
+        """Push the workspace ID list to XCom."""
+        if event:
+            if event["status"] == "error":
+                raise AirflowException(event["message"])
+
+            workspace_ids = event["workspace_ids"]
+
+        if workspace_ids:
+            self.xcom_push(
+                context=context,
+                key=f"{self.task_id}.powerbi_workspace_ids",
+                value=workspace_ids,
+            )
+            self.defer(
+                trigger=PowerBIWorkspaceListTrigger(
+                    conn_id=self.conn_id,
+                    timeout=self.timeout,
+                    proxies=self.proxies,
+                    workspace_ids=workspace_ids,
+                    api_version=self.api_version,
+                ),
+                method_name=self.execute_complete.__name__,
+            )
+
+    def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
+        """
+        Return immediately - callback for when the trigger fires.
+
+        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
+        """
+        if event:
+            self.xcom_push(
+                context=context,
+                key=f"{self.task_id}.powerbi_workspace_ids",
+                value=event["workspace_ids"],
+            )
+            if event["status"] == "error":
+                raise AirflowException(event["message"])
+
+
+class PowerBIDatasetListOperator(BaseOperator):
+    """
+    Gets a list of datasets where the service principal from the connection is assigned as admin.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PowerBIDatasetListOperator`
+
+    :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication.
+    :param group_id: The group Id to list discoverable datasets.
+    """
+
+    def __init__(
+        self,
+        *,
+        group_id: str,
+        conn_id: str = PowerBIHook.default_conn_name,
+        timeout: float = 60 * 60 * 24 * 7,
+        proxies: dict | None = None,
+        api_version: APIVersion | str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
+        self.conn_id = conn_id
+        self.group_id = group_id
+        self.timeout = timeout
+
+    @property
+    def proxies(self) -> dict | None:
+        return self.hook.proxies
+
+    @property
+    def api_version(self) -> str | None:
+        return self.hook.api_version
+
+    def execute(self, context: Context):
+        """List visible PowerBI datasets within group (Workspace)."""
+        self.defer(
+            trigger=PowerBIDatasetListTrigger(
+                conn_id=self.conn_id,
+                timeout=self.timeout,
+                proxies=self.proxies,
+                api_version=self.api_version,
+                group_id=self.group_id,
+            ),
+            method_name=self.execute_complete.__name__,
+        )
+
+    def get_dataset_list(self, context: Context, event: dict[str, str] | dict[str, list[str]] | None = None):

Review Comment:
   Removed; this was only meant for testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to