Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi merged PR #50677:
URL: https://github.com/apache/airflow/pull/50677

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2920837957

@uranusjr if you are happy with it, can you approve when you get time?
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2914568562

@ashb - When you get a second, can you have a look and see if I broke this down right? The part of the code that the user directly interacts with (and might be imported into a DAG) now lives in `airflow/sdk/definitions/deadline.py`, and everything else that I don't want them using directly is in `airflow/models/deadline.py`. I think when I asked about it previously, the breakdown was "required db" vs "non-db", but this feels like a more natural break for this. Is it still alright, or do I need to put it all back in the SDK path except the `_fetch_from_db` method?
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110590988

## airflow-core/src/airflow/models/deadline.py: ##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+    pretty_query = "\n".join(str(compiled_query).splitlines())
+    logger.debug(
+        "Executing query:\n%r\nAs SQL:\n%s",
+        query,
+        pretty_query,
+    )
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))

Review Comment:
Is there a similar way to simplify the block below that, which is currently:

```
    if result is None:
        message = f"No matching record found in the database for query:\n {pretty_query}"
        logger.error(message)
        raise ValueError(message)
```

I always felt Python needed a "log then raise" method, but maybe there has been one all along?
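The standard `logging` module has no combined "log then raise" call. Inside an `except` block, `logger.exception()` covers the logging half, but the `if result is None` branch has no active exception to attach. Below is a minimal sketch of a small helper for that branch. `log_and_raise` and `check_result` are hypothetical names used only for illustration; they are not Airflow or stdlib APIs.

```python
import logging
from typing import NoReturn

logger = logging.getLogger("log_then_raise_sketch")


def log_and_raise(exc: Exception, log: logging.Logger = logger) -> NoReturn:
    """Hypothetical helper: log ``exc`` at ERROR level, then raise it."""
    log.error("%s", exc)
    raise exc


def check_result(result, pretty_query: str):
    """Mirror of the ``if result is None`` block quoted above."""
    if result is None:
        # One call both records the message and raises the same text.
        log_and_raise(ValueError(f"No matching record found in the database for query:\n {pretty_query}"))
    return result
```

One design note: if a caller further up already logs the exception, logging it here as well may produce the message twice. In that case, simply raising could be enough.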
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110516180

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##

Review Comment:
For the first one, I think I should probably move `models.deadline.DeadlineAlert` into the SDK as well, since it is a direct user interface. That means I may have other use cases where I want stuff in that path, so that works. I'll rename the module.

For the second comment, I'm sorry, I need some more context. I don't really follow what you are suggesting.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110464534

## airflow-core/src/airflow/models/deadline.py: ##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+    pretty_query = "\n".join(str(compiled_query).splitlines())
+    logger.debug(
+        "Executing query:\n%r\nAs SQL:\n%s",
+        query,
+        pretty_query,
+    )
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))

Review Comment:
That's actually pretty great. I'll be honest, I hadn't realized `log.exception` worked that way. I'll get it done, thanks!
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2110463728

## airflow-core/src/airflow/models/deadline.py: ##
@@ -100,35 +102,6 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
         session.add(deadline)
 
 
-class DeadlineReference(Enum):

Review Comment:
I don't think so. This was added in the previous PR purely as a placeholder so I could give an example of how something else will be used, and the note on L107 made that pretty clear. Nobody should have been using it for anything; I think it's safe to just drop it?
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2910968578

Some minor comments above; none of them are blocking, just personal opinions.
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108066688

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##

Review Comment:
Would it be a good idea to name this just `deadline`? This would match the core module and is shorter but still clear enough (I think).

Also, it’s probably a good idea to expose `DeadlineReference` in `airflow.sdk` without needing a subpackage import. It is a user-facing class, right?
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108062361

## airflow-core/src/airflow/models/deadline.py: ##
@@ -186,3 +159,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    compiled_query = query.compile(compile_kwargs={"literal_binds": True})
+    pretty_query = "\n".join(str(compiled_query).splitlines())
+    logger.debug(
+        "Executing query:\n%r\nAs SQL:\n%s",
+        query,
+        pretty_query,
+    )
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))

Review Comment:
```suggestion
        logger.exception("Database query failed")
```
Would this be more useful?
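Here is a small stdlib-only sketch of what the suggestion changes. The logger name, and the `RuntimeError` standing in for `SQLAlchemyError`, are assumptions for illustration. `Logger.exception()` logs at ERROR level and attaches the exception currently being handled, so the formatted output includes the traceback. By contrast, `logger.error(..., str(e))` records only the message text.

```python
import logging

logger = logging.getLogger("fetch_from_db_logging_sketch")


class ListHandler(logging.Handler):
    """Keeps every emitted record so it can be inspected afterwards."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def query_with_error() -> None:
    try:
        raise RuntimeError("connection refused")  # stand-in for a SQLAlchemyError
    except RuntimeError as e:
        logger.error("Database query failed: (%s)", str(e))  # message text only, no traceback
        raise


def query_with_exception() -> None:
    try:
        raise RuntimeError("connection refused")  # stand-in for a SQLAlchemyError
    except RuntimeError:
        logger.exception("Database query failed")  # ERROR level plus the active exception's traceback
        raise
```

Both versions keep the bare `raise`, so the caller still sees the original exception. Only the logged record differs.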
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2108061722

## airflow-core/src/airflow/models/deadline.py: ##
@@ -100,35 +102,6 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
         session.add(deadline)
 
 
-class DeadlineReference(Enum):

Review Comment:
Do we need to leave an import in this module for compatibility?
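If a compatibility import were wanted, one common pattern is a module-level `__getattr__` (PEP 562) in the old module. It re-exports the moved name and emits a `DeprecationWarning`. The sketch below simulates both modules with `types.ModuleType` so that it runs on its own. The module names `example_sdk_deadline` and `example_models_deadline` are hypothetical stand-ins, not Airflow's real paths.

```python
import importlib
import sys
import types
import warnings

# Hypothetical "new home" for the class (stands in for the SDK module).
new_home = types.ModuleType("example_sdk_deadline")


class DeadlineReference:
    """Placeholder for the relocated class."""


new_home.DeadlineReference = DeadlineReference
sys.modules[new_home.__name__] = new_home

# Hypothetical "old" module that only keeps a deprecated alias.
old_home = types.ModuleType("example_models_deadline")
_MOVED = {"DeadlineReference": "example_sdk_deadline"}


def _old_home_getattr(name: str):
    """Module-level __getattr__ (PEP 562): only called when normal attribute lookup fails."""
    if name in _MOVED:
        warnings.warn(
            f"{old_home.__name__}.{name} is deprecated; import it from {_MOVED[name]} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(importlib.import_module(_MOVED[name]), name)
    raise AttributeError(f"module {old_home.__name__!r} has no attribute {name!r}")


# In a real module file this would simply be a top-level ``def __getattr__(name): ...``.
old_home.__getattr__ = _old_home_getattr
sys.modules[old_home.__name__] = old_home
```

Since the class in this PR was only a placeholder, skipping the shim (as the reply in this thread proposes) avoids carrying this indirection at all.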
Re: [PR] AIP-86 - Add Deadline References [airflow]
ramitkataria commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2107923552

## airflow-core/src/airflow/models/deadline.py: ##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
I also think your version is easier to read @ferruzzi
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2905249533

ah, found [the email thread](https://lists.apache.org/thread/f7t5zl6t3t0s89rt37orfcv4966crojt) I was looking for; I always forget that the list's search function defaults to "newer than a month". Based on that discussion, I'm dropping the caplog part of the tests.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103865896

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        filtered_kwargs = {k: kwargs[k] for k in self.required_kwargs if k in kwargs}

Review Comment:
Yeah, should be functionally identical. I don't feel strongly either way; I'll go with yours.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103862801

## airflow-core/src/airflow/models/deadline.py: ##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
Do you feel strongly about this one? I find your version harder to read for some reason, but they are functionally equivalent, so I can do it if you want.
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103584921

## airflow-core/src/airflow/models/deadline.py: ##
@@ -183,3 +156,52 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime value from the database using the provided model reference and filtering conditions.
+
+    For example, to fetch a TaskInstance's start_date:
+        _fetch_from_db(
+            TaskInstance.start_date, dag_id='example_dag', task_id='example_task', run_id='example_run'
+        )
+
+    This generates SQL equivalent to:
+        SELECT start_date
+        FROM task_instance
+        WHERE dag_id = 'example_dag'
+        AND task_id = 'example_task'
+        AND run_id = 'example_run'
+
+    :param model_reference: SQLAlchemy Column to select (e.g., DagRun.logical_date, TaskInstance.start_date)
+    :param conditions: Filtering conditions applied as equality comparisons in the WHERE clause.
+        Multiple conditions are combined with AND.
+    :param session: SQLAlchemy session (auto-provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)

Review Comment:
```suggestion
    query = query.where(*(getattr(model_reference.class_, k) == v for k, v in conditions.items()))
```
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103588710

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        filtered_kwargs = {k: kwargs[k] for k in self.required_kwargs if k in kwargs}

Review Comment:
```suggestion
        filtered_kwargs = {k: v for k, v in kwargs.items() if k in self.required_kwargs}
```
I think this is equivalent and easier to read?
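The two comprehensions discussed in this thread can be compared side by side. The sketch below uses hypothetical function names. The only observable difference is iteration order (the set's order vs. the kwargs' insertion order), which dict equality ignores.

```python
from typing import Any


def filter_original(kwargs: dict[str, Any], required: set[str]) -> dict[str, Any]:
    # Walks the required names and keeps the ones that were actually passed.
    return {k: kwargs[k] for k in required if k in kwargs}


def filter_suggested(kwargs: dict[str, Any], required: set[str]) -> dict[str, Any]:
    # Walks the passed kwargs and keeps the ones that are required.
    return {k: v for k, v in kwargs.items() if k in required}
```

For example, with `required = {"dag_id", "run_id"}` and `kwargs = {"dag_id": "example_dag", "extra": 1}`, both functions return `{"dag_id": "example_dag"}`.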
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103372721

## airflow-core/src/airflow/models/deadline.py: ##
@@ -183,3 +156,36 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime stored in the database.
+
+    :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+    :param conditions: Key-value pairs which are passed to the WHERE clause.
+
+    :param session: SQLAlchemy session (provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    # This should build a query similar to:
+    # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+    logger.debug("db query: session.scalar(%s)", query)
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))
+        raise
+
+    if result is None:
+        message = "No matching record found in the database."
+        logger.error(message)

Review Comment:
I've updated all messaging and the docstring for this method.
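The behavior described for `_fetch_from_db` can be approximated without SQLAlchemy: select one column, AND together equality conditions, and raise `ValueError` with the query text when nothing matches. The sketch below swaps in the standard-library `sqlite3` module and a hypothetical `dag_run` table. It illustrates the semantics only and is not the PR's implementation. Table and column names cannot be bound as query parameters, so the sketch checks them against an allowlist and binds only the values.

```python
import logging
import sqlite3
from datetime import datetime

logger = logging.getLogger("fetch_from_db_sqlite_sketch")

# Identifiers cannot be bound as query parameters, so the sketch allowlists them.
ALLOWED_TABLES = {"dag_run"}
ALLOWED_COLUMNS = {"dag_id", "run_id", "logical_date", "queued_at"}


def fetch_from_db(conn: sqlite3.Connection, table: str, column: str, **conditions: str) -> datetime:
    """Return ``column`` from the first row of ``table`` matching all ``conditions`` (ANDed)."""
    if table not in ALLOWED_TABLES or not {column, *conditions} <= ALLOWED_COLUMNS:
        raise ValueError(f"Unsupported table or column: {table}.{column} {sorted(conditions)}")
    # Each condition becomes "name = ?"; the values are bound separately.
    where = " AND ".join(f"{key} = ?" for key in conditions) or "1 = 1"
    sql = f"SELECT {column} FROM {table} WHERE {where}"
    params = tuple(conditions.values())
    logger.debug("Executing query: %s with params %r", sql, params)
    try:
        row = conn.execute(sql, params).fetchone()
    except sqlite3.Error:
        logger.exception("Database query failed")
        raise
    if row is None:
        raise ValueError(f"No matching record found in the database for query:\n {sql}")
    return datetime.fromisoformat(row[0])
```

Usage: `fetch_from_db(conn, "dag_run", "queued_at", dag_id="example_dag", run_id="run_1")` issues `SELECT queued_at FROM dag_run WHERE dag_id = ? AND run_id = ?`.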
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103304812

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")

Review Comment:
I've added more robust kwarg verification both ways (missing and extra). Thanks for the suggestion!
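Below is a minimal sketch of checking kwargs in both directions. It assumes that missing names raise while extra names are ignored, which is what the later `filtered_kwargs` revision in this thread suggests. The PR's exact handling of extras is not shown here, and `validate_kwargs` is a hypothetical name. Sorting the names keeps the error message deterministic, since set iteration order is not.

```python
import logging
from typing import Any

logger = logging.getLogger("deadline_kwargs_sketch")


def validate_kwargs(required: set[str], kwargs: dict[str, Any]) -> dict[str, Any]:
    """Check kwargs against ``required`` both ways and return only the required ones."""
    provided = set(kwargs)
    missing = required - provided
    if missing:
        # Missing names are an error: the reference cannot be evaluated without them.
        raise ValueError(f"Missing required parameters: {', '.join(sorted(missing))}")
    extra = provided - required
    if extra:
        # Extra names are reported but dropped (an assumption of this sketch).
        logger.debug("Ignoring unexpected parameters: %s", ", ".join(sorted(extra)))
    return {k: v for k, v in kwargs.items() if k in required}
```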
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2103303252

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()

Review Comment:
Alright, I have dropped `evaluate()`. It was overcomplicating things without adding any real value. We can always add it back in later if people want it. Resolving this.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2902213236

Hmm. Task SDK tests are failing here but passing locally in Breeze. I'll have to look into that, I guess.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102812008

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")
+
+        return self._evaluate_with(**kwargs)
+
+    @abstractmethod
+    def _evaluate_with(self, **kwargs: Any) -> datetime:
+        """Must be implemented by subclasses to perform the actual evaluation."""
+        raise NotImplementedError
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline with no parameters."""
+        if self.requires_conditions:
+            raise AttributeError("This deadline requires additional conditions, use evaluate_with() instead.")
+        return self.evaluate_with()

Review Comment:
Responding to your comment below about using "context" there: maybe for now I'll try just dropping `evaluate()` and leave `evaluate_with()` as it is (without "context"), and see how we feel about that.
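Below is a sketch of the streamlined shape discussed here, with only `evaluate_with()` on the base class. The subclass names (`FixedDatetimeReference`, `LookupReference`) are hypothetical, and an injected `fetch` callable takes the place of a real database lookup. A reference that needs no conditions is simply called as `evaluate_with()` with no arguments.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable


class BaseDeadlineReference(ABC):
    """Sketch: references expose a single public entry point, evaluate_with()."""

    # Names of the keyword arguments a reference needs; subclasses override this.
    required_kwargs: set[str] = set()

    def evaluate_with(self, **kwargs: Any) -> datetime:
        missing = self.required_kwargs - set(kwargs)
        if missing:
            raise ValueError(f"Missing required parameters: {', '.join(sorted(missing))}")
        # Only the required kwargs are forwarded to the subclass.
        return self._evaluate_with(**{k: v for k, v in kwargs.items() if k in self.required_kwargs})

    @abstractmethod
    def _evaluate_with(self, **kwargs: Any) -> datetime:
        """Subclasses compute the actual datetime here."""


@dataclass
class FixedDatetimeReference(BaseDeadlineReference):
    """Needs no conditions, so callers simply use evaluate_with() with no arguments."""

    when: datetime

    def _evaluate_with(self, **kwargs: Any) -> datetime:
        return self.when


@dataclass
class LookupReference(BaseDeadlineReference):
    """Delegates to an injected fetcher, standing in for a database lookup."""

    fetch: Callable[..., datetime]
    required_kwargs = {"dag_id"}  # plain class attribute (no annotation), so not a dataclass field

    def _evaluate_with(self, **kwargs: Any) -> datetime:
        return self.fetch(**kwargs)
```

In this sketch, validation and filtering live in the base class, so each subclass only implements `_evaluate_with()`.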
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102798341

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")
+
+        return self._evaluate_with(**kwargs)
+
+    @abstractmethod
+    def _evaluate_with(self, **kwargs: Any) -> datetime:
+        """Must be implemented by subclasses to perform the actual evaluation."""
+        raise NotImplementedError
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline with no parameters."""
+        if self.requires_conditions:
+            raise AttributeError("This deadline requires additional conditions, use evaluate_with() instead.")
+        return self.evaluate_with()

Review Comment:
Yeah, it felt really strange using `reference.evaluate_with()` without anything "with", but I'm likely just overthinking that. I could drop it and make things more streamlined if you really don't like it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2102798341

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")
+
+        return self._evaluate_with(**kwargs)
+
+    @abstractmethod
+    def _evaluate_with(self, **kwargs: Any) -> datetime:
+        """Must be implemented by subclasses to perform the actual evaluation."""
+        raise NotImplementedError
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline with no parameters."""
+        if self.requires_conditions:
+            raise AttributeError("This deadline requires additional conditions, use evaluate_with() instead.")
+        return self.evaluate_with()

Review Comment:
Yeah, it felt really strange using `DeadlineReference.FIXED_DATETIME(DEFAULT_DATE).evaluate_with()` without anything "with", but I'm likely just overthinking that. I could drop it and make things more streamlined if you really don't like it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101687744

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")
+
+        return self._evaluate_with(**kwargs)
+
+    @abstractmethod
+    def _evaluate_with(self, **kwargs: Any) -> datetime:
+        """Must be implemented by subclasses to perform the actual evaluation."""
+        raise NotImplementedError
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline with no parameters."""
+        if self.requires_conditions:
+            raise AttributeError("This deadline requires additional conditions, use evaluate_with() instead.")
+        return self.evaluate_with()

Review Comment:
I feel this function is not necessary. We could just always use `evaluate_with()` directly. It's probably not as pretty, but this is not supposed to be user-facing anyway.
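A minimal sketch of what the base class might look like with `evaluate()` removed, so that `evaluate_with()` is the only entry point. It assumes no Airflow imports (`LoggingMixin` is dropped), and `FixedDatetimeReference` and `LookupReference` are hypothetical stand-ins; the latter reads from an in-memory dict instead of calling `_fetch_from_db`.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any


class BaseDeadlineReference(ABC):
    """Sketch: evaluate_with() is the single public entry point; there is no evaluate()."""

    # Names of the kwargs a subclass needs; subclasses override this.
    required_kwargs: set[str] = set()

    def evaluate_with(self, **kwargs: Any) -> datetime:
        """Check that every required kwarg is present, then delegate to the subclass."""
        missing_kwargs = self.required_kwargs - set(kwargs)
        if missing_kwargs:
            raise ValueError(f"Missing required parameters: {', '.join(sorted(missing_kwargs))}")
        return self._evaluate_with(**kwargs)

    @abstractmethod
    def _evaluate_with(self, **kwargs: Any) -> datetime:
        """Subclasses perform the actual evaluation here."""


@dataclass
class FixedDatetimeReference(BaseDeadlineReference):
    """Hypothetical reference that needs no conditions and returns a stored datetime."""

    _datetime: datetime

    def _evaluate_with(self, **kwargs: Any) -> datetime:
        return self._datetime


class LookupReference(BaseDeadlineReference):
    """Hypothetical reference keyed by dag_id; an in-memory dict stands in for the database."""

    required_kwargs = {"dag_id"}

    def __init__(self, table: dict[str, datetime]) -> None:
        self._table = table

    def _evaluate_with(self, **kwargs: Any) -> datetime:
        return self._table[kwargs["dag_id"]]
```

A reference with no requirements is then called as `FixedDatetimeReference(dt).evaluate_with()`. The call reads a little oddly, as noted in the thread, but there is only one code path to maintain and test.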
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101686150

## task-sdk/src/airflow/sdk/definitions/deadline_reference.py: ##
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any
+
+from airflow.models.deadline import _fetch_from_db
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class BaseDeadlineReference(LoggingMixin, ABC):
+    """Base class for all Deadline implementations."""
+
+    # Whether the evaluation requires conditions.
+    requires_conditions = True
+
+    # Set of required kwargs - subclasses should override this.
+    required_kwargs: set[str] = set()
+
+    def evaluate_with(self, **kwargs: Any) -> datetime:
+        """Validate the provided kwargs and evaluate this deadline with the given conditions."""
+        missing_kwargs = self.required_kwargs - set(kwargs.keys())
+        if missing_kwargs:
+            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")

Review Comment:
```suggestion
        if missing_kwargs := {k for k in self.required_kwargs if k not in kwargs}:
            raise ValueError(f"Missing required parameters: {', '.join(missing_kwargs)}")
```
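The suggestion relies on an assignment expression (`:=`, Python 3.8+), which binds `missing_kwargs` and tests its truthiness in a single step. Below is a standalone sketch of the same check. The function name `validate_kwargs` is hypothetical, and the `sorted()` call is an addition that keeps the error message independent of set iteration order.

```python
from __future__ import annotations

from typing import Any


def validate_kwargs(required_kwargs: set[str], **kwargs: Any) -> None:
    """Raise ValueError naming every required kwarg that was not supplied."""
    # Bind and test in one expression; an empty set is falsy, so nothing is raised.
    if missing_kwargs := {k for k in required_kwargs if k not in kwargs}:
        # sorted() keeps the message stable, since set iteration order is arbitrary.
        raise ValueError(f"Missing required parameters: {', '.join(sorted(missing_kwargs))}")


validate_kwargs({"dag_id"}, dag_id="example_dag")  # passes silently
try:
    validate_kwargs({"dag_id", "run_id"})
except ValueError as err:
    print(err)  # Missing required parameters: dag_id, run_id
```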
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101683462

## airflow-core/src/airflow/models/deadline.py: ##
@@ -183,3 +156,36 @@ def serialize_deadline_alert(self):
                 "callback_kwargs": self.callback_kwargs,
             }
         )
+
+
+@provide_session
+def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime:
+    """
+    Fetch a datetime stored in the database.
+
+    :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+    :param conditions: Key-value pairs which are passed to the WHERE clause.
+
+    :param session: SQLAlchemy session (provided by decorator)
+    """
+    query = select(model_reference)
+
+    for key, value in conditions.items():
+        query = query.where(getattr(model_reference.class_, key) == value)
+
+    # This should build a query similar to:
+    # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+    logger.debug("db query: session.scalar(%s)", query)
+
+    try:
+        result = session.scalar(query)
+    except SQLAlchemyError as e:
+        logger.error("Database query failed: (%s)", str(e))
+        raise
+
+    if result is None:
+        message = "No matching record found in the database."
+        logger.error(message)

Review Comment:
We should include a bit more context here, such as which model and field we failed to fetch from.
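One way to add that context is sketched below as a pure function, so it runs without a database. The helper name `describe_missing_record` is hypothetical. In the real code, the model and field names could presumably come from `model_reference.class_.__name__` and `model_reference.key` on the SQLAlchemy attribute; that source is an assumption about how the column is passed in.

```python
from __future__ import annotations

from typing import Any


def describe_missing_record(model: str, field: str, conditions: dict[str, Any]) -> str:
    """Build a 'no matching record' message that names the model, field, and WHERE conditions."""
    # Sort the conditions so the message is deterministic; fall back to a marker when there are none.
    where = ", ".join(f"{key}={value!r}" for key, value in sorted(conditions.items())) or "no conditions"
    return f"No matching record found in the database for {model}.{field} ({where})."


print(describe_missing_record("DagRun", "logical_date", {"dag_id": "example_dag"}))
# No matching record found in the database for DagRun.logical_date (dag_id='example_dag').
```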
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101542705

## airflow-core/src/airflow/models/deadline_reference.py: ##

Review Comment:
Alright, I think I've done that. @ashb, if you happen to get time to review whether I've made it work with the Task SDK correctly, I'd appreciate it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101043739

## airflow-core/src/airflow/models/deadline_reference.py: ##

Review Comment:
I just asked Ash and he says you are right, so I'll make the change.
```
Anything used in dag code: sdk
Anything only used to evaluate/check deadlines: core
```
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2101043739

## airflow-core/src/airflow/models/deadline_reference.py: ##

Review Comment:
I just asked Ash and he says you are right, so I'll make the change.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2100736613

## airflow-core/src/airflow/models/deadline_reference.py: ##

Review Comment:
I honestly have no idea? Maybe?
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2100697163

## airflow-core/src/airflow/models/deadline_reference.py: ##
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from enum import Enum
+
+from sqlalchemy import Column, select
+from sqlalchemy.exc import SQLAlchemyError
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import provide_session
+
+
+class CalculatedDeadline(LoggingMixin, Enum):
+    """
+    Implementation class for deadlines which are calculated at runtime.
+
+    Do not instantiate directly. Instead, use DeadlineReference:
+
+        deadline = DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+            interval=timedelta(hours=1),
+            callback=hello_callback,
+        )
+    """
+
+    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_reference: Column, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+        :param conditions: Key-value pairs which are passed to the WHERE clause.
+
+        :param session: SQLAlchemy session (provided by decorator)
+        """
+        query = select(model_reference)
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_reference.class_, key) == value)
+
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+
+        try:
+            result = session.scalar(query)
+        except SQLAlchemyError as e:
+            self.log.error("Database query failed: (%s)", str(e))
+            raise
+
+        if result is None:
+            message = "No matching record found in the database."
+            self.log.error(message)
+            raise ValueError(message)
+
+        return result
+
+    def dagrun_logical_date(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun.logical_date, dag_id=dag_id)
+
+    def dagrun_queued_at(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun.queued_at, dag_id=dag_id)
+
+
+@dataclass
+class FixedDatetimeDeadline(LoggingMixin):
+    """Implementation class for fixed datetime deadlines."""
+
+    _datetime: datetime
+
+    def evaluate_with(self, **kwargs) -> datetime:
+        """
+        Evaluate this deadline reference with the given kwargs.
+
+        Ignores all kwargs as fixed deadlines don't need any parameters.
+        """
+        if kwargs:
+            self.log.debug("Fixed Datetime Deadlines do not accept conditions, ignoring kwargs: %s", kwargs)
+        return self._datetime
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline reference with no parameters."""
+        return self.evaluate_with()
+
+
+class DeadlineReference:
+    """
+    The public interface class for all DeadlineReference options.
+
+    This class provides a unified interface for working with Deadlines, supporting both
+    calculated deadlines (which fetch values from the database) and fixed deadlines
+    (which return a predefined datetime).
+
+    --
+    Usage:
+    - --

Review Comment:
Thanks, fixed.
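`CalculatedDeadline.evaluate_with()` works because each member's value is the name of a method on the enum, so `getattr(self, self.value)` returns the bound method to call. Below is a self-contained sketch of that dispatch. `CalculatedDeadlineSketch` and the `_FAKE_DAGRUNS` dict are hypothetical stand-ins for the enum and for `_fetch_from_db`.

```python
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum

# Hypothetical in-memory stand-in for the DagRun table: dag_id -> (logical_date, queued_at).
_FAKE_DAGRUNS = {
    "example_dag": (
        datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc),
        datetime(2025, 1, 1, 0, 5, tzinfo=timezone.utc),
    ),
}


class CalculatedDeadlineSketch(Enum):
    """Each member's value names the method that computes its datetime."""

    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
    DAGRUN_QUEUED_AT = "dagrun_queued_at"

    def evaluate_with(self, **kwargs) -> datetime:
        # Look up the method whose name is stored in the member's value, then call it.
        return getattr(self, self.value)(**kwargs)

    # Methods defined on an Enum are not members, so these do not clash with the values above.
    def dagrun_logical_date(self, dag_id: str) -> datetime:
        return _FAKE_DAGRUNS[dag_id][0]

    def dagrun_queued_at(self, dag_id: str) -> datetime:
        return _FAKE_DAGRUNS[dag_id][1]
```

In this sketch, an unknown `dag_id` raises `KeyError`. The diff above instead raises `ValueError` when the query returns nothing.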
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099436549

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
Thinking about this more, I am going to roll that back. If no match is found, that should be a failure case. There is no way I can think of to recover from it, and an expected alarm would never go off later. I'll turn the log messages I added into exceptions in an upcoming commit.
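The fail-fast behavior described here can be sketched with the standard-library `sqlite3` module standing in for SQLAlchemy. A missing row raises instead of returning `None`, and driver errors propagate unchanged. `fetch_datetime` and the `dag_run` table are hypothetical, and storing timestamps as ISO-8601 text is an assumption of the sketch.

```python
from __future__ import annotations

import sqlite3
from datetime import datetime


def fetch_datetime(conn: sqlite3.Connection, table: str, column: str, **conditions: object) -> datetime:
    """Return one datetime from the database; raise ValueError if nothing matches."""
    # Table and column names are interpolated, so they must come from code, never from user input.
    # Condition values are passed as bound parameters.
    where = " AND ".join(f"{key} = ?" for key in conditions) or "1 = 1"
    query = f"SELECT {column} FROM {table} WHERE {where}"
    # sqlite3.Error is deliberately not caught: a failed query should surface as-is.
    row = conn.execute(query, tuple(conditions.values())).fetchone()
    if row is None or row[0] is None:
        # No row means the deadline could never fire, so fail loudly instead of logging and moving on.
        raise ValueError(f"No row in {table} matched {conditions}; cannot resolve {column}.")
    return datetime.fromisoformat(row[0])
```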
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on PR #50677:
URL: https://github.com/apache/airflow/pull/50677#issuecomment-2896675827

Yeah, I don't like some of those changes in the last commit. I have another commit coming which rolls some of them back and also implements TP's suggestion. That made `models/deadline.py` somewhat complicated, so I moved these changes (and the relevant tests) into their own file(s), which is going to break your comment links. Sorry in advance. I'll push the new commit once mypy and the static checks pass locally.
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099452935

## airflow-core/src/airflow/models/deadline_reference.py: ##
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from enum import Enum
+
+from sqlalchemy import Column, select
+from sqlalchemy.exc import SQLAlchemyError
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import provide_session
+
+
+class CalculatedDeadline(LoggingMixin, Enum):
+    """
+    Implementation class for deadlines which are calculated at runtime.
+
+    Do not instantiate directly. Instead, use DeadlineReference:
+
+        deadline = DeadlineAlert(
+            reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+            interval=timedelta(hours=1),
+            callback=hello_callback,
+        )
+    """
+
+    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_reference: Column, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.)
+        :param conditions: Key-value pairs which are passed to the WHERE clause.
+
+        :param session: SQLAlchemy session (provided by decorator)
+        """
+        query = select(model_reference)
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_reference.class_, key) == value)
+
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+
+        try:
+            result = session.scalar(query)
+        except SQLAlchemyError as e:
+            self.log.error("Database query failed: (%s)", str(e))
+            raise
+
+        if result is None:
+            message = "No matching record found in the database."
+            self.log.error(message)
+            raise ValueError(message)
+
+        return result
+
+    def dagrun_logical_date(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun.logical_date, dag_id=dag_id)
+
+    def dagrun_queued_at(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun.queued_at, dag_id=dag_id)
+
+
+@dataclass
+class FixedDatetimeDeadline(LoggingMixin):
+    """Implementation class for fixed datetime deadlines."""
+
+    _datetime: datetime
+
+    def evaluate_with(self, **kwargs) -> datetime:
+        """
+        Evaluate this deadline reference with the given kwargs.
+
+        Ignores all kwargs as fixed deadlines don't need any parameters.
+        """
+        if kwargs:
+            self.log.debug("Fixed Datetime Deadlines do not accept conditions, ignoring kwargs: %s", kwargs)
+        return self._datetime
+
+    def evaluate(self) -> datetime:
+        """Evaluate this deadline reference with no parameters."""
+        return self.evaluate_with()
+
+
+class DeadlineReference:
+    """
+    The public interface class for all DeadlineReference options.
+
+    This class provides a unified interface for working with Deadlines, supporting both
+    calculated deadlines (which fetch values from the database) and fixed deadlines
+    (which return a predefined datetime).
+
+    --
+    Usage:
+    - --

Review Comment:
```suggestion
    ---
```
Accidental?
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099456345

## airflow-core/src/airflow/models/deadline_reference.py: ##

Review Comment:
I wonder if this module should be put in the Task SDK instead. `_fetch_from_db` can be moved to `airflow.models.deadline` and imported inside `evaluate`.
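The idea is that the SDK module never imports core code at module load time; the database helper is imported only when a deadline is actually evaluated. Below is a runnable sketch of that pattern. The module name `hypothetical_core_deadline`, the class name, and the `_fetch_from_db` signature used here are hypothetical. A fake module is registered in `sys.modules` purely so the example runs without Airflow. In the real code, the method body would presumably use a plain `from airflow.models.deadline import _fetch_from_db` instead.

```python
from __future__ import annotations

import importlib
import sys
import types
from datetime import datetime, timezone
from typing import Any


class DagRunQueuedAtReference:
    """SDK-side reference that only touches core (database) code when it is evaluated."""

    # Hypothetical name of the core module that owns the database helper.
    core_module = "hypothetical_core_deadline"

    def evaluate_with(self, **kwargs: Any) -> datetime:
        # Deferred import: importing this module from DAG code never loads the core module.
        core = importlib.import_module(self.core_module)
        return core._fetch_from_db("dag_run.queued_at", **kwargs)


# Demo only: a fake core module so the sketch runs without Airflow installed.
_fake_core = types.ModuleType("hypothetical_core_deadline")
_fake_core._fetch_from_db = lambda column, **conditions: datetime(2025, 1, 1, tzinfo=timezone.utc)
sys.modules["hypothetical_core_deadline"] = _fake_core

print(DagRunQueuedAtReference().evaluate_with(dag_id="example_dag"))
# 2025-01-01 00:00:00+00:00
```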
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2099437979

## airflow-core/src/airflow/models/deadline.py: ##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
         session.add(deadline)
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
Let me know what you think about the new implementation.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098874986

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
Added a whole bunch of error messaging there, along with the tests to go with it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098744014

## airflow-core/src/airflow/models/deadline.py: ##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
         session.add(deadline)
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
That's an interesting approach, thanks for the suggestion. Let me see what I can work out along those lines.
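For reference, Python's enum rules allow a plain mixin ahead of `Enum` in the bases, which is what `class DeadlineReference(LoggingMixin, Enum)` relies on. The sketch below uses a hypothetical `LogMixin` stand-in (not Airflow's actual `LoggingMixin`) that exposes a `log` property, to show that every member picks the property up while the set of members stays unchanged.

```python
import logging
from enum import Enum


class LogMixin:
    """Hypothetical stand-in for Airflow's LoggingMixin: exposes a `log` property."""

    @property
    def log(self) -> logging.Logger:
        cls = type(self)
        return logging.getLogger(f"{cls.__module__}.{cls.__qualname__}")


# A plain mixin (no __new__, no data type) is listed before Enum in the bases.
class DeadlineReference(LogMixin, Enum):
    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
    DAGRUN_QUEUED_AT = "dagrun_queued_at"

    def evaluate(self) -> str:
        # Members can log through the mixin like any other object.
        self.log.debug("evaluating %s", self.name)
        return self.value
```

The mixin only adds behavior; `len(DeadlineReference)` is still 2, and the property is not mistaken for a member because only assignments in the enum's own body become members.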
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098745558

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()

Review Comment:
Maybe, but then I feel like just calling it `evaluate()` misses the implied expectation that there usually should be a condition there?
Re: [PR] AIP-86 - Add Deadline References [airflow]
uranusjr commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098736749

## airflow-core/src/airflow/models/deadline.py: ##
@@ -97,11 +98,9 @@ def add_deadline(cls, deadline: Deadline, session: Session = NEW_SESSION):
         session.add(deadline)
-class DeadlineReference(Enum):
+class DeadlineReference(LoggingMixin, Enum):

Review Comment:
This class is now awfully complex as an enum, and also does what normal enums don’t generally do (contain an arbitrary value via `FIXED_DATETIME`). Maybe we should split this into two classes like this:

```python
class DefinedDeadline(Enum):
    DAGRUN_LOGICAL_DATE = ...
    DAGRUN_QUEUED_AT = ...

    def evaluate(self, **kwargs):
        ...


@dataclass
class FixedDeadline:
    at: datetime

    def evaluate(self, **kwargs):
        ...


class DeadlineReference:
    DAGRUN_LOGICAL_DATE = DefinedDeadline.DAGRUN_LOGICAL_DATE
    DAGRUN_QUEUED_AT = DefinedDeadline.DAGRUN_QUEUED_AT
    FIXED_DATETIME = FixedDeadline
```
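A runnable version of the split suggested above might look like the sketch below. It works under stated assumptions: the database lookups are replaced by a hypothetical in-memory `FAKE_DAGRUNS` dict, and each `DefinedDeadline` value names a column rather than a method. It shows the main benefit of the split: the enum stays a closed set of constants, the arbitrary datetime lives in an ordinary dataclass, and both expose the same `evaluate()` call.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum

# Hypothetical in-memory stand-in for the metadata DB: dag_id -> column -> value.
FAKE_DAGRUNS = {
    "example_dag": {
        "logical_date": datetime(2025, 5, 1, 0, 0, tzinfo=timezone.utc),
        "queued_at": datetime(2025, 5, 1, 0, 5, tzinfo=timezone.utc),
    },
}


class DefinedDeadline(Enum):
    """References whose datetime is looked up when they are evaluated."""

    DAGRUN_LOGICAL_DATE = "logical_date"
    DAGRUN_QUEUED_AT = "queued_at"

    def evaluate(self, *, dag_id: str) -> datetime:
        # In this sketch the member's value names the column to read.
        return FAKE_DAGRUNS[dag_id][self.value]


@dataclass(frozen=True)
class FixedDeadline:
    """A reference that carries its own datetime, so nothing is looked up."""

    at: datetime

    def evaluate(self, **kwargs) -> datetime:
        # Conditions are accepted and ignored so both kinds share one call shape.
        return self.at


class DeadlineReference:
    """Plain namespace: one import point for every kind of reference."""

    DAGRUN_LOGICAL_DATE = DefinedDeadline.DAGRUN_LOGICAL_DATE
    DAGRUN_QUEUED_AT = DefinedDeadline.DAGRUN_QUEUED_AT
    FIXED_DATETIME = FixedDeadline


if __name__ == "__main__":
    tomorrow_9am = datetime(2025, 5, 2, 9, 0, tzinfo=timezone.utc)
    for ref in (DeadlineReference.DAGRUN_QUEUED_AT, DeadlineReference.FIXED_DATETIME(tomorrow_9am)):
        print(ref.evaluate(dag_id="example_dag"))
```

Call sites keep reading `DeadlineReference.FIXED_DATETIME(dt)`, but no `object.__new__` trick is needed, and `DefinedDeadline` remains a normal enum with exactly two members.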
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098560679

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:

Review Comment:
Thanks. I'll leave this open for others to weigh in, but consider it a non-blocking comment.
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098535047

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
TIL - turns out this is for backwards compatibility: https://docs.python.org/3/howto/logging.html#logging-variable-data. Thanks for sharing the screenshot (and I should get into the habit of trying things out and building in the airflow repo myself :p)
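Beyond backwards compatibility, %-style arguments are also formatted lazily: the logging call checks the level first and only renders its arguments if the record will actually be emitted, while an f-string is built before the call even happens. The sketch below (the `ExpensiveToRender` class and the logger name are made up for illustration) counts how often the argument is converted to a string while DEBUG is disabled.

```python
import logging


class ExpensiveToRender:
    """Stand-in for an argument (e.g. a SQL query) whose str() is costly."""

    def __init__(self) -> None:
        self.renders = 0

    def __str__(self) -> str:
        self.renders += 1
        return "SELECT ..."


log = logging.getLogger("deadline_example")
log.setLevel(logging.INFO)  # DEBUG records are dropped by this logger

query = ExpensiveToRender()

# %-style: log.debug() checks the level first and returns early,
# so str(query) is never called.
log.debug("db query: session.scalar(%s)", query)

# f-string: the message (and str(query)) is built before log.debug() runs.
log.debug(f"db query: session.scalar({query})")

print(query.renders)  # prints 1
```

Only the f-string version paid for the conversion, which matters when the argument is a query object that is expensive to compile into SQL.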
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098547629

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()

Review Comment:
How about just calling it `evaluate` with optional args (instead of `evaluate_with`)?
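One way to fold `evaluate()` and `evaluate_with()` into a single method is sketched below, assuming a reduced copy of the enum from the diff where a placeholder return value stands in for the DB query (the `Reference` class and its dates are hypothetical). Conditions become optional keyword arguments at the `evaluate()` level, while each lookup method's own signature still enforces which ones are required, which may address the concern that dropping `_with` hides the expectation of a condition.

```python
from datetime import datetime, timezone
from enum import Enum


class Reference(Enum):
    """Hypothetical, reduced version of the enum in the diff with one evaluate()."""

    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"

    def __init__(self, value):
        # Runs once per declared member; only FIXED_DATETIME instances carry a datetime.
        self._fixed_dt = None

    @classmethod
    def FIXED_DATETIME(cls, dt: datetime) -> "Reference":
        # Same object.__new__ trick as the diff: the result is an instance of
        # the class, but it is never registered as a member.
        instance = object.__new__(cls)
        instance._value_ = "fixed_datetime"
        instance._fixed_dt = dt
        return instance

    def dagrun_logical_date(self, dag_id: str) -> datetime:
        # Placeholder lookup (hypothetical); real code would query the DagRun table.
        return datetime(2025, 5, 1, tzinfo=timezone.utc)

    def evaluate(self, **kwargs) -> datetime:
        """Return the reference datetime; conditions are optional at this level."""
        if self._fixed_dt is not None:
            return self._fixed_dt
        # The lookup method's signature decides which conditions are required,
        # so a missing dag_id still fails loudly with a TypeError.
        return getattr(self, self.value)(**kwargs)
```

With this shape, `Reference.FIXED_DATETIME(tomorrow).evaluate()` and `Reference.DAGRUN_LOGICAL_DATE.evaluate(dag_id=...)` both read naturally. The explicit `is not None` check states the intent more directly than relying on the truthiness of a datetime.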
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098545729

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
Agreed, this should not happen (it's rare), and it shouldn't be fatal since technically nothing failed. A `log.warn` makes sense.
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098543605

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:

Review Comment:
No strong opinion on this; it makes sense if we expect to be reusing it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098535047

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
TIL - turns out this is for backwards compatibility. Thanks for sharing the screenshot (and I should get into the habit of trying things out and building in the airflow repo myself :p)
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098520059

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()

Review Comment:
Any opinions on this? I honestly only added it because `DeadlineReference.FIXED_DATETIME(tomorrow).evaluate_with()` looked awkward, but maybe it's a bad idea to add the extra method? I feel like the `with()` implies something is required, but maybe I'm just overthinking it.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098509477

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)
+
+    def dagrun_logical_date(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun, "logical_date", dag_id=dag_id)

Review Comment:
Great suggestion, thanks. Change coming.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098467290

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)

Review Comment:
This time I'm not lying. This is where the Airflow CI forces us to not use f-strings.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098432341

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
I'm not sure how that would come to pass, but I suppose extra checks can't hurt. What do you think? If you set a deadline that is one hour after the dagrun is queued, and the system can't find the queued time, what would you expect it to do? I'm thinking a log.warn maybe?
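A sketch of the "warn, don't crash" behavior discussed here, using the standard-library `sqlite3` module instead of SQLAlchemy so it runs standalone. The `dag_run` table, the `ALLOWED_COLUMNS` whitelist and `fetch_datetime()` are all hypothetical. A missing row (which SQLAlchemy's `session.scalar()` reports as `None`) is logged as a warning and returned as `None`, leaving the caller to decide whether to skip the deadline. Column names are checked against a whitelist because SQL identifiers cannot be passed as bound parameters.

```python
import logging
import sqlite3
from datetime import datetime
from typing import Optional

log = logging.getLogger("deadline_example")

# Hypothetical whitelist: SQL identifiers cannot be bound as parameters, so
# table and column names are checked here instead of being trusted blindly.
ALLOWED_COLUMNS = {"dag_run": {"dag_id", "logical_date", "queued_at"}}


def fetch_datetime(conn: sqlite3.Connection, table: str, column: str, **conditions) -> Optional[datetime]:
    """Return one datetime value, or None (plus a warning) when no row matches."""
    allowed = ALLOWED_COLUMNS.get(table, set())
    if column not in allowed or not set(conditions) <= allowed:
        raise ValueError(f"unsupported table/column: {table}.{column}")

    # Values are passed as bound parameters, in the same order as the keys.
    where = " AND ".join(f"{key} = ?" for key in conditions) or "1 = 1"
    params = tuple(conditions.values())
    sql = f"SELECT {column} FROM {table} WHERE {where}"
    log.debug("db query: %s %s", sql, params)

    row = conn.execute(sql, params).fetchone()
    if row is None or row[0] is None:
        # Not fatal: nothing failed, the reference just has no value to use.
        log.warning("No %s.%s found for %s", table, column, conditions)
        return None
    return datetime.fromisoformat(row[0])


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_run (dag_id TEXT, logical_date TEXT, queued_at TEXT)")
    conn.execute("INSERT INTO dag_run VALUES ('example_dag', '2025-05-01T00:00:00', '2025-05-01T00:05:00')")
    print(fetch_datetime(conn, "dag_run", "queued_at", dag_id="example_dag"))
    print(fetch_datetime(conn, "dag_run", "queued_at", dag_id="missing_dag"))
```

Returning `None` keeps the decision with the caller; raising instead would turn a missing timestamp into a hard failure, which the thread leans against.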
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098428231

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """
+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
Mostly just for reminding me to proof-read my PRs. :sweat: That was left over from some testing I did, where I needed a Reference that wasn't related to the dagrun, before I settled on the `object.__new__` version of fixed_datetime. I'll remove it. Thanks for catching that.
Re: [PR] AIP-86 - Add Deadline References [airflow]
ferruzzi commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2098415699

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:

Review Comment:
I debated that a lot myself. If anyone feels strongly, I can drop it, and the two places where it is currently used can be reverted to plain session.scalar calls. Keep in mind that it is intended to be reused by any number of future References, and it may or may not prove more useful with those.
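The reuse argument can be sketched without Airflow. In this minimal sketch the standard-library `sqlite3` module stands in for the SQLAlchemy session, and the `dag_run` table, its columns, and `fetch_scalar` are all hypothetical. Each reference collapses to a one-line call into the shared helper, which is the payoff if more References are added later. Because this stand-in splices column names into SQL text (the PR instead resolves them with `getattr` on the model), the names are checked against an allowlist first.

```python
import sqlite3

# Hypothetical allowlist: identifiers are spliced into SQL text, so only known ones pass.
_ALLOWED_COLUMNS = {"dag_run": {"dag_id", "logical_date", "queued_at"}}


def fetch_scalar(conn: sqlite3.Connection, table: str, column: str, **conditions):
    """Return the first column of the first matching row, or None if nothing matches."""
    names = {column, *conditions}
    if table not in _ALLOWED_COLUMNS or not names <= _ALLOWED_COLUMNS[table]:
        raise ValueError(f"unknown table or column for {table!r}: {sorted(names)}")
    sql = f"SELECT {column} FROM {table}"
    if conditions:
        # Values are bound as parameters; only the vetted names are formatted in.
        sql += " WHERE " + " AND ".join(f"{key} = ?" for key in conditions)
    row = conn.execute(sql, tuple(conditions.values())).fetchone()
    return None if row is None else row[0]


# Each hypothetical reference shrinks to a one-line call into the shared helper.
def dagrun_logical_date(conn: sqlite3.Connection, dag_id: str):
    return fetch_scalar(conn, "dag_run", "logical_date", dag_id=dag_id)


def dagrun_queued_at(conn: sqlite3.Connection, dag_id: str):
    return fetch_scalar(conn, "dag_run", "queued_at", dag_id=dag_id)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, logical_date TEXT, queued_at TEXT)")
conn.execute(
    "INSERT INTO dag_run VALUES ('example_dag', '2025-01-01T00:00:00', '2025-01-01T00:05:00')"
)
print(dagrun_queued_at(conn, "example_dag"))  # 2025-01-01T00:05:00
```

With only two callers the helper saves little; the trade-off shifts as more References with different columns and filters are added.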
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093554618

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
curious what we plan to use `_CUSTOM_REFERENCE_BASE` for?
Re: [PR] AIP-86 - Add Deadline References [airflow]
vincbeck commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093561653

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)

Review Comment:
Oh, it is an enum! My bad :)
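Because the class is an `Enum`, `self.value` is the member's string, so `getattr(self, self.value)` dispatches to the method of that name. A minimal, database-free sketch of just that mechanism; the `Reference` class and the dictionaries standing in for DagRun rows are hypothetical.

```python
from datetime import datetime, timezone
from enum import Enum

# Hypothetical in-memory data standing in for DagRun rows.
_LOGICAL_DATES = {"example_dag": datetime(2025, 1, 1, tzinfo=timezone.utc)}
_QUEUED_AT = {"example_dag": datetime(2025, 1, 1, 0, 5, tzinfo=timezone.utc)}


class Reference(Enum):
    # Each member's value is the name of the method that computes its datetime.
    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
    DAGRUN_QUEUED_AT = "dagrun_queued_at"

    def evaluate_with(self, **kwargs) -> datetime:
        # self.value comes from Enum; resolve it to a bound method and call it.
        return getattr(self, self.value)(**kwargs)

    # Functions defined in the class body become methods, not enum members.
    def dagrun_logical_date(self, dag_id: str) -> datetime:
        return _LOGICAL_DATES[dag_id]

    def dagrun_queued_at(self, dag_id: str) -> datetime:
        return _QUEUED_AT[dag_id]


print(Reference.DAGRUN_QUEUED_AT.evaluate_with(dag_id="example_dag"))
# 2025-01-01 00:05:00+00:00
```

Adding a reference in this style means adding one member plus one method whose name matches the member's value.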
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093560619

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)

Review Comment:
What's the intended behavior if no matching records are found? Do we need to handle existence / check for None here?
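On the missing-row question: SQLAlchemy's `Session.scalar()` returns `None` when the query matches no rows, and when it matches several it returns the first row's value without complaint, so a `dag_id`-only filter over many runs of the same DAG does not pin down which run is used. One way to make both cases explicit is sketched below with `sqlite3` standing in for the session. The table, the `ORDER BY ... DESC` choice (assuming the most recent run is the one wanted), and raising `LookupError` are all assumptions for illustration, not what the PR does.

```python
import sqlite3


def latest_logical_date(conn: sqlite3.Connection, dag_id: str) -> str:
    """Return the newest logical_date for dag_id, failing loudly if the DAG has no runs."""
    row = conn.execute(
        # ORDER BY makes "which row" deterministic instead of relying on storage order.
        "SELECT logical_date FROM dag_run WHERE dag_id = ? ORDER BY logical_date DESC LIMIT 1",
        (dag_id,),
    ).fetchone()
    if row is None:
        # The analogue of session.scalar(...) returning None: no matching record.
        raise LookupError(f"no dag_run rows for dag_id={dag_id!r}")
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, logical_date TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [("example_dag", "2025-01-01T00:00:00"), ("example_dag", "2025-01-02T00:00:00")],
)
print(latest_logical_date(conn, "example_dag"))  # 2025-01-02T00:00:00
```

ISO-8601 strings in a single format sort chronologically, which is what makes the text-based `ORDER BY` correct in this stand-in.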
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093547767

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:
+        """
+        Fetch a datetime stored in the database.
+
+        :param model_class: The Airflow model class (e.g., DagRun, TaskInstance, etc.)
+        :param column: The column name to fetch
+        :param session: SQLAlchemy session (provided by decorator)
+
+        :param conditions: Key-value pairs which are passed to the WHERE clause
+        """
+        query = select(getattr(model_class, column))
+
+        for key, value in conditions.items():
+            query = query.where(getattr(model_class, key) == value)
+        # This should build a query similar to:
+        # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id))
+        self.log.debug("db query: session.scalar(%s)", query)
+        return session.scalar(query)
+
+    def dagrun_logical_date(self, dag_id: str) -> datetime:
+        from airflow.models import DagRun
+
+        return self._fetch_from_db(DagRun, "logical_date", dag_id=dag_id)

Review Comment:
nit: consider using the attribute reference from the model instead of a hard-coded column-name string, so the two can't get out of sync: "logical_date" -> `DagRun.logical_date`, as in https://github.com/apache/airflow/blob/af06798ccc5d320e6174af9d4c7b10a7333f36e8/airflow-core/src/airflow/api/common/mark_tasks.py#L151
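The suggestion works because an ORM column attribute such as `DagRun.logical_date` already knows its table and its name, so a helper taking it needs neither `model_class` nor a column string. Note that a typo fails at runtime either way (`getattr(model_class, "logical_dat")` also raises `AttributeError`); the real gain is that linters, type checkers, and rename refactors can see an attribute but not a string. The toy below is dependency-free: `Column` is a hypothetical stand-in for SQLAlchemy's instrumented attributes that only records its owner's table and its own name via `__set_name__`, and `DagRun` and `build_select` are likewise illustrative.

```python
class Column:
    """Toy stand-in for an ORM column attribute (hypothetical, not SQLAlchemy)."""

    def __set_name__(self, owner, name):
        # Called once when the owning class is created; records where the column lives.
        self.table = owner.__tablename__
        self.name = name


class DagRun:
    # Hypothetical, heavily reduced model used only for this illustration.
    __tablename__ = "dag_run"
    dag_id = Column()
    logical_date = Column()


def build_select(column: Column, *conditions: tuple[Column, object]) -> tuple[str, tuple]:
    """Build a parameterized SELECT for `column`, filtered by (Column, value) pairs."""
    sql = f"SELECT {column.name} FROM {column.table}"
    if conditions:
        sql += " WHERE " + " AND ".join(f"{col.name} = ?" for col, _ in conditions)
    return sql, tuple(value for _, value in conditions)


print(build_select(DagRun.logical_date, (DagRun.dag_id, "example_dag")))
# ('SELECT logical_date FROM dag_run WHERE dag_id = ?', ('example_dag',))
```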
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093554618

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"

Review Comment:
curious what we plan to use this for?
Re: [PR] AIP-86 - Add Deadline References [airflow]
1fanwang commented on code in PR #50677:
URL: https://github.com/apache/airflow/pull/50677#discussion_r2093551012

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)
+
+    def evaluate(self):
+        """Call evaluate_with() without any conditions, because it looks strange in use that way."""
+        return self.evaluate_with()
+
+    @provide_session
+    def _fetch_from_db(self, model_class: Base, column: str, session=None, **conditions) -> datetime:

Review Comment:
just for my learning: do we need this layer of abstraction? It seems like we are mostly passing a single condition (dag_id).

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
+        return getattr(self, self.value)(**kwargs)

Review Comment:
I think `value` is inherited from Enum: https://github.com/python/cpython/blob/main/Lib/enum.py

## airflow-core/src/airflow/models/deadline.py: ##
@@ -123,7 +122,71 @@ class DeadlineReference(Enum):
         dag.deadline.reference.evaluate_with(dag_id=dag.dag_id)
     """

+    # Available References.
+    #
+    # The value is the name of the method executed to fetch the datetime
+    # value for the given Reference. For example DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    # will execute dagrun_logical_date() to find the dagrun's logical date.
     DAGRUN_LOGICAL_DATE = "dagrun_logical_date"
+    DAGRUN_QUEUED_AT = "dagrun_queued_at"
+    _CUSTOM_REFERENCE_BASE = "_custom_reference"
+
+    def __init__(self, value):
+        self._fixed_dt = None  # Initialize the storage for fixed datetime
+        super().__init__()
+
+    @classmethod
+    def FIXED_DATETIME(cls, dt: datetime):
+        """
+        Calculate a reference based on a set datetime rather than fetching a value from the database.
+
+        For example, you could set the Deadline for "tomorrow before 9AM" by
+        providing the appropriate datetime object."
+        """
+        instance = object.__new__(cls)
+        instance._value_ = "fixed_datetime"
+        instance._fixed_dt = dt
+        return instance
+
+    def evaluate_with(self, **kwargs):
+        """Call the method in the enum's value with the provided kwargs."""
+        if self._fixed_dt:
+            return self._fixed_dt
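On where `value` comes from: `Enum` exposes `value` as a read-only property backed by the `_value_` attribute, which is why the `FIXED_DATETIME` classmethod in the diff can create an instance with `object.__new__(cls)` and assign `_value_` directly. The reduced sketch below illustrates that pattern; the class, the lowercase `fixed_datetime` name, and the placeholder lookup are hypothetical. It shows that such an instance reports the assigned value but is not registered as a member of the enum. It checks `is not None` where the diff uses plain truthiness; both behave the same here because `datetime` instances are always truthy.

```python
from datetime import datetime, timezone
from enum import Enum


class Reference(Enum):
    DAGRUN_LOGICAL_DATE = "dagrun_logical_date"

    def __init__(self, value):
        # Enum calls this once per real member; real members carry no fixed datetime.
        self._fixed_dt = None

    @classmethod
    def fixed_datetime(cls, dt: datetime) -> "Reference":
        # object.__new__ bypasses Enum's own __new__, so this is not a registered member.
        instance = object.__new__(cls)
        instance._value_ = "fixed_datetime"  # read back through Enum's `value` property
        instance._fixed_dt = dt
        return instance

    def evaluate_with(self, **kwargs) -> datetime:
        if self._fixed_dt is not None:
            return self._fixed_dt
        return getattr(self, self.value)(**kwargs)

    def dagrun_logical_date(self, dag_id: str) -> datetime:
        # Hypothetical placeholder for the database lookup.
        return datetime(2025, 1, 1, tzinfo=timezone.utc)


deadline = datetime(2025, 6, 1, 9, 0, tzinfo=timezone.utc)
ref = Reference.fixed_datetime(deadline)
print(ref.value)  # fixed_datetime
```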