jroachgolf84 opened a new pull request, #67839:
URL: https://github.com/apache/airflow/pull/67839
## Description
To ensure that classes inheriting from the `BaseEventTrigger` (typically
used for Asset watching) are "Asset-aware", the work done in AIP-103 has been
wired into the `BaseEventTrigger` class.
#65103 drafted an approach that "flipped" the definition of the `Asset` and
`AssetWatcher`. However, for good reason, the approach was challenged. After
conversation, the goal became to "pass" the `AssetStateAccessors` through to
the `BaseEventTrigger` with some runtime magic.
With this model, defining an `Asset` and `AssetWatcher` remains the same as
before.
```python
from airflow.sdk import Asset, AssetWatcher, DAG, task
from datetime import datetime
from triggers.event_triggers import GenericEventTrigger
generic_asset_watcher = AssetWatcher(
name="generic_asset_watcher",
trigger=GenericEventTrigger(
random_number=1,
waiter_delay=15
)
)
generic_asset = Asset(
name="generic_asset",
watchers=[generic_asset_watcher],
)
with DAG(
dag_id="aip_93_scoping",
start_date=datetime(2026, 1, 1),
schedule=[generic_asset]
) as dag:
@task
def downstream_task():
pass
downstream_task()
```
The two PR's below were closed in favor of this PR:
- #66595
## Testing
No unit tests have been written for this logic yet. However, testing has
been performed E2E locally with `breeze`. The trigger authored below is what
was used for testing. When this ran, the Asset `name` and `uri` were output in
the Triggerer logs.
```python
class GenericEventTrigger(BaseEventTrigger):
def __init__(
self,
random_number,
waiter_delay,
asset_name,
**kwargs
):
super().__init__(**kwargs)
self.random_number = random_number
self.waiter_delay = waiter_delay
self.asset_name = asset_name
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize the Trigger, including the func, params, and
waiter_delay."""
return (
self.__class__.__module__ + "." + self.__class__.__qualname__,
{
"random_number": self.random_number,
"waiter_delay": self.waiter_delay,
"asset_name": self.asset_name,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Logic that fires a TriggerEvent."""
logging.info(f"***** asset_states: {self.asset_states}")
while True:
result = random.randint(0, 5)
if result == self.random_number:
logging.info("yield'ing TriggerEvent")
yield TriggerEvent({"status": "success", "result": result})
break
logging.info(f"Sleeping for {self.waiter_delay} seconds")
await asyncio.sleep(self.waiter_delay)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]