vincbeck commented on PR #67248:
URL: https://github.com/apache/airflow/pull/67248#issuecomment-4544853225
> Hey @jroachgolf84, I think the plumbing fix here will be correct.
>
> `AssetState(name=self.asset_name)` works, but it is inconsistent with how
tasks access asset state, as Vincent also mentioned, and nothing prevents a
trigger from doing `AssetState(name="some_other_asset")` even if it's not
associated with that asset.
>
> Looking at `BaseTrigger`, the triggerer already injects
`self.task_instance` before calling `run()`. We could use the same pattern here
and the triggerer could populate `self.asset_states` on `BaseEventTrigger`
before `run()`, keyed by asset name, using the assets associated with the TI.
The trigger author can then do:
>
> ```python
> async def run(self):
>     watermark = self.asset_states["orders"].get("watermark")
>     self.asset_states["orders"].set("watermark", new_watermark)
>     yield TriggerEvent(...)
> ```
>
> This will also provide some benefits like:
>
> * Make the triggers scoped automatically (only the trigger's associated
assets are present)
> * Should also work cleanly for multi-asset triggers (just use the asset
name while accessing)
> * The framework handles scoping, not the author
>
> The triggerer already knows which assets are associated with the TI at the
point it sets `trigger.task_instance = ti`, so populating `asset_states` there
is probably straightforward.
I really like that!
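
To make the scoping idea concrete, here is a minimal, self-contained sketch. It does not use any Airflow code: `ScopedAssetState`, `SketchEventTrigger`, `populate_asset_states`, and the in-memory `store` are all hypothetical stand-ins, and the real `BaseEventTrigger` and triggerer wiring may look quite different. It only illustrates that a trigger handed a pre-populated, name-keyed mapping cannot reach state for assets it is not associated with.

```python
import asyncio
from typing import Any


class ScopedAssetState:
    """Hypothetical per-asset state handle (not an Airflow class)."""

    def __init__(self, asset_name: str, store: dict[tuple[str, str], Any]) -> None:
        self.asset_name = asset_name
        self._store = store  # shared backing store, assumed in-memory here

    def get(self, key: str, default: Any = None) -> Any:
        return self._store.get((self.asset_name, key), default)

    def set(self, key: str, value: Any) -> None:
        self._store[(self.asset_name, key)] = value


class SketchEventTrigger:
    """Stand-in for a BaseEventTrigger subclass; asset_states is injected."""

    asset_states: dict[str, ScopedAssetState]

    async def run(self):
        state = self.asset_states["orders"]
        new_watermark = state.get("watermark", 0) + 1
        state.set("watermark", new_watermark)
        # A plain dict stands in for TriggerEvent in this sketch.
        yield {"asset": "orders", "watermark": new_watermark}


def populate_asset_states(trigger, associated_assets, store) -> None:
    """What the triggerer might do just before calling run() (assumption)."""
    trigger.asset_states = {
        name: ScopedAssetState(name, store) for name in associated_assets
    }


async def fire_once(trigger):
    """Pull a single event from the trigger, then close the generator."""
    agen = trigger.run()
    event = await agen.__anext__()
    await agen.aclose()
    return event
```

With this shape, looking up `trigger.asset_states["some_other_asset"]` raises `KeyError` unless the framework put that asset in the mapping, so the scoping is enforced by what gets injected rather than by author discipline.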