mvfc commented on code in PR #50001: URL: https://github.com/apache/airflow/pull/50001#discussion_r2071252846
########## providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py: ########## @@ -167,3 +171,182 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: ) if event["status"] == "error": raise AirflowException(event["message"]) + + +class PowerBIWorkspaceListOperator(BaseOperator): + """ + Gets a list of workspaces where the service principal from the connection is assigned as admin. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PowerBIWorkspaceListOperator` + + :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication. + """ + + def __init__( + self, + *, + conn_id: str = PowerBIHook.default_conn_name, + timeout: float = 60 * 60 * 24 * 7, + proxies: dict | None = None, + api_version: APIVersion | str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) + self.conn_id = conn_id + self.timeout = timeout + + @property + def proxies(self) -> dict | None: + return self.hook.proxies + + @property + def api_version(self) -> str | None: + return self.hook.api_version + + def execute(self, context: Context): + """List visible PowerBI Workspaces.""" + self.defer( + trigger=PowerBIWorkspaceListTrigger( + conn_id=self.conn_id, + timeout=self.timeout, + proxies=self.proxies, + api_version=self.api_version, + ), + method_name=self.execute_complete.__name__, + ) + + def get_workspace_list( + self, context: Context, event: dict[str, str] | dict[str, list[str]] | None = None + ): + """Push the workspace ID list to XCom.""" + if event: + if event["status"] == "error": + raise AirflowException(event["message"]) + + workspace_ids = event["workspace_ids"] + + if workspace_ids: + self.xcom_push( + context=context, + key=f"{self.task_id}.powerbi_workspace_ids", + 
value=workspace_ids, + ) + self.defer( + trigger=PowerBIWorkspaceListTrigger( + conn_id=self.conn_id, + timeout=self.timeout, + proxies=self.proxies, + workspace_ids=workspace_ids, + api_version=self.api_version, + ), + method_name=self.execute_complete.__name__, + ) + + def execute_complete(self, context: Context, event: dict[str, str]) -> Any: + """ + Return immediately - callback for when the trigger fires. + + Relies on trigger to throw an exception, otherwise it assumes execution was successful. + """ + if event: + self.xcom_push( + context=context, + key=f"{self.task_id}.powerbi_workspace_ids", + value=event["workspace_ids"], + ) + if event["status"] == "error": + raise AirflowException(event["message"]) + + +class PowerBIDatasetListOperator(BaseOperator): + """ + Gets a list of datasets where the service principal from the connection is assigned as admin. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PowerBIDatasetListOperator` + + :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication. + :param group_id: The group Id to list discoverable datasets. 
+ """ + + def __init__( + self, + *, + group_id: str, + conn_id: str = PowerBIHook.default_conn_name, + timeout: float = 60 * 60 * 24 * 7, + proxies: dict | None = None, + api_version: APIVersion | str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) + self.conn_id = conn_id + self.group_id = group_id + self.timeout = timeout + + @property + def proxies(self) -> dict | None: + return self.hook.proxies + + @property + def api_version(self) -> str | None: + return self.hook.api_version + + def execute(self, context: Context): + """List visible PowerBI datasets within group (Workspace).""" + self.defer( + trigger=PowerBIDatasetListTrigger( + conn_id=self.conn_id, + timeout=self.timeout, + proxies=self.proxies, + api_version=self.api_version, + group_id=self.group_id, + ), + method_name=self.execute_complete.__name__, + ) + + def get_dataset_list(self, context: Context, event: dict[str, str] | dict[str, list[str]] | None = None): Review Comment: Removed; this method was only meant for testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org