jroachgolf84 opened a new pull request, #67839:
URL: https://github.com/apache/airflow/pull/67839

   ## Description
   
   To ensure that classes inheriting from the `BaseEventTrigger` (typically 
used for Asset watching) are "Asset-aware", the work done in AIP-103 has been 
wired into the `BaseEventTrigger` class.
   
   #65103 drafted an approach that "flipped" the definitions of `Asset` and 
`AssetWatcher`. That approach was challenged, for good reason. After 
discussion, the goal became to pass the `AssetStateAccessors` through to 
the `BaseEventTrigger` at runtime instead.
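   
   As a rough illustration of what passing accessors through at runtime could 
look like, the sketch below rebuilds a trigger from its serialized kwargs and 
then attaches asset state to the instance. It is not the implementation in this 
PR: `AssetStateSketch`, `EventTriggerSketch`, and `rebuild_with_asset_states` 
are hypothetical stand-ins, and the `uri` value is a placeholder.
   
   ```python
   from dataclasses import dataclass
   from typing import Any


   @dataclass(frozen=True)
   class AssetStateSketch:
       """Hypothetical stand-in for the accessor handed to the trigger."""

       name: str
       uri: str


   class EventTriggerSketch:
       """Hypothetical stand-in for BaseEventTrigger; not the Airflow class."""

       def __init__(self, **kwargs: Any) -> None:
           # Not a constructor argument, so it never appears in serialize().
           self.asset_states: list[AssetStateSketch] = []

       def serialize(self) -> tuple[str, dict[str, Any]]:
           raise NotImplementedError


   class RandomNumberTrigger(EventTriggerSketch):
       def __init__(self, random_number: int, waiter_delay: int, **kwargs: Any) -> None:
           super().__init__(**kwargs)
           self.random_number = random_number
           self.waiter_delay = waiter_delay

       def serialize(self) -> tuple[str, dict[str, Any]]:
           return (
               f"{type(self).__module__}.{type(self).__qualname__}",
               {"random_number": self.random_number, "waiter_delay": self.waiter_delay},
           )


   def rebuild_with_asset_states(
       trigger_cls: type[EventTriggerSketch],
       kwargs: dict[str, Any],
       asset_states: list[AssetStateSketch],
   ) -> EventTriggerSketch:
       """Hypothetical runner step: rebuild the trigger, then attach the accessors."""
       trigger = trigger_cls(**kwargs)
       trigger.asset_states = list(asset_states)
       return trigger


   original = RandomNumberTrigger(random_number=1, waiter_delay=15)
   _, stored_kwargs = original.serialize()
   rebuilt = rebuild_with_asset_states(
       RandomNumberTrigger,
       stored_kwargs,
       # Placeholder values for illustration only.
       [AssetStateSketch(name="generic_asset", uri="placeholder://generic_asset")],
   )
   ```
   
   Under this assumption, `asset_states` is attached after construction and 
stays out of `serialize()`, so the trigger's constructor signature does not 
need to know about the Asset.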
   
   With this model, defining an `Asset` and `AssetWatcher` remains the same as 
before.
   
   ```python
   from airflow.sdk import Asset, AssetWatcher, DAG, task
   from datetime import datetime
   from triggers.event_triggers import GenericEventTrigger
   
   
   generic_asset_watcher = AssetWatcher(
       name="generic_asset_watcher",
       trigger=GenericEventTrigger(
           random_number=1,
           waiter_delay=15
       )
   )
   
   generic_asset = Asset(
       name="generic_asset",
       watchers=[generic_asset_watcher],
   )
   
   
   with DAG(
       dag_id="aip_93_scoping",
       start_date=datetime(2026, 1, 1),
       schedule=[generic_asset]
   ) as dag:
   
       @task
       def downstream_task():
           pass
   
       downstream_task()
   ```
   
   The two PRs below were closed in favor of this PR:
   
   - #66595 
   
   ## Testing
   
   No unit tests have been written for this logic yet. It was tested 
end-to-end locally with `breeze`, using the trigger below. When it ran, the 
Asset `name` and `uri` appeared in the Triggerer logs.
   
   ```python
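   import asyncio
   import logging
   import random
   from collections.abc import AsyncIterator
   from typing import Any
   
   from airflow.triggers.base import BaseEventTrigger, TriggerEvent
   
   
   class GenericEventTrigger(BaseEventTrigger):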
       def __init__(
           self,
           random_number,
           waiter_delay,
           asset_name=None,  # optional: the watcher example builds the trigger without it
           **kwargs
       ):
           super().__init__(**kwargs)
   
           self.random_number = random_number
           self.waiter_delay = waiter_delay
           self.asset_name = asset_name
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           """Serialize the trigger's classpath and its constructor arguments."""
           return (
               self.__class__.__module__ + "." + self.__class__.__qualname__,
               {
                   "random_number": self.random_number,
                   "waiter_delay": self.waiter_delay,
                   "asset_name": self.asset_name,
               },
           )
   
       async def run(self) -> AsyncIterator[TriggerEvent]:
           """Logic that fires a TriggerEvent."""
           logging.info(f"***** asset_states: {self.asset_states}")
   
           while True:
               result = random.randint(0, 5)
   
               if result == self.random_number:
                   logging.info("yield'ing TriggerEvent")
                   yield TriggerEvent({"status": "success", "result": result})
                   break
   
               logging.info(f"Sleeping for {self.waiter_delay} seconds")
               await asyncio.sleep(self.waiter_delay)
   
   ```
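   
   Since no unit tests exist yet, one way the polling loop could be exercised 
deterministically is sketched below. It is only a sketch under stated 
assumptions: `poll_for_number` and `first_event` are hypothetical stand-ins 
that mirror the loop in `run()` and yield a plain dict rather than a 
`TriggerEvent`, so that the example runs without Airflow installed.
   
   ```python
   import asyncio
   import random
   from collections.abc import AsyncIterator
   from unittest import mock


   async def poll_for_number(random_number: int, waiter_delay: float) -> AsyncIterator[dict]:
       """Stand-in for GenericEventTrigger.run(); yields a dict, not a TriggerEvent."""
       while True:
           result = random.randint(0, 5)
           if result == random_number:
               yield {"status": "success", "result": result}
               break
           await asyncio.sleep(waiter_delay)


   async def first_event(random_number: int) -> dict:
       # waiter_delay=0 keeps the check fast while still exercising the sleep path.
       async for event in poll_for_number(random_number, waiter_delay=0):
           return event
       raise AssertionError("generator ended without yielding")


   # Force two misses and then a hit so the outcome does not depend on chance.
   with mock.patch("random.randint", side_effect=[4, 0, 1]) as fake_randint:
       event = asyncio.run(first_event(random_number=1))
   ```
   
   Patching `random.randint` with a fixed sequence removes the randomness, so a 
real test could assert on both the emitted payload and the number of polls.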

