jroachgolf84 commented on code in PR #67016: URL: https://github.com/apache/airflow/pull/67016#discussion_r3275917106
########## providers/microsoft/azure/src/airflow/providers/microsoft/azure/bundles/wasb.py: ########## @@ -0,0 +1,154 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from pathlib import Path + +import structlog + +from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook + + +class WasbDagBundle(BaseDagBundle): + """ + WASB Dag bundle - exposes a directory in Azure Blob Storage as a Dag bundle. + + This allows Airflow to load Dags directly from an Azure Blob Storage container. + + :param wasb_conn_id: Airflow connection ID for Azure Blob Storage. Defaults to WasbHook.default_conn_name. + :param container_name: The name of the blob container containing the Dag files. + :param prefix: Optional subdirectory within the container where the Dags are stored. + If empty, Dags are assumed to be at the root of the container. 
+ """ + + supports_versioning = False + + def __init__( + self, + *, + wasb_conn_id: str = WasbHook.default_conn_name, + container_name: str, + prefix: str = "", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.wasb_conn_id = wasb_conn_id + self.container_name = container_name + self.prefix = prefix + self.wasb_dags_dir: Path = self.base_dir + + log = structlog.get_logger(__name__) + self._log = log.bind( + bundle_name=self.name, + version=self.version, + container_name=self.container_name, + prefix=self.prefix, + wasb_conn_id=self.wasb_conn_id, + ) + self._wasb_hook: WasbHook | None = None + + def _initialize(self): + with self.lock(): + if not self.wasb_dags_dir.exists(): + self._log.info("Creating local Dags directory: %s", self.wasb_dags_dir) + os.makedirs(self.wasb_dags_dir) + + if not self.wasb_dags_dir.is_dir(): + raise NotADirectoryError(f"Local Dags path: {self.wasb_dags_dir} is not a directory.") + + if not self.wasb_hook.check_for_container(container_name=self.container_name): + raise ValueError(f"WASB container '{self.container_name}' does not exist.") + + if self.prefix: + if not self.wasb_hook.check_for_prefix( + container_name=self.container_name, prefix=self.prefix, delimiter="/" + ): + raise ValueError( + f"WASB prefix 'wasb://{self.container_name}/{self.prefix}' does not exist." + ) + self.refresh() + + def initialize(self) -> None: + self._initialize() + super().initialize() + + @property + def wasb_hook(self): + if self._wasb_hook is None: + self._wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + return self._wasb_hook + + def __repr__(self): + return ( + f"<WasbDagBundle(" + f"name={self.name!r}, " + f"container_name={self.container_name!r}, " + f"prefix={self.prefix!r}, " + f"version={self.version!r}" + f")>" + ) + + def get_current_version(self) -> str | None: + """Return the current version of the Dag bundle. 
Currently not supported.""" + return None + + @property + def path(self) -> Path: + """Return the local path to the Dag files.""" + return self.wasb_dags_dir + + def refresh(self) -> None: + """Refresh the Dag bundle by re-downloading the Dags from Azure Blob Storage.""" + if self.version: + raise ValueError("Refreshing a specific version is not supported") + + with self.lock(): + self._log.debug( + "Downloading Dags from wasb://%s/%s to %s", + self.container_name, + self.prefix, + self.wasb_dags_dir, + ) + self.wasb_hook.sync_to_local_dir( + container_name=self.container_name, + prefix=self.prefix, + local_dir=self.wasb_dags_dir, + delete_stale=True, + ) + + def view_url(self, version: str | None = None) -> str | None: Review Comment: Or a video of this working. ########## providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py: ########## @@ -271,6 +272,17 @@ def check_for_prefix(self, container_name: str, prefix: str, **kwargs) -> bool: blobs = self.get_blobs_list(container_name=container_name, prefix=prefix, **kwargs) return bool(blobs) + def check_for_container(self, container_name: str) -> bool: + """ + Check if a container exists on Azure Blob Storage. + + :param container_name: Name of the container. + :return: True if the container exists, False otherwise. + """ + container = self._get_container_client(container_name) + self.check_for_variable_type("container", container, ContainerClient) + return cast("ContainerClient", container).exists() Review Comment: I don't love this pattern, but it seems to be used in quite a few other places :) ########## providers/microsoft/azure/src/airflow/providers/microsoft/azure/bundles/wasb.py: ########## @@ -0,0 +1,154 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from pathlib import Path + +import structlog + +from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook + + +class WasbDagBundle(BaseDagBundle): + """ + WASB Dag bundle - exposes a directory in Azure Blob Storage as a Dag bundle. + + This allows Airflow to load Dags directly from an Azure Blob Storage container. + + :param wasb_conn_id: Airflow connection ID for Azure Blob Storage. Defaults to WasbHook.default_conn_name. + :param container_name: The name of the blob container containing the Dag files. + :param prefix: Optional subdirectory within the container where the Dags are stored. + If empty, Dags are assumed to be at the root of the container. 
+ """ + + supports_versioning = False + + def __init__( + self, + *, + wasb_conn_id: str = WasbHook.default_conn_name, + container_name: str, + prefix: str = "", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.wasb_conn_id = wasb_conn_id + self.container_name = container_name + self.prefix = prefix + self.wasb_dags_dir: Path = self.base_dir + + log = structlog.get_logger(__name__) + self._log = log.bind( + bundle_name=self.name, + version=self.version, + container_name=self.container_name, + prefix=self.prefix, + wasb_conn_id=self.wasb_conn_id, + ) + self._wasb_hook: WasbHook | None = None + + def _initialize(self): + with self.lock(): + if not self.wasb_dags_dir.exists(): + self._log.info("Creating local Dags directory: %s", self.wasb_dags_dir) + os.makedirs(self.wasb_dags_dir) + + if not self.wasb_dags_dir.is_dir(): + raise NotADirectoryError(f"Local Dags path: {self.wasb_dags_dir} is not a directory.") + + if not self.wasb_hook.check_for_container(container_name=self.container_name): + raise ValueError(f"WASB container '{self.container_name}' does not exist.") + + if self.prefix: + if not self.wasb_hook.check_for_prefix( + container_name=self.container_name, prefix=self.prefix, delimiter="/" + ): + raise ValueError( + f"WASB prefix 'wasb://{self.container_name}/{self.prefix}' does not exist." + ) + self.refresh() + + def initialize(self) -> None: + self._initialize() + super().initialize() + + @property + def wasb_hook(self): + if self._wasb_hook is None: + self._wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + return self._wasb_hook + + def __repr__(self): + return ( + f"<WasbDagBundle(" + f"name={self.name!r}, " + f"container_name={self.container_name!r}, " + f"prefix={self.prefix!r}, " + f"version={self.version!r}" + f")>" + ) + + def get_current_version(self) -> str | None: + """Return the current version of the Dag bundle. 
Currently not supported.""" + return None + + @property + def path(self) -> Path: + """Return the local path to the Dag files.""" + return self.wasb_dags_dir + + def refresh(self) -> None: + """Refresh the Dag bundle by re-downloading the Dags from Azure Blob Storage.""" + if self.version: + raise ValueError("Refreshing a specific version is not supported") + + with self.lock(): + self._log.debug( + "Downloading Dags from wasb://%s/%s to %s", + self.container_name, + self.prefix, + self.wasb_dags_dir, + ) + self.wasb_hook.sync_to_local_dir( + container_name=self.container_name, + prefix=self.prefix, + local_dir=self.wasb_dags_dir, + delete_stale=True, + ) + + def view_url(self, version: str | None = None) -> str | None: Review Comment: Can you include a screenshot of this in your PR? ########## providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py: ########## @@ -460,6 +472,98 @@ def download( # TODO: rework the interface as it might also return Awaitable return blob_client.download_blob(offset=offset, length=length, **kwargs) # type: ignore[return-value] + def _sync_to_local_dir_delete_stale_local_files( Review Comment: It looks like tests for this logic in `.../hooks/wasb.py` are missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
