This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 46cfeee30c2b579b25de8806250a95068b33d69f Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Tue Jun 22 14:42:42 2021 +0100 Add back-compat layer to clear_task_instances (#16582) It is unlikely that anyone is using this function directly, but it is easy for us to maintain compatibility, so we should (cherry picked from commit 5b0acfef87d609e4d5e11e10e878e41e6ea89302) --- airflow/models/taskinstance.py | 15 ++++++++++++++- airflow/typing_compat.py | 3 ++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ae7eeef..b77f0d7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -61,6 +61,7 @@ from airflow.sentry import Sentry from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS +from airflow.typing_compat import Literal from airflow.utils import timezone from airflow.utils.email import send_email from airflow.utils.helpers import is_container @@ -133,8 +134,9 @@ def set_error_file(error_file: str, error: Union[str, Exception]) -> None: def clear_task_instances( tis, session, - dag_run_state: str = State.RUNNING, + activate_dag_runs=None, dag=None, + dag_run_state: Union[str, Literal[False]] = State.RUNNING, ): """ Clears a set of task instances, but makes sure the running ones @@ -145,6 +147,7 @@ def clear_task_instances( :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not be changed. :param dag: DAG object + :param activate_dag_runs: Deprecated parameter, do not pass """ job_ids = [] task_id_by_key = defaultdict(lambda: defaultdict(lambda: defaultdict(set))) @@ -205,6 +208,16 @@ def clear_task_instances( for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all(): # noqa job.state = State.SHUTDOWN + if activate_dag_runs is not None: + warnings.warn( + "`activate_dag_runs` parameter to clear_task_instances function is deprecated. " + "Please use `dag_run_state`", + DeprecationWarning, + stacklevel=2, + ) + if not activate_dag_runs: + dag_run_state = False + if dag_run_state is not False and tis: from airflow.models.dagrun import DagRun # Avoid circular import diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py index d98eb7b..a207ef2 100644 --- a/airflow/typing_compat.py +++ b/airflow/typing_compat.py @@ -26,12 +26,13 @@ try: # python 3.8 we can safely remove this shim import after Airflow drops # support for <3.8 from typing import ( # type: ignore # noqa # pylint: disable=unused-import + Literal, Protocol, TypedDict, runtime_checkable, ) except ImportError: - from typing_extensions import Protocol, TypedDict, runtime_checkable # type: ignore # noqa + from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable # type: ignore # noqa # Before Py 3.7, there is no re.Pattern class
