jroachgolf84 opened a new pull request, #67248:
URL: https://github.com/apache/airflow/pull/67248
## Description
This PR adds the `AssetState` mechanism, which makes the foundations put in
place in AIP-103 usable from within a Trigger or Task.
## Testing
These changes were unit-tested and also tested end-to-end (E2E). See below for
details.
### Unit Tests
Existing unit tests were updated and new tests were added to validate the
changes made in this branch. *Note: additional unit tests are still to be
added.*
```bash
# Updated existing unit tests
breeze testing core-tests airflow-core/tests/unit/jobs/test_triggerer_job.py
# New unit tests
breeze testing task-sdk-tests task-sdk/tests/task_sdk/definitions/test_asset_state.py
```
### E2E Testing
The `GenericEventTrigger` below was used for E2E testing. `AssetState` is used
in the Trigger's `run` method: each generated number is stored with `set`, read
back with `get`, and both values are logged.
```python
import asyncio
import logging
import random
from collections.abc import AsyncIterator
from typing import Any

from airflow.sdk import AssetState
from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class GenericEventTrigger(BaseEventTrigger):
    def __init__(
        self,
        random_number,
        waiter_delay,
        asset_name,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.random_number = random_number
        self.waiter_delay = waiter_delay
        self.asset_name = asset_name

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize the Trigger, including random_number, waiter_delay, and asset_name."""
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "random_number": self.random_number,
                "waiter_delay": self.waiter_delay,
                "asset_name": self.asset_name,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Logic that fires a TriggerEvent."""
        # Here's where the AssetState is actually being used
        asset_state = AssetState(name=self.asset_name)
        logging.info(f"***** asset_state: {asset_state}")

        while True:
            result = random.randint(0, 5)
            logging.info(f"result: {result}")

            # Store the generated number, then read it back to verify the round trip
            asset_state.set("result", str(result))
            _result = asset_state.get("result")
            logging.info(f"_result: {_result}")

            if result == self.random_number:
                logging.info("yield'ing TriggerEvent")
                yield TriggerEvent({"status": "success", "result": result})
                break

            logging.info(f"Sleeping for {self.waiter_delay} seconds")
            await asyncio.sleep(self.waiter_delay)
```
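For readers who want to follow the store-and-retrieve loop without an Airflow environment, here is a minimal sketch of the same control flow. `InMemoryAssetState`, `poll_until_match`, and `first_event` are hypothetical names introduced only for illustration; the stand-in class assumes nothing beyond the `set(key, value)` / `get(key)` calls used in the Trigger above and is not the implementation added in this PR.

```python
import asyncio
import random
from collections.abc import AsyncIterator
from typing import Optional


class InMemoryAssetState:
    """Hypothetical stand-in for AssetState; NOT the Airflow implementation."""

    # Shared store keyed by (asset name, key), so separate instances that
    # name the same asset see the same values.
    _store: dict[tuple[str, str], str] = {}

    def __init__(self, name: str) -> None:
        self.name = name

    def set(self, key: str, value: str) -> None:
        self._store[(self.name, key)] = value

    def get(self, key: str) -> Optional[str]:
        return self._store.get((self.name, key))


async def poll_until_match(
    asset_name: str, target: int, waiter_delay: float, rng: random.Random
) -> AsyncIterator[dict]:
    """Mirror the Trigger's run loop: draw, store, read back, yield on a match."""
    state = InMemoryAssetState(name=asset_name)
    while True:
        result = rng.randint(0, 5)
        state.set("result", str(result))
        # Values are stored as strings, so the read-back is a string too.
        assert state.get("result") == str(result)
        if result == target:
            yield {"status": "success", "result": result}
            break
        await asyncio.sleep(waiter_delay)


async def first_event(asset_name: str, target: int) -> dict:
    """Drain the generator; it yields exactly one event before stopping."""
    rng = random.Random(0)  # seeded so repeated runs draw the same sequence
    events = [e async for e in poll_until_match(asset_name, target, 0, rng)]
    return events[0]


event = asyncio.run(first_event("demo_asset", target=3))
```

After the loop finishes, the last value written is the matching number, stored as a string, which is the round trip the E2E run checks through its log output.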
## TODO
The following items still need to be completed:
- [ ] Validate that `AssetState` works when called within a Task.
- [ ] Ensure complete test coverage of the changes that were made.
* closes: #67200
##### Was generative AI tooling used to co-author this PR?
No, generative AI was not used to generate this PR.