amoghrajesh opened a new pull request, #61461:
URL: https://github.com/apache/airflow/pull/61461

   <!--
   Thank you for contributing!
   
   Please provide above a brief description of the changes made in this pull 
request.
   Write a good git commit message following this guide: 
http://chris.beams.io/posts/git-commit/
   
   Please make sure that your code changes are covered with tests.
   And in case of new features or big changes remember to adjust the 
documentation.
   
   Feel free to ping (in general) for the review if you do not see reaction for 
a few days
   (72 Hours is the minimum reaction time you can expect from volunteers) - we 
sometimes miss notifications.
   
   In case of an existing issue, reference it using one of the following:
   
   * closes: #ISSUE
   * related: #ISSUE
   -->
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [x] Yes (please specify the tool below)
   
   Cursor IDE with Claude sonnet 4 1/2
   
   <!--
   Generated-by: [Tool Name] following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   -->
   
   closes: #59303
   
   
   ## Summary
   
   Restore support for custom deadline references by implementing a 
`SerializedCustomReference` wrapper in Core that bridges SDK defined custom 
refs with core serialization for deadline alerts.
   
   ## Motivation
   
   After PR #61118 moved deadline alert decoding from SDK to Core and 
introduced `SerializedReferenceModels`, custom deadline references stopped 
working. The decoder only looked up types in `SerializedReferenceModels`, but 
custom refs registered via the SDK's 
`DeadlineReference.register_custom_reference()` only existed in 
`ReferenceModels`. This caused "No reference class found with name: 
MyCustomRef" errors.
   
   ## What has changed?
   
   ### Core (`airflow-core/`)
   
   #### SerializedCustomReference wrapper
   - Added `SerializedReferenceModels.SerializedCustomReference` class in 
`serialization/definitions/deadline.py`
   - Wraps custom deadline references imported from user code
   - Implements the `evaluate_with()` logic since SDK's `BaseDeadlineReference` 
is lightweight wrapper without it
   - Delegates `_evaluate_with()` execution to the inner custom ref
   
   #### Decoder (`serialization/decoders.py`)
   - When `__class_path` is present in serialized data, uses 
`SerializedCustomReference.deserialize_reference()` to import and wrap the 
custom ref
   - Builtin types continue to use 
`SerializedReferenceModels.get_reference_class()` lookup
   
   #### DeadlineAlert model
   - `reference_class` property returns `SerializedCustomReference` when 
`__class_path` is present in the stored reference dict
   - Returns standard serialized types for built-ins
   
   #### Create deadline & prune logic (`serialization/definitions/dag.py`, 
`models/dagrun.py`)
   - No changes needed - already use `SerializedReferenceModels.TYPES.DAGRUN`
   - Custom refs now work because `SerializedCustomReference` is in that tuple
   
   ## Architecture with these changes
   
   ```shell
   SDK (task-sdk/) - DAG Authoring
   ├── deadline.py
   │ ├── DeadlineAlert (user-facing)
   │ ├── DeadlineReference (public interface)
   │ └── BaseDeadlineReference (lightweight, serialization only)
   Core (airflow-core/) - Scheduling/Execution
   ├── serialization/
   │ ├── encoders.py (adds class_path for custom types)
   │ ├── decoders.py (wraps custom types in SerializedCustomReference)
   │ └── definitions/deadline.py
   │ ├── SerializedReferenceModels
   │ │ ├── Built-in types (with evaluate_with)
   │ │ └── SerializedCustomReference (wrapper for custom refs)
   │ └── TYPES.DAGRUN (includes wrapper)
   ```
   
   
   ## How custom refs work now
   
   1.  Encoder adds `__class_path` for custom refs (no changes)
   2. Decoder sees `__class_path`, imports custom class, wraps in 
`SerializedCustomReference`
   3. `isinstance(ref, TYPES.DAGRUN)` → True (wrapper is in tuple) → deadline 
created
   4.  Wrapper's `evaluate_with()` validates kwargs, calls custom ref's 
`_evaluate_with()`, adds interval
   5. `reference_class in TYPES.DAGRUN` → True → deadline pruned on success
   
   ## Testing
   
   Define the custom reference in a plugin:
   ```python
   from airflow.sdk.definitions.deadline import DeadlineReference, 
deadline_reference, BaseDeadlineReference
   from airflow._shared.timezones import timezone
   
   
   @deadline_reference()
   class MyCustomRef(BaseDeadlineReference):
       """Custom ref: deadline is now + interval."""
   
       def _evaluate_with(self, *, session, **kwargs):
           return timezone.utcnow()
   
   
   DeadlineReference.register_custom_reference(MyCustomRef)
   
   async def on_deadline(**kwargs):
       print("Custom ref deadline exceeded", kwargs)
   
   ```
   
   DAG:
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, 
DeadlineReference
   
   from custom_deadline_refs import on_deadline
   
   
   with DAG(
       dag_id="testing-custom-reference-deadlines",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       deadline=DeadlineAlert(
           reference=DeadlineReference.MyCustomRef,
           interval=timedelta(seconds=10),
           callback=AsyncCallback(on_deadline),
       ),
   ):
       BashOperator(task_id="example_task", bash_command="sleep 120")
   
   ```
   
   Result:
   <img width="1658" height="934" alt="image" 
src="https://github.com/user-attachments/assets/e655e3b4-35af-444a-8e69-93cbc52ccfe1";
 />
   
   
   <img width="2558" height="991" alt="image" 
src="https://github.com/user-attachments/assets/a6d67665-433b-4778-b5cb-601a82603f59";
 />
   
   
   
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to