jroachgolf84 opened a new pull request, #68900:
URL: https://github.com/apache/airflow/pull/68900

   ## Description
   
   When attempting to use the Asset State Store from within a 
`BaseEventTrigger`, the following exception was thrown:
   
   ```
   Traceback (most recent call last):
     File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 890, in handle_requests
       msg = 
self.decoder.validate_python(self._deserialize_request(request.body))
     File "/usr/python/lib/python3.10/site-packages/pydantic/type_adapter.py", 
line 441, in validate_python
       return self.validator.validate_python(
   
   pydantic_core._pydantic_core.ValidationError: 1 validation error for 
tagged-union[
     TriggerStateChanges, GetConnection, DeleteVariable, GetVariable, 
GetVariableKeys,
     PutVariable, DeleteXCom, GetXCom, SetXCom, GetTICount, GetTaskStates, 
GetDagRunState,
     GetDRCount, GetPreviousTI, GetHITLDetailResponse, UpdateHITLDetail, 
MaskSecret
   ]
   
     Input tag 'GetAssetStateStoreByName' found using 'type' does not match any 
of the expected tags:
     'TriggerStateChanges', 'GetConnection', 'DeleteVariable', 'GetVariable', 
'GetVariableKeys',
     'PutVariable', 'DeleteXCom', 'GetXCom', 'SetXCom', 'GetTICount', 
'GetTaskStates',
     'GetDagRunState', 'GetDRCount', 'GetPreviousTI', 'GetHITLDetailResponse', 
'UpdateHITLDetail',
     'MaskSecret'
   
     [type=union_tag_invalid, input_value={'name': 'generic_asset', ... 'type': 
'GetAssetStateStoreByName'}, input_type=dict]
   ```
   
   This was called out as something that needed to be addressed by @vikramkoka 
in #67839. This PR addresses this error by adding a pathway for each of the 8 
operations that the Asset State Store can perform.
   
   related: #67839
   
   ## Testing
   
   Changes were tested using both unit-tests, as well as E2E. Unit-tests can be 
run using the below commands:
   
   ```bash
   breeze testing core-tests 
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state_store.py
   
   breeze testing core-tests airflow-core/tests/unit/jobs/test_triggerer_job.py
   ```
   
   To test this E2E, a custom `BaseEventTrigger` was written to test `get`, 
`set`, `delete`, and `clear` operations. The DAG that uses this Trigger 
executes successfully E2E, and the Triggered contains all of the expected 
logging statements.
   
   ```python
   from airflow.triggers.base import BaseEventTrigger, TriggerEvent
   from collections.abc import AsyncIterator
   from typing import Any
   
   import asyncio
   import logging
   import random
   
   
   class GenericEventTrigger(BaseEventTrigger):
       def __init__(
           self,
           random_number,
           waiter_delay,
           asset_name,
           **kwargs
       ):
           super().__init__(**kwargs)
   
           self.random_number = random_number
           self.waiter_delay = waiter_delay
           self.asset_name = asset_name
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           """Serialize the Trigger, including the func, params, and 
waiter_delay."""
           return (
               self.__class__.__module__ + "." + self.__class__.__qualname__,
               {
                   "random_number": self.random_number,
                   "waiter_delay": self.waiter_delay,
                   "asset_name": self.asset_name,
               },
           )
   
       async def run(self) -> AsyncIterator[TriggerEvent]:
           """Logic that fires a TriggerEvent."""
           asset_state_store = self.asset_state_store
           logging.info(f"asset_state_store: {asset_state_store}")
   
           # Clearing state store before starting (in case there is anything 
there)
           asset_state_store.clear()
           logging.info(f"cleared asset_state_store")
   
           while True:
               # Get the previous number
               last_result = asset_state_store.get("result", -1)
               logging.info(f"last_result: {last_result}")
   
               # Create a new number
               result = random.randint(0, 5)
               asset_state_store.set("result", result)
               logging.info(f"result: {result}")
   
               if result == self.random_number:
                   logging.info("yield'ing TriggerEvent")
                   yield TriggerEvent({"status": "success", "result": result})
   
                   # Clear the state_store
                   asset_state_store.delete(key="result")  # Should this work?
                   logging.info(f"deleted 'result' key for asset_state_store")
                   break
   
               logging.info(f"Sleeping for {self.waiter_delay} seconds")
               await asyncio.sleep(self.waiter_delay)
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to