amoghrajesh opened a new pull request, #61461: URL: https://github.com/apache/airflow/pull/61461
<!-- Thank you for contributing! Please provide above a brief description of the changes made in this pull request. Write a good git commit message following this guide: http://chris.beams.io/posts/git-commit/ Please make sure that your code changes are covered with tests. And in case of new features or big changes remember to adjust the documentation. Feel free to ping (in general) for the review if you do not see reaction for a few days (72 Hours is the minimum reaction time you can expect from volunteers) - we sometimes miss notifications. In case of an existing issue, reference it using one of the following: * closes: #ISSUE * related: #ISSUE --> --- ##### Was generative AI tooling used to co-author this PR? <!-- If generative AI tooling has been used in the process of authoring this PR, please change below checkbox to `[X]` followed by the name of the tool, uncomment the "Generated-by". --> - [x] Yes (please specify the tool below) Cursor IDE with Claude sonnet 4 1/2 <!-- Generated-by: [Tool Name] following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions) --> closes: #59303 ## Summary Restore support for custom deadline references by implementing a `SerializedCustomReference` wrapper in Core that bridges SDK defined custom refs with core serialization for deadline alerts. ## Motivation After PR #61118 moved deadline alert decoding from SDK to Core and introduced `SerializedReferenceModels`, custom deadline references stopped working. The decoder only looked up types in `SerializedReferenceModels`, but custom refs registered via the SDK's `DeadlineReference.register_custom_reference()` only existed in `ReferenceModels`. This caused "No reference class found with name: MyCustomRef" errors. ## What has changed? ### Core (`airflow-core/`) #### SerializedCustomReference wrapper - Added `SerializedReferenceModels.SerializedCustomReference` class in `serialization/definitions/deadline.py` - Wraps custom deadline references imported from user code - Implements the `evaluate_with()` logic since SDK's `BaseDeadlineReference` is lightweight wrapper without it - Delegates `_evaluate_with()` execution to the inner custom ref #### Decoder (`serialization/decoders.py`) - When `__class_path` is present in serialized data, uses `SerializedCustomReference.deserialize_reference()` to import and wrap the custom ref - Builtin types continue to use `SerializedReferenceModels.get_reference_class()` lookup #### DeadlineAlert model - `reference_class` property returns `SerializedCustomReference` when `__class_path` is present in the stored reference dict - Returns standard serialized types for built-ins #### Create deadline & prune logic (`serialization/definitions/dag.py`, `models/dagrun.py`) - No changes needed - already use `SerializedReferenceModels.TYPES.DAGRUN` - Custom refs now work because `SerializedCustomReference` is in that tuple ## Architecture with these changes ```shell SDK (task-sdk/) - DAG Authoring ├── deadline.py │ ├── DeadlineAlert (user-facing) │ ├── DeadlineReference (public interface) │ └── BaseDeadlineReference (lightweight, serialization only) Core (airflow-core/) - Scheduling/Execution ├── serialization/ │ ├── encoders.py (adds class_path for custom types) │ ├── decoders.py (wraps custom types in SerializedCustomReference) │ └── definitions/deadline.py │ ├── SerializedReferenceModels │ │ ├── Built-in types (with evaluate_with) │ │ └── SerializedCustomReference (wrapper for custom refs) │ └── TYPES.DAGRUN (includes wrapper) ``` ## How custom refs work now 1. Encoder adds `__class_path` for custom refs (no changes) 2. Decoder sees `__class_path`, imports custom class, wraps in `SerializedCustomReference` 3. `isinstance(ref, TYPES.DAGRUN)` → True (wrapper is in tuple) → deadline created 4. Wrapper's `evaluate_with()` validates kwargs, calls custom ref's `_evaluate_with()`, adds interval 5. `reference_class in TYPES.DAGRUN` → True → deadline pruned on success ## Testing Define the custom reference in a plugin: ```python from airflow.sdk.definitions.deadline import DeadlineReference, deadline_reference, BaseDeadlineReference from airflow._shared.timezones import timezone @deadline_reference() class MyCustomRef(BaseDeadlineReference): """Custom ref: deadline is now + interval.""" def _evaluate_with(self, *, session, **kwargs): return timezone.utcnow() DeadlineReference.register_custom_reference(MyCustomRef) async def on_deadline(**kwargs): print("Custom ref deadline exceeded", kwargs) ``` DAG: ```python from datetime import datetime, timedelta from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference from custom_deadline_refs import on_deadline with DAG( dag_id="testing-custom-reference-deadlines", start_date=datetime(2025, 1, 1), schedule=None, deadline=DeadlineAlert( reference=DeadlineReference.MyCustomRef, interval=timedelta(seconds=10), callback=AsyncCallback(on_deadline), ), ): BashOperator(task_id="example_task", bash_command="sleep 120") ``` Result: <img width="1658" height="934" alt="image" src="https://github.com/user-attachments/assets/e655e3b4-35af-444a-8e69-93cbc52ccfe1" /> <img width="2558" height="991" alt="image" src="https://github.com/user-attachments/assets/a6d67665-433b-4778-b5cb-601a82603f59" /> --- * Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. Note: commit author/co-author name and email in commits become permanently public when merged. * For fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed. * When adding dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). * For significant user-facing changes create newsfragment: `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
