josh-fell commented on code in PR #30252:
URL: https://github.com/apache/airflow/pull/30252#discussion_r1157804622


##########
airflow/providers/microsoft/azure/sensors/wasb.py:
##########
@@ -158,3 +158,71 @@ def poke(self, context: Context) -> bool:
         self.log.info("Poking for prefix: %s in wasb://%s", self.prefix, self.container_name)
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
         return hook.check_for_prefix(self.container_name, self.prefix, **self.check_options)
+
+
+class WasbPrefixAsyncSensor(WasbPrefixSensor):
+    """
+     Polls asynchronously for the existence of a blob having the given prefix in a WASB container.
+
+    :param container_name: name of the container in which the blob should be searched for
+    :param blob_name: name of the blob to check existence for
+    :param include: specifies one or more additional datasets to include in the
+            response. Options include: ``snapshots``, ``metadata``, ``uncommittedblobs``,
+            ``copy`, ``deleted``
+    :param delimiter: filters objects based on the delimiter (for e.g '.csv')
+    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
+    :param poll_interval:  polling period in seconds to check for the status

Review Comment:
   ```suggestion
       :param poke_interval:  polling period in seconds to check for the status
   ```



##########
airflow/providers/microsoft/azure/triggers/wasb.py:
##########
@@ -86,3 +86,80 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:
                         await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class WasbPrefixSensorTrigger(BaseTrigger):
+    """
+    WasbPrefixSensorTrigger is fired as a deferred class with params to run the task in trigger.
+    It checks for the existence of a blob with the given prefix in the provided container.
+
+    :param container_name: name of the container in which the blob should be searched for
+    :param prefix: prefix of the blob to check existence for
+    :param include: specifies one or more additional datasets to include in the
+            response. Options include: ``snapshots``, ``metadata``, ``uncommittedblobs``,
+            ``copy`, ``deleted``
+    :param delimiter: filters objects based on the delimiter (for e.g '.csv')
+    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
+    :param poke_interval:  polling period in seconds to check for the status
+    :param public_read: whether an anonymous public read access should be used. Default is False
+    """
+
+    def __init__(
+        self,
+        container_name: str,
+        prefix: str,
+        include: list[str] | None = None,
+        delimiter: str | None = "/",

Review Comment:
   ```suggestion
           delimiter: str = "/",
   ```
   Since the default is `"/"` rather than `None`, it seems like this should be the actual type annotation?
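
A minimal sketch of what the suggested annotation communicates. `PrefixTriggerSketch` is a hypothetical stand-in, not the Airflow trigger, and the argument values are placeholders. It only illustrates that with `delimiter: str = "/"` the parameter is always a string, so nothing downstream has to handle `None`.

```python
from __future__ import annotations


class PrefixTriggerSketch:
    """Hypothetical stand-in for the trigger constructor, not the Airflow class."""

    def __init__(self, container_name: str, prefix: str, delimiter: str = "/") -> None:
        self.container_name = container_name
        self.prefix = prefix
        # A plain ``str`` annotation tells callers that ``None`` is not an
        # accepted value, so no ``None`` check is needed where it is used.
        self.delimiter = delimiter


# Placeholder values for illustration only.
trigger = PrefixTriggerSketch(container_name="my-container", prefix="2023/")
```

If `None` were a meaningful value (for example, "do not use a delimiter"), keeping `str | None` and handling that case explicitly would be the alternative.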



##########
airflow/providers/microsoft/azure/triggers/wasb.py:
##########
@@ -86,3 +86,80 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:
                         await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class WasbPrefixSensorTrigger(BaseTrigger):
+    """
+    WasbPrefixSensorTrigger is fired as a deferred class with params to run the task in trigger.
+    It checks for the existence of a blob with the given prefix in the provided container.
+
+    :param container_name: name of the container in which the blob should be searched for
+    :param prefix: prefix of the blob to check existence for
+    :param include: specifies one or more additional datasets to include in the
+            response. Options include: ``snapshots``, ``metadata``, ``uncommittedblobs``,
+            ``copy`, ``deleted``

Review Comment:
   ```suggestion
               ``copy``, ``deleted``
   ```



##########
tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py:
##########
@@ -43,11 +46,21 @@
     start_date=datetime(2021, 1, 1),  # Override to match your needs
     default_args={"container_name": AZURE_CONTAINER_NAME, "blob_name": BLOB_NAME},
 ) as dag:
-
     wait_for_blob = WasbBlobSensor(task_id="wait_for_blob")
 
     wait_for_blob_async = WasbBlobSensor(task_id="wait_for_blob_async", deferrable=True)
 
+    wait_for_blob_prefix = WasbPrefixSensor(
+        task_id="wait_for_blob_prefix", container_name="azure_container", prefix="prefix_to_check"
+    )
+
+    wait_for_blob_prefix_async = WasbPrefixSensor(
+        task_id="wait_for_blob_prefix_async",
+        container_name="azure_container",
+        prefix="prefix_to_check",
+        deferrable=True,
+    )

Review Comment:
   I think it would be worth reusing the `container_name` value that's already defined in `default_args` for this system test instead of hardcoding a second one. `prefix` should probably also be a "legitimate" value, one that matches a blob this test actually uses, so the test is self-contained.
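
One way this could look, as a rough sketch rather than the change that was actually made: the constants below are placeholders, `PREFIX_NAME` is a hypothetical name, and the dictionaries only stand in for the keyword arguments each prefix sensor would receive. It relies on Airflow filling in operator arguments from `default_args` when they are not passed explicitly.

```python
# Placeholder values; the real system test defines its own constants.
AZURE_CONTAINER_NAME = "azure_container"
BLOB_NAME = "file.txt"

# Assumption: derive the prefix from the blob the test already waits for, so the
# prefix sensors match a blob that exists whenever the blob sensors succeed.
PREFIX_NAME = BLOB_NAME[:4]

default_args = {"container_name": AZURE_CONTAINER_NAME, "blob_name": BLOB_NAME}

# Keyword arguments the two prefix sensors would still pass explicitly.
# ``container_name`` is omitted because ``default_args`` already supplies it.
prefix_sensor_kwargs = {"task_id": "wait_for_blob_prefix", "prefix": PREFIX_NAME}
prefix_sensor_async_kwargs = {
    "task_id": "wait_for_blob_prefix_async",
    "prefix": PREFIX_NAME,
    "deferrable": True,
}
```

Inside the DAG these would be used as `WasbPrefixSensor(**prefix_sensor_kwargs)` and `WasbPrefixSensor(**prefix_sensor_async_kwargs)`.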


