jroachgolf84 opened a new pull request, #68900:
URL: https://github.com/apache/airflow/pull/68900
## Description
Using the Asset State Store from within a `BaseEventTrigger` raised the
following exception:
```
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 890, in handle_requests
    msg = self.decoder.validate_python(self._deserialize_request(request.body))
  File "/usr/python/lib/python3.10/site-packages/pydantic/type_adapter.py", line 441, in validate_python
    return self.validator.validate_python(
pydantic_core._pydantic_core.ValidationError: 1 validation error for tagged-union[TriggerStateChanges, GetConnection, DeleteVariable, GetVariable, GetVariableKeys, PutVariable, DeleteXCom, GetXCom, SetXCom, GetTICount, GetTaskStates, GetDagRunState, GetDRCount, GetPreviousTI, GetHITLDetailResponse, UpdateHITLDetail, MaskSecret]
  Input tag 'GetAssetStateStoreByName' found using 'type' does not match any of the expected tags: 'TriggerStateChanges', 'GetConnection', 'DeleteVariable', 'GetVariable', 'GetVariableKeys', 'PutVariable', 'DeleteXCom', 'GetXCom', 'SetXCom', 'GetTICount', 'GetTaskStates', 'GetDagRunState', 'GetDRCount', 'GetPreviousTI', 'GetHITLDetailResponse', 'UpdateHITLDetail', 'MaskSecret' [type=union_tag_invalid, input_value={'name': 'generic_asset', ... 'type': 'GetAssetStateStoreByName'}, input_type=dict]
```
@vikramkoka called this out as something to address in #67839. This PR fixes
the error by adding a pathway for each of the 8 operations that the Asset
State Store can perform, so these requests no longer fail the decoder's
tagged-union validation shown above.
related: #67839
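The failure mode can be illustrated with a simplified, stdlib-only sketch of tag-based request dispatch. This is not Airflow's implementation: the real supervisor validates requests with a pydantic tagged union, whereas `RequestDecoder`, `register`, and the in-memory `_store` below are hypothetical. Only the `GetAssetStateStoreByName` tag and the `generic_asset` name come from the traceback. It shows that a request whose `type` tag has no registered pathway is rejected, and that registering a pathway for that tag lets the same request through.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

# Hypothetical, simplified stand-in for the supervisor's tagged-union decoder:
# each incoming request dict carries a "type" tag that selects its handler.
Handler = Callable[[dict[str, Any]], Any]


@dataclass
class RequestDecoder:
    handlers: dict[str, Handler] = field(default_factory=dict)

    def register(self, tag: str, handler: Handler) -> None:
        self.handlers[tag] = handler

    def handle(self, request: dict[str, Any]) -> Any:
        tag = request.get("type")
        if tag not in self.handlers:
            # Analogous to pydantic's union_tag_invalid error in the traceback.
            expected = ", ".join(repr(t) for t in self.handlers)
            raise ValueError(
                f"Input tag {tag!r} found using 'type' does not match "
                f"any of the expected tags: {expected}"
            )
        return self.handlers[tag](request)


# Hypothetical in-memory state keyed by asset name (illustration only).
_store: dict[str, dict[str, Any]] = {"generic_asset": {"result": 3}}

decoder = RequestDecoder()
decoder.register("GetConnection", lambda req: {"conn_id": req["conn_id"]})

request = {"name": "generic_asset", "type": "GetAssetStateStoreByName"}
try:
    decoder.handle(request)
except ValueError as err:
    print(err)  # rejected: no pathway is registered for this tag yet

# The fix, in spirit: register a pathway for the asset-state-store request.
decoder.register(
    "GetAssetStateStoreByName",
    lambda req: dict(_store.get(req["name"], {})),
)
print(decoder.handle(request))  # {'result': 3}
```

In the PR this is repeated for each of the 8 Asset State Store operations; the sketch registers only one to keep the idea visible.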
## Testing
The changes were tested both with unit tests and end to end (E2E). The unit
tests can be run with the commands below:
```bash
breeze testing core-tests airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state_store.py
breeze testing core-tests airflow-core/tests/unit/jobs/test_triggerer_job.py
```
To test this E2E, a custom `BaseEventTrigger` was written that exercises the
`get`, `set`, `delete`, and `clear` operations. The DAG that uses this Trigger
runs successfully end to end, and the Triggerer logs contain all of the
expected logging statements.
```python
import asyncio
import logging
import random
from collections.abc import AsyncIterator
from typing import Any

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class GenericEventTrigger(BaseEventTrigger):
    def __init__(self, random_number, waiter_delay, asset_name, **kwargs):
        super().__init__(**kwargs)
        self.random_number = random_number
        self.waiter_delay = waiter_delay
        self.asset_name = asset_name

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize the Trigger's classpath and its constructor kwargs."""
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "random_number": self.random_number,
                "waiter_delay": self.waiter_delay,
                "asset_name": self.asset_name,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Generate random numbers until one matches, then fire a TriggerEvent."""
        asset_state_store = self.asset_state_store
        logging.info("asset_state_store: %s", asset_state_store)

        # Clear the state store before starting, in case anything is left over
        asset_state_store.clear()
        logging.info("cleared asset_state_store")

        while True:
            # Get the previous number (-1 if nothing has been stored yet)
            last_result = asset_state_store.get("result", -1)
            logging.info("last_result: %s", last_result)

            # Generate a new number and persist it in the state store
            result = random.randint(0, 5)
            asset_state_store.set("result", result)
            logging.info("result: %s", result)

            if result == self.random_number:
                logging.info("yield'ing TriggerEvent")
                yield TriggerEvent({"status": "success", "result": result})

                # Remove the 'result' key from the state store
                asset_state_store.delete(key="result")
                logging.info("deleted 'result' key for asset_state_store")
                break

            logging.info("Sleeping for %s seconds", self.waiter_delay)
            await asyncio.sleep(self.waiter_delay)
```
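To exercise the same `get`/`set`/`delete`/`clear` sequence without a running Triggerer, a local check along these lines could be used. It is a sketch under assumptions: `FakeAssetStateStore` and `run_loop` are hypothetical stand-ins that copy the control flow of `GenericEventTrigger.run`, and the fake store's method signatures are inferred from the calls made in the trigger above rather than taken from Airflow's API.

```python
import asyncio
import random
from collections.abc import AsyncIterator
from typing import Any


class FakeAssetStateStore:
    """Hypothetical in-memory stand-in exposing get/set/delete/clear."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self.calls: list[str] = []  # records the order of operations

    def get(self, key: str, default: Any = None) -> Any:
        self.calls.append("get")
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self.calls.append("set")
        self._data[key] = value

    def delete(self, key: str) -> None:
        self.calls.append("delete")
        self._data.pop(key, None)

    def clear(self) -> None:
        self.calls.append("clear")
        self._data.clear()


async def run_loop(
    store: FakeAssetStateStore, target: int, rng: random.Random
) -> AsyncIterator[dict[str, Any]]:
    """Same control flow as GenericEventTrigger.run, without Airflow."""
    store.clear()
    while True:
        store.get("result", -1)
        result = rng.randint(0, 5)
        store.set("result", result)
        if result == target:
            yield {"status": "success", "result": result}
            store.delete(key="result")
            break
        await asyncio.sleep(0)  # no real waiter_delay in a local check


async def main() -> tuple[list[dict[str, Any]], FakeAssetStateStore]:
    store = FakeAssetStateStore()
    events = [e async for e in run_loop(store, target=3, rng=random.Random(0))]
    return events, store


events, store = asyncio.run(main())
print(events)
print(store.calls[0], store.calls[-1])  # clear delete
```

Seeding `random.Random` keeps the run deterministic, and `asyncio.sleep(0)` replaces `waiter_delay` so the loop finishes immediately. Because the async comprehension keeps pulling from the generator after the event is yielded, the `delete` after `yield` still runs, mirroring how the trigger cleans up its key.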