mik-laj commented on a change in pull request #12188:
URL: https://github.com/apache/airflow/pull/12188#discussion_r528199794



##########
File path: airflow/providers/microsoft/azure/hooks/wasb.py
##########
@@ -16,94 +16,145 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-"""
-This module contains integration with Azure Blob Storage.
+"""This module contains integration with Azure Blob Storage."""
 
-It communicate via the Window Azure Storage Blob protocol. Make sure that a
-Airflow connection of type `wasb` exists. Authorization can be done by supplying a
-login (=Storage account name) and password (=KEY), or login and SAS token in the extra
-field (see connection `wasb_default` for an example).
-"""
-from azure.storage.blob import BlockBlobService
+from typing import Any, Dict, List, Optional
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient, StorageStreamDownloader
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 
 
 class WasbHook(BaseHook):
     """
-    Interacts with Azure Blob Storage through the ``wasb://`` protocol.
-
-    These parameters have to be passed in Airflow Data Base: account_name and account_key.
-
-    Additional options passed in the 'extra' field of the connection will be
-    passed to the `BlockBlockService()` constructor. For example, authenticate
-    using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
+    Interacts with Azure Blob Storage.
 
     :param wasb_conn_id: Reference to the wasb connection.
     :type wasb_conn_id: str
+    :param public_read: Whether an anonymous public read access should be used. default is False
+    :type public_read: bool
     """
 
-    def __init__(self, wasb_conn_id: str = 'wasb_default') -> None:
+    def __init__(self, wasb_conn_id: str = 'wasb_default', public_read: bool = False) -> None:
         super().__init__()
         self.conn_id = wasb_conn_id
+        self.public_read = public_read
         self.connection = self.get_conn()
 
-    def get_conn(self) -> BlockBlobService:
-        """Return the BlockBlobService object."""
+    def get_conn(self) -> BlobServiceClient:
+        """Return the BlobServiceClient object."""
         conn = self.get_connection(self.conn_id)
-        service_options = conn.extra_dejson
-        return BlockBlobService(account_name=conn.login, account_key=conn.password, **service_options)
+        extra = conn.extra_dejson
 
-    def check_for_blob(self, container_name, blob_name, **kwargs):
+        if self.public_read:
+            # Here we use anonymous public read
+            # more info
+            # https://docs.microsoft.com/en-us/azure/storage/blobs/storage-manage-access-to-resources
+            return BlobServiceClient(account_url=conn.host)
+        if extra.get('connection_string'):
+            # connection_string auth takes priority
+            return BlobServiceClient.from_connection_string(extra.get('connection_string'))
+        if extra.get('shared_access_key'):
+            # using shared access key
+            return BlobServiceClient(account_url=conn.host, credential=extra.get('shared_access_key'))
+        if extra.get('tenant_id'):
+            # use Active Directory auth
+            app_id = conn.login
+            app_secret = conn.password
+            token_credential = ClientSecretCredential(extra.get('tenant_id'), app_id, app_secret)
+            return BlobServiceClient(account_url=conn.host, credential=token_credential)
+        if extra.get('sas_token'):
+            return BlobServiceClient(account_url=extra.get('sas_token'))
+        else:
+            raise AirflowException('Unknown connection type')
+
+    def _get_container_client(self, container_name: str) -> ContainerClient:
         """
-        Check if a blob exists on Azure Blob Storage.
+        Instantiates a container client
+
+        :param container_name: The name of the container
+        :type container_name: str
+        :return: ContainerClient
+        """
+        return self.connection.get_container_client(container_name)
+
+    def _get_blob_client(self, container_name: str, blob_name: str) -> BlobClient:
+        """
+        Instantiates a blob client
+        :param container_name: The name of the blob container

Review comment:
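   Please add a blank line between the summary line and the `:param` fields, as `_get_container_client` already does, so the docstring's field list renders correctly:

   ```suggestion
   
           :param container_name: The name of the blob container
   ```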




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to