This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2193be2a5e5 Fix failures on main related to DagRun validation (#45917)
2193be2a5e5 is described below
commit 2193be2a5e53760ae00d1b85c825087e995f8eb1
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jan 22 22:31:16 2025 +0530
Fix failures on main related to DagRun validation (#45917)
Some test failures like
https://github.com/apache/airflow/actions/runs/12903540392/job/35979200770?pr=45865
were not caught in #45834
Follow-up of https://github.com/apache/airflow/pull/45834/
---
.../execution_api/datamodels/taskinstance.py | 1 +
.../cli/commands/remote_commands/task_command.py | 1 +
.../providers/edge/example_dags/win_test.py | 2 +-
.../airflow/providers/standard/operators/bash.py | 2 +-
.../tests/amazon/aws/operators/test_athena.py | 3 +++
.../tests/amazon/aws/operators/test_datasync.py | 11 ++++++++++
providers/tests/amazon/aws/operators/test_dms.py | 4 ++++
.../amazon/aws/operators/test_emr_add_steps.py | 5 +++++
.../aws/operators/test_emr_create_job_flow.py | 5 +++++
providers/tests/amazon/aws/operators/test_s3.py | 13 +++++++++--
.../amazon/aws/operators/test_sagemaker_base.py | 13 +++++++++--
providers/tests/amazon/aws/sensors/test_s3.py | 25 ++++++++++++++++++----
providers/tests/amazon/aws/transfers/test_base.py | 3 +++
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 3 +++
.../tests/amazon/aws/transfers/test_mongo_to_s3.py | 3 +++
.../apache/kylin/operators/test_kylin_cube.py | 4 +++-
.../apache/spark/operators/test_spark_submit.py | 3 +++
.../tests/openlineage/plugins/test_adapter.py | 4 ++--
.../tests/openlineage/plugins/test_listener.py | 4 ++--
.../tests/redis/log/test_redis_task_handler.py | 1 +
providers/tests/standard/operators/test_bash.py | 2 +-
.../src/airflow/sdk/api/datamodels/_generated.py | 3 ++-
.../src/airflow/sdk/execution_time/task_runner.py | 2 --
task_sdk/src/airflow/sdk/types.py | 1 +
task_sdk/tests/execution_time/test_task_runner.py | 2 --
.../execution_api/routes/test_task_instances.py | 1 +
26 files changed, 100 insertions(+), 21 deletions(-)
diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index bb6d643fb8e..5e8c267b82f 100644
--- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -183,6 +183,7 @@ class DagRun(BaseModel):
end_date: UtcDateTime | None
run_type: DagRunType
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
+ external_trigger: bool = False
class TIRunContext(BaseModel):
diff --git a/airflow/cli/commands/remote_commands/task_command.py
b/airflow/cli/commands/remote_commands/task_command.py
index 681b478bfc1..b1f9182e4c9 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -174,6 +174,7 @@ def _get_dag_run(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
+ external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
triggered_by=DagRunTriggeredByType.CLI,
diff --git a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
index 15735b85d18..3a730009d50 100644
--- a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
+++ b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
@@ -81,7 +81,7 @@ class CmdOperator(BaseOperator):
:param cwd: Working directory to execute the command in (templated).
If None (default), the command is run in a temporary directory.
To use current DAG folder as the working directory,
- you might set template ``{{ dag_run.dag.folder }}``.
+ you might set template ``{{ task.dag.folder }}``.
:param output_processor: Function to further process the output of the
script / command
(default is lambda output: output).
diff --git a/providers/src/airflow/providers/standard/operators/bash.py
b/providers/src/airflow/providers/standard/operators/bash.py
index d3f66380eea..9b4852a151b 100644
--- a/providers/src/airflow/providers/standard/operators/bash.py
+++ b/providers/src/airflow/providers/standard/operators/bash.py
@@ -69,7 +69,7 @@ class BashOperator(BaseOperator):
:param cwd: Working directory to execute the command in (templated).
If None (default), the command is run in a temporary directory.
To use current DAG folder as the working directory,
- you might set template ``{{ dag_run.dag.folder }}``.
+ you might set template ``{{ task.dag.folder }}``.
When bash_command is a '.sh' or '.bash' file, Airflow must have write
access to the working directory. The script will be rendered (Jinja
template) into a new temporary file in this directory.
diff --git a/providers/tests/amazon/aws/operators/test_athena.py
b/providers/tests/amazon/aws/operators/test_athena.py
index 63bfa890699..3ce9487c415 100644
--- a/providers/tests/amazon/aws/operators/test_athena.py
+++ b/providers/tests/amazon/aws/operators/test_athena.py
@@ -38,6 +38,7 @@ from airflow.providers.common.compat.openlineage.facet import
(
)
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
@@ -239,6 +240,7 @@ class TestAthenaOperator:
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -246,6 +248,7 @@ class TestAthenaOperator:
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.athena)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_datasync.py
b/providers/tests/amazon/aws/operators/test_datasync.py
index 272e03d201f..6b6b64caa13 100644
--- a/providers/tests/amazon/aws/operators/test_datasync.py
+++ b/providers/tests/amazon/aws/operators/test_datasync.py
@@ -27,6 +27,7 @@ from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
@@ -358,6 +359,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase):
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -365,6 +367,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase):
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.datasync)
ti.dag_run = dag_run
@@ -570,6 +573,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase):
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -577,6 +581,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase):
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.datasync)
ti.dag_run = dag_run
@@ -684,6 +689,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase):
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -691,6 +697,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase):
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.datasync)
ti.dag_run = dag_run
@@ -870,6 +877,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase):
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -877,6 +885,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase):
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.datasync)
ti.dag_run = dag_run
@@ -980,6 +989,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase):
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -987,6 +997,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase):
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.datasync)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_dms.py
b/providers/tests/amazon/aws/operators/test_dms.py
index 495beed5446..0b414234d9f 100644
--- a/providers/tests/amazon/aws/operators/test_dms.py
+++ b/providers/tests/amazon/aws/operators/test_dms.py
@@ -45,6 +45,7 @@ from airflow.providers.amazon.aws.triggers.dms import (
DmsReplicationTerminalStatusTrigger,
)
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from providers.tests.amazon.aws.utils.test_template_fields import
validate_template_fields
@@ -298,6 +299,7 @@ class TestDmsDescribeTasksOperator:
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -305,6 +307,7 @@ class TestDmsDescribeTasksOperator:
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=describe_task)
ti.dag_run = dag_run
@@ -491,6 +494,7 @@ class TestDmsDescribeReplicationConfigsOperator:
dag_id=dag.dag_id,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=op)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_emr_add_steps.py
b/providers/tests/amazon/aws/operators/test_emr_add_steps.py
index 8bd2e9325d5..c30f0b6d418 100644
--- a/providers/tests/amazon/aws/operators/test_emr_add_steps.py
+++ b/providers/tests/amazon/aws/operators/test_emr_add_steps.py
@@ -30,6 +30,7 @@ from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.triggers.emr import EmrAddStepsTrigger
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from providers.tests.amazon.aws.utils.test_template_fields import
validate_template_fields
@@ -105,6 +106,7 @@ class TestEmrAddStepsOperator:
logical_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -112,6 +114,7 @@ class TestEmrAddStepsOperator:
execution_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.operator)
ti.dag_run = dag_run
@@ -170,6 +173,7 @@ class TestEmrAddStepsOperator:
logical_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -177,6 +181,7 @@ class TestEmrAddStepsOperator:
execution_date=timezone.utcnow(),
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=test_task)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
index cc39fec5269..8131477e5e4 100644
--- a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
+++ b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
@@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.operators.emr import
EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.triggers.emr import EmrCreateJobFlowTrigger
from airflow.providers.amazon.aws.utils.waiter import
WAITER_POLICY_NAME_MAPPING, WaitPolicy
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from providers.tests.amazon.aws.utils.test_template_fields import
validate_template_fields
@@ -104,6 +105,7 @@ class TestEmrCreateJobFlowOperator:
logical_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -111,6 +113,7 @@ class TestEmrCreateJobFlowOperator:
execution_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.operator)
ti.dag_run = dag_run
@@ -150,6 +153,7 @@ class TestEmrCreateJobFlowOperator:
logical_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -157,6 +161,7 @@ class TestEmrCreateJobFlowOperator:
execution_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.operator)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_s3.py
b/providers/tests/amazon/aws/operators/test_s3.py
index 760a6ab53fb..ab8508ec5f8 100644
--- a/providers/tests/amazon/aws/operators/test_s3.py
+++ b/providers/tests/amazon/aws/operators/test_s3.py
@@ -55,6 +55,7 @@ from airflow.providers.common.compat.openlineage.facet import
(
PreviousIdentifier,
)
from airflow.providers.openlineage.extractors import OperatorLineage
+from airflow.utils.state import DagRunState
from airflow.utils.timezone import datetime, utcnow
from airflow.utils.types import DagRunType
@@ -654,11 +655,19 @@ class TestS3DeleteObjectsOperator:
)
if hasattr(DagRun, "execution_date"): # Airflow 2.x.
dag_run = DagRun(
- dag_id=dag.dag_id, execution_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ execution_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
- dag_id=dag.dag_id, logical_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=op)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_sagemaker_base.py
b/providers/tests/amazon/aws/operators/test_sagemaker_base.py
index 3ae0c2d95ed..408bc3dc579 100644
--- a/providers/tests/amazon/aws/operators/test_sagemaker_base.py
+++ b/providers/tests/amazon/aws/operators/test_sagemaker_base.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.operators.sagemaker import (
SageMakerCreateExperimentOperator,
)
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from providers.tests.amazon.aws.utils.test_template_fields import
validate_template_fields
@@ -209,11 +210,19 @@ class TestSageMakerExperimentOperator:
)
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
- dag_id=dag.dag_id, logical_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
- dag_id=dag.dag_id, execution_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ execution_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=op)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/sensors/test_s3.py
b/providers/tests/amazon/aws/sensors/test_s3.py
index 8b241168392..9c169f86cf0 100644
--- a/providers/tests/amazon/aws/sensors/test_s3.py
+++ b/providers/tests/amazon/aws/sensors/test_s3.py
@@ -30,6 +30,7 @@ from airflow.models.variable import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor,
S3KeysUnchangedSensor
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -129,11 +130,19 @@ class TestS3KeySensor:
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
- dag_id=dag.dag_id, logical_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
- dag_id=dag.dag_id, execution_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ execution_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=op)
ti.dag_run = dag_run
@@ -164,11 +173,19 @@ class TestS3KeySensor:
)
if hasattr(DagRun, "execution_date"): # Airflow 2.x.
dag_run = DagRun(
- dag_id=dag.dag_id, execution_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ execution_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
- dag_id=dag.dag_id, logical_date=logical_date, run_id="test",
run_type=DagRunType.MANUAL
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id="test",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=op)
ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/transfers/test_base.py
b/providers/tests/amazon/aws/transfers/test_base.py
index a09f08b0d18..235955bd6eb 100644
--- a/providers/tests/amazon/aws/transfers/test_base.py
+++ b/providers/tests/amazon/aws/transfers/test_base.py
@@ -23,6 +23,7 @@ from airflow import DAG
from airflow.models import DagRun, TaskInstance
from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -50,6 +51,7 @@ class TestAwsToAwsBaseOperator:
run_id="something",
logical_date=timezone.datetime(2020, 1, 1),
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
ti.dag_run = DagRun(
@@ -57,6 +59,7 @@ class TestAwsToAwsBaseOperator:
run_id="something",
execution_date=timezone.datetime(2020, 1, 1),
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
session.add(ti)
session.commit()
diff --git a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
index 7304492c1ad..dcd3b6058a6 100644
--- a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.transfers.dynamodb_to_s3
import (
JSONEncoder,
)
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -280,6 +281,7 @@ class TestDynamodbToS3:
run_id="something",
logical_date=timezone.datetime(2020, 1, 1),
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
ti.dag_run = DagRun(
@@ -287,6 +289,7 @@ class TestDynamodbToS3:
run_id="something",
execution_date=timezone.datetime(2020, 1, 1),
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
session.add(ti)
session.commit()
diff --git a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
index 4698469a650..764faf6dadb 100644
--- a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
@@ -24,6 +24,7 @@ import pytest
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.transfers.mongo_to_s3 import
MongoToS3Operator
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -88,6 +89,7 @@ class TestMongoToS3Operator:
logical_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
else:
dag_run = DagRun(
@@ -95,6 +97,7 @@ class TestMongoToS3Operator:
execution_date=DEFAULT_DATE,
run_id="test",
run_type=DagRunType.MANUAL,
+ state=DagRunState.RUNNING,
)
ti = TaskInstance(task=self.mock_operator)
ti.dag_run = dag_run
diff --git a/providers/tests/apache/kylin/operators/test_kylin_cube.py
b/providers/tests/apache/kylin/operators/test_kylin_cube.py
index b23833811f6..c56dbdb33b5 100644
--- a/providers/tests/apache/kylin/operators/test_kylin_cube.py
+++ b/providers/tests/apache/kylin/operators/test_kylin_cube.py
@@ -26,7 +26,7 @@ from airflow.exceptions import AirflowException
from airflow.models import DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import
KylinCubeOperator
-from airflow.utils import timezone
+from airflow.utils import state, timezone
from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -177,6 +177,7 @@ class TestKylinCubeOperator:
run_id="kylin_test",
logical_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
+ state=state.DagRunState.RUNNING,
)
else:
ti.dag_run = DagRun(
@@ -184,6 +185,7 @@ class TestKylinCubeOperator:
run_id="kylin_test",
execution_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
+ state=state.DagRunState.RUNNING,
)
session.add(ti)
session.commit()
diff --git a/providers/tests/apache/spark/operators/test_spark_submit.py
b/providers/tests/apache/spark/operators/test_spark_submit.py
index 879a7c999ef..2670340086b 100644
--- a/providers/tests/apache/spark/operators/test_spark_submit.py
+++ b/providers/tests/apache/spark/operators/test_spark_submit.py
@@ -201,6 +201,7 @@ class TestSparkSubmitOperator:
run_id="spark_test",
logical_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
+ state="running",
)
else:
ti.dag_run = DagRun(
@@ -208,7 +209,9 @@ class TestSparkSubmitOperator:
run_id="spark_test",
execution_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
+ state="running",
)
+
session.add(ti)
session.commit()
# When
diff --git a/providers/tests/openlineage/plugins/test_adapter.py
b/providers/tests/openlineage/plugins/test_adapter.py
index fd7f01ff61e..9e19437142b 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -585,7 +585,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
"dag_id": "dag_id",
"data_interval_start": event_time.isoformat(),
"data_interval_end": event_time.isoformat(),
- "external_trigger": None,
+ "external_trigger": False if AIRFLOW_V_3_0_PLUS else None,
"run_id": run_id,
"run_type": None,
"start_date": event_time.isoformat(),
@@ -626,7 +626,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
"dag_id": "dag_id",
"data_interval_start": event_time.isoformat(),
"data_interval_end": event_time.isoformat(),
- "external_trigger": None,
+ "external_trigger": False if AIRFLOW_V_3_0_PLUS
else None,
"run_id": run_id,
"run_type": None,
"start_date": event_time.isoformat(),
diff --git a/providers/tests/openlineage/plugins/test_listener.py
b/providers/tests/openlineage/plugins/test_listener.py
index 837873f439d..25d9c24e6e5 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -101,7 +101,7 @@ def
test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
run_id=run_id,
data_interval=(date, date),
run_type=types.DagRunType.MANUAL,
- state=DagRunState.QUEUED,
+ state=DagRunState.RUNNING,
**dagrun_kwargs,
)
ti = TaskInstance(t, run_id=run_id)
@@ -187,7 +187,7 @@ def _create_test_dag_and_task(python_callable: Callable,
scenario_name: str) ->
run_id=run_id,
data_interval=(date, date),
run_type=types.DagRunType.MANUAL,
- state=DagRunState.QUEUED,
+ state=DagRunState.RUNNING,
**dagrun_kwargs,
)
task_instance = TaskInstance(t, run_id=run_id)
diff --git a/providers/tests/redis/log/test_redis_task_handler.py
b/providers/tests/redis/log/test_redis_task_handler.py
index 604dcb1c731..2dde6fa402b 100644
--- a/providers/tests/redis/log/test_redis_task_handler.py
+++ b/providers/tests/redis/log/test_redis_task_handler.py
@@ -46,6 +46,7 @@ class TestRedisTaskHandler:
else:
dag_run = DagRun(dag_id=dag.dag_id, execution_date=date,
run_id="test", run_type="scheduled")
+ dag_run.set_state(State.RUNNING)
with create_session() as session:
session.add(dag_run)
session.commit()
diff --git a/providers/tests/standard/operators/test_bash.py
b/providers/tests/standard/operators/test_bash.py
index 81c64048cba..13b3fad3ad4 100644
--- a/providers/tests/standard/operators/test_bash.py
+++ b/providers/tests/standard/operators/test_bash.py
@@ -273,7 +273,7 @@ class TestBashOperator:
# Templated fields
bash_command='echo "{{ dag_run.dag_id }}"',
env={"FOO": "{{ ds }}"},
- cwd="{{ dag_run.dag.folder }}",
+ cwd="{{ task.dag.folder }}",
# Other parameters
dag_id="test_templated_fields_dag",
task_id="test_templated_fields_task",
diff --git a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
index 7d8bd25e959..d91ecf841d9 100644
--- a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -130,8 +130,8 @@ class TIRescheduleStatePayload(BaseModel):
"""
state: Annotated[Literal["up_for_reschedule"] | None,
Field(title="State")] = "up_for_reschedule"
- end_date: Annotated[datetime, Field(title="End Date")]
reschedule_date: Annotated[datetime, Field(title="Reschedule Date")]
+ end_date: Annotated[datetime, Field(title="End Date")]
class TITargetStatePayload(BaseModel):
@@ -223,6 +223,7 @@ class DagRun(BaseModel):
end_date: Annotated[datetime | None, Field(title="End Date")] = None
run_type: DagRunType
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
+ external_trigger: Annotated[bool, Field(title="External Trigger")] = False
class HTTPValidationError(BaseModel):
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index 230f5414a87..c35a79e13c3 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -96,8 +96,6 @@ class RuntimeTaskInstance(TaskInstance):
# especially after removal of `conf` from Context.
"ti": self,
"outlet_events": OutletEventAccessors(),
- # "expanded_ti_count": expanded_ti_count,
- "expanded_ti_count": None, # TODO: Implement this
# "inlet_events": InletEventsAccessors(task.inlets,
session=session),
"macros": MacrosAccessor(),
# "params": validated_params,
diff --git a/task_sdk/src/airflow/sdk/types.py
b/task_sdk/src/airflow/sdk/types.py
index 1a886b45ed1..fd02104fb2f 100644
--- a/task_sdk/src/airflow/sdk/types.py
+++ b/task_sdk/src/airflow/sdk/types.py
@@ -43,6 +43,7 @@ class DagRunProtocol(Protocol):
end_date: datetime | None
run_type: Any
conf: dict[str, Any] | None
+ external_trigger: bool
class RuntimeTaskInstanceProtocol(Protocol):
diff --git a/task_sdk/tests/execution_time/test_task_runner.py
b/task_sdk/tests/execution_time/test_task_runner.py
index 9889c192e90..17c0029844e 100644
--- a/task_sdk/tests/execution_time/test_task_runner.py
+++ b/task_sdk/tests/execution_time/test_task_runner.py
@@ -615,7 +615,6 @@ class TestRuntimeTaskInstance:
},
"conn": ConnectionAccessor(),
"dag": runtime_ti.task.dag,
- "expanded_ti_count": None,
"inlets": task.inlets,
"macros": MacrosAccessor(),
"map_index_template": task.map_index_template,
@@ -675,7 +674,6 @@ class TestRuntimeTaskInstance:
"logical_date": timezone.datetime(2024, 12, 1, 1, 0, 0),
"ds": "2024-12-01",
"ds_nodash": "20241201",
- "expanded_ti_count": None,
"task_instance_key_str": "basic_task__hello__20241201",
"ts": "2024-12-01T01:00:00+00:00",
"ts_nodash": "20241201T010000",
diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py
b/tests/api_fastapi/execution_api/routes/test_task_instances.py
index c11fbb21bb2..e3aef1505bc 100644
--- a/tests/api_fastapi/execution_api/routes/test_task_instances.py
+++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py
@@ -85,6 +85,7 @@ class TestTIRunState:
"data_interval_end": instant_str,
"start_date": instant_str,
"end_date": None,
+ "external_trigger": False,
"run_type": "manual",
"conf": {},
},