mik-laj commented on a change in pull request #12188:
URL: https://github.com/apache/airflow/pull/12188#discussion_r528199809
##########
File path: airflow/providers/microsoft/azure/hooks/wasb.py
##########
@@ -16,94 +16,145 @@
# specific language governing permissions and limitations
# under the License.
#
-"""
-This module contains integration with Azure Blob Storage.
+"""This module contains integration with Azure Blob Storage."""
-It communicate via the Window Azure Storage Blob protocol. Make sure that a
-Airflow connection of type `wasb` exists. Authorization can be done by supplying a
-login (=Storage account name) and password (=KEY), or login and SAS token in the extra
-field (see connection `wasb_default` for an example).
-"""
-from azure.storage.blob import BlockBlobService
+from typing import Any, Dict, List, Optional
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient, StorageStreamDownloader
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
class WasbHook(BaseHook):
"""
- Interacts with Azure Blob Storage through the ``wasb://`` protocol.
-
- These parameters have to be passed in Airflow Data Base: account_name and account_key.
-
- Additional options passed in the 'extra' field of the connection will be
- passed to the `BlockBlockService()` constructor. For example, authenticate
- using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
+ Interacts with Azure Blob Storage.
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
+ :param public_read: Whether an anonymous public read access should be used. default is False
+ :type public_read: bool
"""
- def __init__(self, wasb_conn_id: str = 'wasb_default') -> None:
+ def __init__(self, wasb_conn_id: str = 'wasb_default', public_read: bool = False) -> None:
super().__init__()
self.conn_id = wasb_conn_id
+ self.public_read = public_read
self.connection = self.get_conn()
- def get_conn(self) -> BlockBlobService:
- """Return the BlockBlobService object."""
+ def get_conn(self) -> BlobServiceClient:
+ """Return the BlobServiceClient object."""
conn = self.get_connection(self.conn_id)
- service_options = conn.extra_dejson
- return BlockBlobService(account_name=conn.login, account_key=conn.password, **service_options)
+ extra = conn.extra_dejson
- def check_for_blob(self, container_name, blob_name, **kwargs):
+ if self.public_read:
+ # Here we use anonymous public read
+ # more info
+ # https://docs.microsoft.com/en-us/azure/storage/blobs/storage-manage-access-to-resources
+ return BlobServiceClient(account_url=conn.host)
+ if extra.get('connection_string'):
+ # connection_string auth takes priority
+ return BlobServiceClient.from_connection_string(extra.get('connection_string'))
+ if extra.get('shared_access_key'):
+ # using shared access key
+ return BlobServiceClient(account_url=conn.host, credential=extra.get('shared_access_key'))
+ if extra.get('tenant_id'):
+ # use Active Directory auth
+ app_id = conn.login
+ app_secret = conn.password
+ token_credential = ClientSecretCredential(extra.get('tenant_id'), app_id, app_secret)
+ return BlobServiceClient(account_url=conn.host, credential=token_credential)
+ if extra.get('sas_token'):
+ return BlobServiceClient(account_url=extra.get('sas_token'))
+ else:
+ raise AirflowException('Unknown connection type')
+
+ def _get_container_client(self, container_name: str) -> ContainerClient:
"""
- Check if a blob exists on Azure Blob Storage.
+ Instantiates a container client
+
+ :param container_name: The name of the container
+ :type container_name: str
+ :return: ContainerClient
+ """
+ return self.connection.get_container_client(container_name)
+
+ def _get_blob_client(self, container_name: str, blob_name: str) -> BlobClient:
+ """
+ Instantiates a blob client
+ :param container_name: The name of the blob container
+ :type container_name: str
+ :param blob_name: The name of the blob. This needs not be existing
+ :type blob_name: str
+ """
+ container_client = self.create_container(container_name)
+ return container_client.get_blob_client(blob_name)
+ def check_for_blob(self, container_name: str, blob_name: str, **kwargs) -> bool:
+ """
+ Check if a blob exists on Azure Blob Storage.
:param container_name: Name of the container.
Review comment:
```suggestion
:param container_name: Name of the container.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org