jroachgolf84 opened a new pull request, #67248:
URL: https://github.com/apache/airflow/pull/67248

   ## Description
   
   This PR adds the `AssetState` mechanism to use the foundations put in place 
in AIP-103 within a Trigger or Task.
   
   ## Testing
   
   These changes were unit-tested, as well as tested E2E. See below for more 
information.
   
   ### Unit Tests
   
   Existing unit tests were updated and new tests were added to validate the 
changes that were made in this branch. *Note, there are additional unit-tests 
to be added.*
   
   ```bash
   # Updating existing unit testes
   breeze testing core-tests airflow-core/tests/unit/jobs/test_triggerer_job.py
   
   # New unit tests
   breeze testing task-sdk-tests 
task-sdk/tests/task_sdk/definitions/test_asset_state.py
   ```
   
   ### E2E Testing
   
   This `GenericEventTrigger` was used for E2E testing. Note that `AssetState` 
is used in the `run` method of the Trigger. This code properly stores and 
retrieves the generated number, and logs the output accordingly.
   
   ```python
   from airflow.sdk import AssetState
   from airflow.triggers.base import BaseEventTrigger, TriggerEvent
   from collections.abc import AsyncIterator
   from typing import Any
   
   import asyncio
   import logging
   import random
   
   
   class GenericEventTrigger(BaseEventTrigger):
       def __init__(
           self,
           random_number,
           waiter_delay,
           asset_name,
           **kwargs
       ):
           super().__init__(**kwargs)
   
           self.random_number = random_number
           self.waiter_delay = waiter_delay
           self.asset_name = asset_name
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           """Serialize the Trigger, including the func, params, and 
waiter_delay."""
           return (
               self.__class__.__module__ + "." + self.__class__.__qualname__,
               {
                   "random_number": self.random_number,
                   "waiter_delay": self.waiter_delay,
                   "asset_name": self.asset_name,
               },
           )
   
       async def run(self) -> AsyncIterator[TriggerEvent]:
           """Logic that fires a TriggerEvent."""
           # Here's where the AssetState is actually being used
           asset_state = AssetState(name=self.asset_name)
           logging.info(f"***** asset_state: {asset_state}")
   
           while True:
               result = random.randint(0, 5)
               logging.info(f"result: {result}")
   
               asset_state.set("result", str(result))
               _result = asset_state.get("result")
               logging.info(f"_result: {_result}")
   
               if result == self.random_number:
                   logging.info("yield'ing TriggerEvent")
                   yield TriggerEvent({"status": "success", "result": result})
                   break
   
               logging.info(f"Sleeping for {self.waiter_delay} seconds")
               await asyncio.sleep(self.waiter_delay)
   ```
   
   ## TODO
   
   The following items still need to be completed:
   
   - [ ] Validate that `AssetState` works when called within a Task.
   - [ ] Ensure complete test coverage of the changes that were made.
   
   * closes: #67200 
   
   ##### Was generative AI tooling used to co-author this PR?
   
   No, generative AI was not used to generate this PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to