This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 2193be2a5e5 Fix failures on main related to DagRun validation (#45917) 2193be2a5e5 is described below commit 2193be2a5e53760ae00d1b85c825087e995f8eb1 Author: Kaxil Naik <kaxiln...@apache.org> AuthorDate: Wed Jan 22 22:31:16 2025 +0530 Fix failures on main related to DagRun validation (#45917) Some test failures like https://github.com/apache/airflow/actions/runs/12903540392/job/35979200770?pr=45865 were not caught in #45834 Follow-up of https://github.com/apache/airflow/pull/45834/ --- .../execution_api/datamodels/taskinstance.py | 1 + .../cli/commands/remote_commands/task_command.py | 1 + .../providers/edge/example_dags/win_test.py | 2 +- .../airflow/providers/standard/operators/bash.py | 2 +- .../tests/amazon/aws/operators/test_athena.py | 3 +++ .../tests/amazon/aws/operators/test_datasync.py | 11 ++++++++++ providers/tests/amazon/aws/operators/test_dms.py | 4 ++++ .../amazon/aws/operators/test_emr_add_steps.py | 5 +++++ .../aws/operators/test_emr_create_job_flow.py | 5 +++++ providers/tests/amazon/aws/operators/test_s3.py | 13 +++++++++-- .../amazon/aws/operators/test_sagemaker_base.py | 13 +++++++++-- providers/tests/amazon/aws/sensors/test_s3.py | 25 ++++++++++++++++++---- providers/tests/amazon/aws/transfers/test_base.py | 3 +++ .../amazon/aws/transfers/test_dynamodb_to_s3.py | 3 +++ .../tests/amazon/aws/transfers/test_mongo_to_s3.py | 3 +++ .../apache/kylin/operators/test_kylin_cube.py | 4 +++- .../apache/spark/operators/test_spark_submit.py | 3 +++ .../tests/openlineage/plugins/test_adapter.py | 4 ++-- .../tests/openlineage/plugins/test_listener.py | 4 ++-- .../tests/redis/log/test_redis_task_handler.py | 1 + providers/tests/standard/operators/test_bash.py | 2 +- .../src/airflow/sdk/api/datamodels/_generated.py | 3 ++- .../src/airflow/sdk/execution_time/task_runner.py | 2 -- task_sdk/src/airflow/sdk/types.py | 1 + task_sdk/tests/execution_time/test_task_runner.py | 2 -- .../execution_api/routes/test_task_instances.py | 1 + 26 files changed, 100 insertions(+), 21 deletions(-) diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index bb6d643fb8e..5e8c267b82f 100644 --- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -183,6 +183,7 @@ class DagRun(BaseModel): end_date: UtcDateTime | None run_type: DagRunType conf: Annotated[dict[str, Any], Field(default_factory=dict)] + external_trigger: bool = False class TIRunContext(BaseModel): diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py index 681b478bfc1..b1f9182e4c9 100644 --- a/airflow/cli/commands/remote_commands/task_command.py +++ b/airflow/cli/commands/remote_commands/task_command.py @@ -174,6 +174,7 @@ def _get_dag_run( dag_id=dag.dag_id, run_id=logical_date_or_run_id, run_type=DagRunType.MANUAL, + external_trigger=True, logical_date=dag_run_logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), triggered_by=DagRunTriggeredByType.CLI, diff --git a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py index 15735b85d18..3a730009d50 100644 --- a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py +++ b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py @@ -81,7 +81,7 @@ class CmdOperator(BaseOperator): :param cwd: Working directory to execute the command in (templated). If None (default), the command is run in a temporary directory. To use current DAG folder as the working directory, - you might set template ``{{ dag_run.dag.folder }}``. + you might set template ``{{ task.dag.folder }}``. :param output_processor: Function to further process the output of the script / command (default is lambda output: output). diff --git a/providers/src/airflow/providers/standard/operators/bash.py b/providers/src/airflow/providers/standard/operators/bash.py index d3f66380eea..9b4852a151b 100644 --- a/providers/src/airflow/providers/standard/operators/bash.py +++ b/providers/src/airflow/providers/standard/operators/bash.py @@ -69,7 +69,7 @@ class BashOperator(BaseOperator): :param cwd: Working directory to execute the command in (templated). If None (default), the command is run in a temporary directory. To use current DAG folder as the working directory, - you might set template ``{{ dag_run.dag.folder }}``. + you might set template ``{{ task.dag.folder }}``. When bash_command is a '.sh' or '.bash' file, Airflow must have write access to the working directory. The script will be rendered (Jinja template) into a new temporary file in this directory. diff --git a/providers/tests/amazon/aws/operators/test_athena.py b/providers/tests/amazon/aws/operators/test_athena.py index 63bfa890699..3ce9487c415 100644 --- a/providers/tests/amazon/aws/operators/test_athena.py +++ b/providers/tests/amazon/aws/operators/test_athena.py @@ -38,6 +38,7 @@ from airflow.providers.common.compat.openlineage.facet import ( ) from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType @@ -239,6 +240,7 @@ class TestAthenaOperator: logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -246,6 +248,7 @@ class TestAthenaOperator: execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.athena) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_datasync.py b/providers/tests/amazon/aws/operators/test_datasync.py index 272e03d201f..6b6b64caa13 100644 --- a/providers/tests/amazon/aws/operators/test_datasync.py +++ b/providers/tests/amazon/aws/operators/test_datasync.py @@ -27,6 +27,7 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType @@ -358,6 +359,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase): logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -365,6 +367,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase): execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.datasync) ti.dag_run = dag_run @@ -570,6 +573,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase): logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -577,6 +581,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase): execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.datasync) ti.dag_run = dag_run @@ -684,6 +689,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase): logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -691,6 +697,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase): execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.datasync) ti.dag_run = dag_run @@ -870,6 +877,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase): logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -877,6 +885,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase): execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.datasync) ti.dag_run = dag_run @@ -980,6 +989,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase): logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -987,6 +997,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase): execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.datasync) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_dms.py b/providers/tests/amazon/aws/operators/test_dms.py index 495beed5446..0b414234d9f 100644 --- a/providers/tests/amazon/aws/operators/test_dms.py +++ b/providers/tests/amazon/aws/operators/test_dms.py @@ -45,6 +45,7 @@ from airflow.providers.amazon.aws.triggers.dms import ( DmsReplicationTerminalStatusTrigger, ) from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields @@ -298,6 +299,7 @@ class TestDmsDescribeTasksOperator: logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -305,6 +307,7 @@ class TestDmsDescribeTasksOperator: execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=describe_task) ti.dag_run = dag_run @@ -491,6 +494,7 @@ class TestDmsDescribeReplicationConfigsOperator: dag_id=dag.dag_id, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=op) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_emr_add_steps.py b/providers/tests/amazon/aws/operators/test_emr_add_steps.py index 8bd2e9325d5..c30f0b6d418 100644 --- a/providers/tests/amazon/aws/operators/test_emr_add_steps.py +++ b/providers/tests/amazon/aws/operators/test_emr_add_steps.py @@ -30,6 +30,7 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator from airflow.providers.amazon.aws.triggers.emr import EmrAddStepsTrigger from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields @@ -105,6 +106,7 @@ class TestEmrAddStepsOperator: logical_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -112,6 +114,7 @@ class TestEmrAddStepsOperator: execution_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.operator) ti.dag_run = dag_run @@ -170,6 +173,7 @@ class TestEmrAddStepsOperator: logical_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -177,6 +181,7 @@ class TestEmrAddStepsOperator: execution_date=timezone.utcnow(), run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=test_task) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py index cc39fec5269..8131477e5e4 100644 --- a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py +++ b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py @@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator from airflow.providers.amazon.aws.triggers.emr import EmrCreateJobFlowTrigger from airflow.providers.amazon.aws.utils.waiter import WAITER_POLICY_NAME_MAPPING, WaitPolicy from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields @@ -104,6 +105,7 @@ class TestEmrCreateJobFlowOperator: logical_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -111,6 +113,7 @@ class TestEmrCreateJobFlowOperator: execution_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.operator) ti.dag_run = dag_run @@ -150,6 +153,7 @@ class TestEmrCreateJobFlowOperator: logical_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -157,6 +161,7 @@ class TestEmrCreateJobFlowOperator: execution_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.operator) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_s3.py b/providers/tests/amazon/aws/operators/test_s3.py index 760a6ab53fb..ab8508ec5f8 100644 --- a/providers/tests/amazon/aws/operators/test_s3.py +++ b/providers/tests/amazon/aws/operators/test_s3.py @@ -55,6 +55,7 @@ from airflow.providers.common.compat.openlineage.facet import ( PreviousIdentifier, ) from airflow.providers.openlineage.extractors import OperatorLineage +from airflow.utils.state import DagRunState from airflow.utils.timezone import datetime, utcnow from airflow.utils.types import DagRunType @@ -654,11 +655,19 @@ class TestS3DeleteObjectsOperator: ) if hasattr(DagRun, "execution_date"): # Airflow 2.x. dag_run = DagRun( - dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + execution_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( - dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + logical_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=op) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/operators/test_sagemaker_base.py b/providers/tests/amazon/aws/operators/test_sagemaker_base.py index 3ae0c2d95ed..408bc3dc579 100644 --- a/providers/tests/amazon/aws/operators/test_sagemaker_base.py +++ b/providers/tests/amazon/aws/operators/test_sagemaker_base.py @@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.operators.sagemaker import ( SageMakerCreateExperimentOperator, ) from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields @@ -209,11 +210,19 @@ class TestSageMakerExperimentOperator: ) if AIRFLOW_V_3_0_PLUS: dag_run = DagRun( - dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + logical_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( - dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + execution_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=op) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/sensors/test_s3.py b/providers/tests/amazon/aws/sensors/test_s3.py index 8b241168392..9c169f86cf0 100644 --- a/providers/tests/amazon/aws/sensors/test_s3.py +++ b/providers/tests/amazon/aws/sensors/test_s3.py @@ -30,6 +30,7 @@ from airflow.models.variable import Variable from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -129,11 +130,19 @@ class TestS3KeySensor: if AIRFLOW_V_3_0_PLUS: dag_run = DagRun( - dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + logical_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( - dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + execution_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=op) ti.dag_run = dag_run @@ -164,11 +173,19 @@ class TestS3KeySensor: ) if hasattr(DagRun, "execution_date"): # Airflow 2.x. dag_run = DagRun( - dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + execution_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( - dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL + dag_id=dag.dag_id, + logical_date=logical_date, + run_id="test", + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=op) ti.dag_run = dag_run diff --git a/providers/tests/amazon/aws/transfers/test_base.py b/providers/tests/amazon/aws/transfers/test_base.py index a09f08b0d18..235955bd6eb 100644 --- a/providers/tests/amazon/aws/transfers/test_base.py +++ b/providers/tests/amazon/aws/transfers/test_base.py @@ -23,6 +23,7 @@ from airflow import DAG from airflow.models import DagRun, TaskInstance from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -50,6 +51,7 @@ class TestAwsToAwsBaseOperator: run_id="something", logical_date=timezone.datetime(2020, 1, 1), run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: ti.dag_run = DagRun( @@ -57,6 +59,7 @@ class TestAwsToAwsBaseOperator: run_id="something", execution_date=timezone.datetime(2020, 1, 1), run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) session.add(ti) session.commit() diff --git a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py index 7304492c1ad..dcd3b6058a6 100644 --- a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py +++ b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py @@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import ( JSONEncoder, ) from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -280,6 +281,7 @@ class TestDynamodbToS3: run_id="something", logical_date=timezone.datetime(2020, 1, 1), run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: ti.dag_run = DagRun( @@ -287,6 +289,7 @@ class TestDynamodbToS3: run_id="something", execution_date=timezone.datetime(2020, 1, 1), run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) session.add(ti) session.commit() diff --git a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py index 4698469a650..764faf6dadb 100644 --- a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py +++ b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py @@ -24,6 +24,7 @@ import pytest from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator from airflow.utils import timezone +from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -88,6 +89,7 @@ class TestMongoToS3Operator: logical_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) else: dag_run = DagRun( @@ -95,6 +97,7 @@ class TestMongoToS3Operator: execution_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, ) ti = TaskInstance(task=self.mock_operator) ti.dag_run = dag_run diff --git a/providers/tests/apache/kylin/operators/test_kylin_cube.py b/providers/tests/apache/kylin/operators/test_kylin_cube.py index b23833811f6..c56dbdb33b5 100644 --- a/providers/tests/apache/kylin/operators/test_kylin_cube.py +++ b/providers/tests/apache/kylin/operators/test_kylin_cube.py @@ -26,7 +26,7 @@ from airflow.exceptions import AirflowException from airflow.models import DagRun, TaskInstance from airflow.models.dag import DAG from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator -from airflow.utils import timezone +from airflow.utils import state, timezone from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -177,6 +177,7 @@ class TestKylinCubeOperator: run_id="kylin_test", logical_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state=state.DagRunState.RUNNING, ) else: ti.dag_run = DagRun( @@ -184,6 +185,7 @@ class TestKylinCubeOperator: run_id="kylin_test", execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state=state.DagRunState.RUNNING, ) session.add(ti) session.commit() diff --git a/providers/tests/apache/spark/operators/test_spark_submit.py b/providers/tests/apache/spark/operators/test_spark_submit.py index 879a7c999ef..2670340086b 100644 --- a/providers/tests/apache/spark/operators/test_spark_submit.py +++ b/providers/tests/apache/spark/operators/test_spark_submit.py @@ -201,6 +201,7 @@ class TestSparkSubmitOperator: run_id="spark_test", logical_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) else: ti.dag_run = DagRun( @@ -208,7 +209,9 @@ class TestSparkSubmitOperator: run_id="spark_test", execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL, + state="running", ) + session.add(ti) session.commit() # When diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py index fd7f01ff61e..9e19437142b 100644 --- a/providers/tests/openlineage/plugins/test_adapter.py +++ b/providers/tests/openlineage/plugins/test_adapter.py @@ -585,7 +585,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "dag_id": "dag_id", "data_interval_start": event_time.isoformat(), "data_interval_end": event_time.isoformat(), - "external_trigger": None, + "external_trigger": False if AIRFLOW_V_3_0_PLUS else None, "run_id": run_id, "run_type": None, "start_date": event_time.isoformat(), @@ -626,7 +626,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "dag_id": "dag_id", "data_interval_start": event_time.isoformat(), "data_interval_end": event_time.isoformat(), - "external_trigger": None, + "external_trigger": False if AIRFLOW_V_3_0_PLUS else None, "run_id": run_id, "run_type": None, "start_date": event_time.isoformat(), diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py index 837873f439d..25d9c24e6e5 100644 --- a/providers/tests/openlineage/plugins/test_listener.py +++ b/providers/tests/openlineage/plugins/test_listener.py @@ -101,7 +101,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock): run_id=run_id, data_interval=(date, date), run_type=types.DagRunType.MANUAL, - state=DagRunState.QUEUED, + state=DagRunState.RUNNING, **dagrun_kwargs, ) ti = TaskInstance(t, run_id=run_id) @@ -187,7 +187,7 @@ def _create_test_dag_and_task(python_callable: Callable, scenario_name: str) -> run_id=run_id, data_interval=(date, date), run_type=types.DagRunType.MANUAL, - state=DagRunState.QUEUED, + state=DagRunState.RUNNING, **dagrun_kwargs, ) task_instance = TaskInstance(t, run_id=run_id) diff --git a/providers/tests/redis/log/test_redis_task_handler.py b/providers/tests/redis/log/test_redis_task_handler.py index 604dcb1c731..2dde6fa402b 100644 --- a/providers/tests/redis/log/test_redis_task_handler.py +++ b/providers/tests/redis/log/test_redis_task_handler.py @@ -46,6 +46,7 @@ class TestRedisTaskHandler: else: dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run.set_state(State.RUNNING) with create_session() as session: session.add(dag_run) session.commit() diff --git a/providers/tests/standard/operators/test_bash.py b/providers/tests/standard/operators/test_bash.py index 81c64048cba..13b3fad3ad4 100644 --- a/providers/tests/standard/operators/test_bash.py +++ b/providers/tests/standard/operators/test_bash.py @@ -273,7 +273,7 @@ class TestBashOperator: # Templated fields bash_command='echo "{{ dag_run.dag_id }}"', env={"FOO": "{{ ds }}"}, - cwd="{{ dag_run.dag.folder }}", + cwd="{{ task.dag.folder }}", # Other parameters dag_id="test_templated_fields_dag", task_id="test_templated_fields_task", diff --git a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py index 7d8bd25e959..d91ecf841d9 100644 --- a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -130,8 +130,8 @@ class TIRescheduleStatePayload(BaseModel): """ state: Annotated[Literal["up_for_reschedule"] | None, Field(title="State")] = "up_for_reschedule" - end_date: Annotated[datetime, Field(title="End Date")] reschedule_date: Annotated[datetime, Field(title="Reschedule Date")] + end_date: Annotated[datetime, Field(title="End Date")] class TITargetStatePayload(BaseModel): @@ -223,6 +223,7 @@ class DagRun(BaseModel): end_date: Annotated[datetime | None, Field(title="End Date")] = None run_type: DagRunType conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None + external_trigger: Annotated[bool, Field(title="External Trigger")] = False class HTTPValidationError(BaseModel): diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 230f5414a87..c35a79e13c3 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -96,8 +96,6 @@ class RuntimeTaskInstance(TaskInstance): # especially after removal of `conf` from Context. "ti": self, "outlet_events": OutletEventAccessors(), - # "expanded_ti_count": expanded_ti_count, - "expanded_ti_count": None, # TODO: Implement this # "inlet_events": InletEventsAccessors(task.inlets, session=session), "macros": MacrosAccessor(), # "params": validated_params, diff --git a/task_sdk/src/airflow/sdk/types.py b/task_sdk/src/airflow/sdk/types.py index 1a886b45ed1..fd02104fb2f 100644 --- a/task_sdk/src/airflow/sdk/types.py +++ b/task_sdk/src/airflow/sdk/types.py @@ -43,6 +43,7 @@ class DagRunProtocol(Protocol): end_date: datetime | None run_type: Any conf: dict[str, Any] | None + external_trigger: bool class RuntimeTaskInstanceProtocol(Protocol): diff --git a/task_sdk/tests/execution_time/test_task_runner.py b/task_sdk/tests/execution_time/test_task_runner.py index 9889c192e90..17c0029844e 100644 --- a/task_sdk/tests/execution_time/test_task_runner.py +++ b/task_sdk/tests/execution_time/test_task_runner.py @@ -615,7 +615,6 @@ class TestRuntimeTaskInstance: }, "conn": ConnectionAccessor(), "dag": runtime_ti.task.dag, - "expanded_ti_count": None, "inlets": task.inlets, "macros": MacrosAccessor(), "map_index_template": task.map_index_template, @@ -675,7 +674,6 @@ class TestRuntimeTaskInstance: "logical_date": timezone.datetime(2024, 12, 1, 1, 0, 0), "ds": "2024-12-01", "ds_nodash": "20241201", - "expanded_ti_count": None, "task_instance_key_str": "basic_task__hello__20241201", "ts": "2024-12-01T01:00:00+00:00", "ts_nodash": "20241201T010000", diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py b/tests/api_fastapi/execution_api/routes/test_task_instances.py index c11fbb21bb2..e3aef1505bc 100644 --- a/tests/api_fastapi/execution_api/routes/test_task_instances.py +++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py @@ -85,6 +85,7 @@ class TestTIRunState: "data_interval_end": instant_str, "start_date": instant_str, "end_date": None, + "external_trigger": False, "run_type": "manual", "conf": {}, },