This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2193be2a5e5 Fix failures on main related to DagRun validation (#45917)
2193be2a5e5 is described below

commit 2193be2a5e53760ae00d1b85c825087e995f8eb1
Author: Kaxil Naik <kaxiln...@apache.org>
AuthorDate: Wed Jan 22 22:31:16 2025 +0530

    Fix failures on main related to DagRun validation (#45917)
    
    Some test failures like https://github.com/apache/airflow/actions/runs/12903540392/job/35979200770?pr=45865 were not caught in #45834
    
    Follow-up of https://github.com/apache/airflow/pull/45834/
---
 .../execution_api/datamodels/taskinstance.py       |  1 +
 .../cli/commands/remote_commands/task_command.py   |  1 +
 .../providers/edge/example_dags/win_test.py        |  2 +-
 .../airflow/providers/standard/operators/bash.py   |  2 +-
 .../tests/amazon/aws/operators/test_athena.py      |  3 +++
 .../tests/amazon/aws/operators/test_datasync.py    | 11 ++++++++++
 providers/tests/amazon/aws/operators/test_dms.py   |  4 ++++
 .../amazon/aws/operators/test_emr_add_steps.py     |  5 +++++
 .../aws/operators/test_emr_create_job_flow.py      |  5 +++++
 providers/tests/amazon/aws/operators/test_s3.py    | 13 +++++++++--
 .../amazon/aws/operators/test_sagemaker_base.py    | 13 +++++++++--
 providers/tests/amazon/aws/sensors/test_s3.py      | 25 ++++++++++++++++++----
 providers/tests/amazon/aws/transfers/test_base.py  |  3 +++
 .../amazon/aws/transfers/test_dynamodb_to_s3.py    |  3 +++
 .../tests/amazon/aws/transfers/test_mongo_to_s3.py |  3 +++
 .../apache/kylin/operators/test_kylin_cube.py      |  4 +++-
 .../apache/spark/operators/test_spark_submit.py    |  3 +++
 .../tests/openlineage/plugins/test_adapter.py      |  4 ++--
 .../tests/openlineage/plugins/test_listener.py     |  4 ++--
 .../tests/redis/log/test_redis_task_handler.py     |  1 +
 providers/tests/standard/operators/test_bash.py    |  2 +-
 .../src/airflow/sdk/api/datamodels/_generated.py   |  3 ++-
 .../src/airflow/sdk/execution_time/task_runner.py  |  2 --
 task_sdk/src/airflow/sdk/types.py                  |  1 +
 task_sdk/tests/execution_time/test_task_runner.py  |  2 --
 .../execution_api/routes/test_task_instances.py    |  1 +
 26 files changed, 100 insertions(+), 21 deletions(-)

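For context, most of the provider-test hunks below repeat a single pattern: the DagRun attached to a TaskInstance is now created with an explicit RUNNING state, and only the date keyword differs between Airflow 2.x (execution_date) and 3.x (logical_date). A minimal sketch of that pattern, using the same imports as the diff; the helper name _make_running_dag_run is hypothetical and the surrounding test fixtures are omitted:

from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS


def _make_running_dag_run(dag_id: str) -> DagRun:
    # Hypothetical helper: every test run is created explicitly as RUNNING
    # instead of leaving the state unset.
    kwargs = {
        "dag_id": dag_id,
        "run_id": "test",
        "run_type": DagRunType.MANUAL,
        "state": DagRunState.RUNNING,
    }
    if AIRFLOW_V_3_0_PLUS:
        return DagRun(logical_date=timezone.utcnow(), **kwargs)
    # Airflow 2.x keeps the older keyword for the same value.
    return DagRun(execution_date=timezone.utcnow(), **kwargs)
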
diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index bb6d643fb8e..5e8c267b82f 100644
--- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -183,6 +183,7 @@ class DagRun(BaseModel):
     end_date: UtcDateTime | None
     run_type: DagRunType
     conf: Annotated[dict[str, Any], Field(default_factory=dict)]
+    external_trigger: bool = False
 
 
 class TIRunContext(BaseModel):
diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py
index 681b478bfc1..b1f9182e4c9 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -174,6 +174,7 @@ def _get_dag_run(
             dag_id=dag.dag_id,
             run_id=logical_date_or_run_id,
             run_type=DagRunType.MANUAL,
+            external_trigger=True,
             logical_date=dag_run_logical_date,
             data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
             triggered_by=DagRunTriggeredByType.CLI,
diff --git a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
index 15735b85d18..3a730009d50 100644
--- a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
+++ b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py
@@ -81,7 +81,7 @@ class CmdOperator(BaseOperator):
     :param cwd: Working directory to execute the command in (templated).
         If None (default), the command is run in a temporary directory.
         To use current DAG folder as the working directory,
-        you might set template ``{{ dag_run.dag.folder }}``.
+        you might set template ``{{ task.dag.folder }}``.
     :param output_processor: Function to further process the output of the script / command
         (default is lambda output: output).
 
diff --git a/providers/src/airflow/providers/standard/operators/bash.py b/providers/src/airflow/providers/standard/operators/bash.py
index d3f66380eea..9b4852a151b 100644
--- a/providers/src/airflow/providers/standard/operators/bash.py
+++ b/providers/src/airflow/providers/standard/operators/bash.py
@@ -69,7 +69,7 @@ class BashOperator(BaseOperator):
     :param cwd: Working directory to execute the command in (templated).
         If None (default), the command is run in a temporary directory.
         To use current DAG folder as the working directory,
-        you might set template ``{{ dag_run.dag.folder }}``.
+        you might set template ``{{ task.dag.folder }}``.
         When bash_command is a '.sh' or '.bash' file, Airflow must have write
         access to the working directory. The script will be rendered (Jinja
         template) into a new temporary file in this directory.
diff --git a/providers/tests/amazon/aws/operators/test_athena.py b/providers/tests/amazon/aws/operators/test_athena.py
index 63bfa890699..3ce9487c415 100644
--- a/providers/tests/amazon/aws/operators/test_athena.py
+++ b/providers/tests/amazon/aws/operators/test_athena.py
@@ -38,6 +38,7 @@ from airflow.providers.common.compat.openlineage.facet import (
 )
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 
@@ -239,6 +240,7 @@ class TestAthenaOperator:
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -246,6 +248,7 @@ class TestAthenaOperator:
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.athena)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_datasync.py b/providers/tests/amazon/aws/operators/test_datasync.py
index 272e03d201f..6b6b64caa13 100644
--- a/providers/tests/amazon/aws/operators/test_datasync.py
+++ b/providers/tests/amazon/aws/operators/test_datasync.py
@@ -27,6 +27,7 @@ from airflow.models import DAG, DagRun, TaskInstance
 from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
 from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 
@@ -358,6 +359,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase):
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -365,6 +367,7 @@ class TestDataSyncOperatorCreate(DataSyncTestCaseBase):
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.datasync)
         ti.dag_run = dag_run
@@ -570,6 +573,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase):
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -577,6 +581,7 @@ class TestDataSyncOperatorGetTasks(DataSyncTestCaseBase):
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.datasync)
         ti.dag_run = dag_run
@@ -684,6 +689,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase):
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -691,6 +697,7 @@ class TestDataSyncOperatorUpdate(DataSyncTestCaseBase):
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.datasync)
         ti.dag_run = dag_run
@@ -870,6 +877,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase):
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -877,6 +885,7 @@ class TestDataSyncOperator(DataSyncTestCaseBase):
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.datasync)
         ti.dag_run = dag_run
@@ -980,6 +989,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase):
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -987,6 +997,7 @@ class TestDataSyncOperatorDelete(DataSyncTestCaseBase):
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.datasync)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_dms.py b/providers/tests/amazon/aws/operators/test_dms.py
index 495beed5446..0b414234d9f 100644
--- a/providers/tests/amazon/aws/operators/test_dms.py
+++ b/providers/tests/amazon/aws/operators/test_dms.py
@@ -45,6 +45,7 @@ from airflow.providers.amazon.aws.triggers.dms import (
     DmsReplicationTerminalStatusTrigger,
 )
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -298,6 +299,7 @@ class TestDmsDescribeTasksOperator:
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -305,6 +307,7 @@ class TestDmsDescribeTasksOperator:
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=describe_task)
         ti.dag_run = dag_run
@@ -491,6 +494,7 @@ class TestDmsDescribeReplicationConfigsOperator:
             dag_id=dag.dag_id,
             run_id="test",
             run_type=DagRunType.MANUAL,
+            state=DagRunState.RUNNING,
         )
         ti = TaskInstance(task=op)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_emr_add_steps.py b/providers/tests/amazon/aws/operators/test_emr_add_steps.py
index 8bd2e9325d5..c30f0b6d418 100644
--- a/providers/tests/amazon/aws/operators/test_emr_add_steps.py
+++ b/providers/tests/amazon/aws/operators/test_emr_add_steps.py
@@ -30,6 +30,7 @@ from airflow.models import DAG, DagRun, TaskInstance
 from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
 from airflow.providers.amazon.aws.triggers.emr import EmrAddStepsTrigger
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -105,6 +106,7 @@ class TestEmrAddStepsOperator:
                 logical_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -112,6 +114,7 @@ class TestEmrAddStepsOperator:
                 execution_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.operator)
         ti.dag_run = dag_run
@@ -170,6 +173,7 @@ class TestEmrAddStepsOperator:
                 logical_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -177,6 +181,7 @@ class TestEmrAddStepsOperator:
                 execution_date=timezone.utcnow(),
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=test_task)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
index cc39fec5269..8131477e5e4 100644
--- a/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
+++ b/providers/tests/amazon/aws/operators/test_emr_create_job_flow.py
@@ -32,6 +32,7 @@ from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
 from airflow.providers.amazon.aws.triggers.emr import EmrCreateJobFlowTrigger
 from airflow.providers.amazon.aws.utils.waiter import WAITER_POLICY_NAME_MAPPING, WaitPolicy
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -104,6 +105,7 @@ class TestEmrCreateJobFlowOperator:
                 logical_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -111,6 +113,7 @@ class TestEmrCreateJobFlowOperator:
                 execution_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.operator)
         ti.dag_run = dag_run
@@ -150,6 +153,7 @@ class TestEmrCreateJobFlowOperator:
                 logical_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -157,6 +161,7 @@ class TestEmrCreateJobFlowOperator:
                 execution_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.operator)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_s3.py b/providers/tests/amazon/aws/operators/test_s3.py
index 760a6ab53fb..ab8508ec5f8 100644
--- a/providers/tests/amazon/aws/operators/test_s3.py
+++ b/providers/tests/amazon/aws/operators/test_s3.py
@@ -55,6 +55,7 @@ from airflow.providers.common.compat.openlineage.facet import (
     PreviousIdentifier,
 )
 from airflow.providers.openlineage.extractors import OperatorLineage
+from airflow.utils.state import DagRunState
 from airflow.utils.timezone import datetime, utcnow
 from airflow.utils.types import DagRunType
 
@@ -654,11 +655,19 @@ class TestS3DeleteObjectsOperator:
         )
         if hasattr(DagRun, "execution_date"):  # Airflow 2.x.
             dag_run = DagRun(
-                dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                execution_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
-                dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                logical_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=op)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/operators/test_sagemaker_base.py b/providers/tests/amazon/aws/operators/test_sagemaker_base.py
index 3ae0c2d95ed..408bc3dc579 100644
--- a/providers/tests/amazon/aws/operators/test_sagemaker_base.py
+++ b/providers/tests/amazon/aws/operators/test_sagemaker_base.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.operators.sagemaker import (
     SageMakerCreateExperimentOperator,
 )
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from providers.tests.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -209,11 +210,19 @@ class TestSageMakerExperimentOperator:
         )
         if AIRFLOW_V_3_0_PLUS:
             dag_run = DagRun(
-                dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                logical_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
-                dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                execution_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=op)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/sensors/test_s3.py b/providers/tests/amazon/aws/sensors/test_s3.py
index 8b241168392..9c169f86cf0 100644
--- a/providers/tests/amazon/aws/sensors/test_s3.py
+++ b/providers/tests/amazon/aws/sensors/test_s3.py
@@ -30,6 +30,7 @@ from airflow.models.variable import Variable
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -129,11 +130,19 @@ class TestS3KeySensor:
 
         if AIRFLOW_V_3_0_PLUS:
             dag_run = DagRun(
-                dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                logical_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
-                dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                execution_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=op)
         ti.dag_run = dag_run
@@ -164,11 +173,19 @@ class TestS3KeySensor:
         )
         if hasattr(DagRun, "execution_date"):  # Airflow 2.x.
             dag_run = DagRun(
-                dag_id=dag.dag_id, execution_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                execution_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
-                dag_id=dag.dag_id, logical_date=logical_date, run_id="test", run_type=DagRunType.MANUAL
+                dag_id=dag.dag_id,
+                logical_date=logical_date,
+                run_id="test",
+                run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=op)
         ti.dag_run = dag_run
diff --git a/providers/tests/amazon/aws/transfers/test_base.py b/providers/tests/amazon/aws/transfers/test_base.py
index a09f08b0d18..235955bd6eb 100644
--- a/providers/tests/amazon/aws/transfers/test_base.py
+++ b/providers/tests/amazon/aws/transfers/test_base.py
@@ -23,6 +23,7 @@ from airflow import DAG
 from airflow.models import DagRun, TaskInstance
 from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -50,6 +51,7 @@ class TestAwsToAwsBaseOperator:
                 run_id="something",
                 logical_date=timezone.datetime(2020, 1, 1),
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             ti.dag_run = DagRun(
@@ -57,6 +59,7 @@ class TestAwsToAwsBaseOperator:
                 run_id="something",
                 execution_date=timezone.datetime(2020, 1, 1),
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         session.add(ti)
         session.commit()
diff --git a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
index 7304492c1ad..dcd3b6058a6 100644
--- a/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import (
     JSONEncoder,
 )
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -280,6 +281,7 @@ class TestDynamodbToS3:
                 run_id="something",
                 logical_date=timezone.datetime(2020, 1, 1),
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             ti.dag_run = DagRun(
@@ -287,6 +289,7 @@ class TestDynamodbToS3:
                 run_id="something",
                 execution_date=timezone.datetime(2020, 1, 1),
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         session.add(ti)
         session.commit()
diff --git a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
index 4698469a650..764faf6dadb 100644
--- a/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
+++ b/providers/tests/amazon/aws/transfers/test_mongo_to_s3.py
@@ -24,6 +24,7 @@ import pytest
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -88,6 +89,7 @@ class TestMongoToS3Operator:
                 logical_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         else:
             dag_run = DagRun(
@@ -95,6 +97,7 @@ class TestMongoToS3Operator:
                 execution_date=DEFAULT_DATE,
                 run_id="test",
                 run_type=DagRunType.MANUAL,
+                state=DagRunState.RUNNING,
             )
         ti = TaskInstance(task=self.mock_operator)
         ti.dag_run = dag_run
diff --git a/providers/tests/apache/kylin/operators/test_kylin_cube.py b/providers/tests/apache/kylin/operators/test_kylin_cube.py
index b23833811f6..c56dbdb33b5 100644
--- a/providers/tests/apache/kylin/operators/test_kylin_cube.py
+++ b/providers/tests/apache/kylin/operators/test_kylin_cube.py
@@ -26,7 +26,7 @@ from airflow.exceptions import AirflowException
 from airflow.models import DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
-from airflow.utils import timezone
+from airflow.utils import state, timezone
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -177,6 +177,7 @@ class TestKylinCubeOperator:
                 run_id="kylin_test",
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.MANUAL,
+                state=state.DagRunState.RUNNING,
             )
         else:
             ti.dag_run = DagRun(
@@ -184,6 +185,7 @@ class TestKylinCubeOperator:
                 run_id="kylin_test",
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.MANUAL,
+                state=state.DagRunState.RUNNING,
             )
         session.add(ti)
         session.commit()
diff --git a/providers/tests/apache/spark/operators/test_spark_submit.py b/providers/tests/apache/spark/operators/test_spark_submit.py
index 879a7c999ef..2670340086b 100644
--- a/providers/tests/apache/spark/operators/test_spark_submit.py
+++ b/providers/tests/apache/spark/operators/test_spark_submit.py
@@ -201,6 +201,7 @@ class TestSparkSubmitOperator:
                 run_id="spark_test",
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.MANUAL,
+                state="running",
             )
         else:
             ti.dag_run = DagRun(
@@ -208,7 +209,9 @@ class TestSparkSubmitOperator:
                 run_id="spark_test",
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.MANUAL,
+                state="running",
             )
+
         session.add(ti)
         session.commit()
         # When
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index fd7f01ff61e..9e19437142b 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -585,7 +585,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
             "dag_id": "dag_id",
             "data_interval_start": event_time.isoformat(),
             "data_interval_end": event_time.isoformat(),
-            "external_trigger": None,
+            "external_trigger": False if AIRFLOW_V_3_0_PLUS else None,
             "run_id": run_id,
             "run_type": None,
             "start_date": event_time.isoformat(),
@@ -626,7 +626,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
                             "dag_id": "dag_id",
                             "data_interval_start": event_time.isoformat(),
                             "data_interval_end": event_time.isoformat(),
-                            "external_trigger": None,
+                            "external_trigger": False if AIRFLOW_V_3_0_PLUS else None,
                             "run_id": run_id,
                             "run_type": None,
                             "start_date": event_time.isoformat(),
diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py
index 837873f439d..25d9c24e6e5 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -101,7 +101,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
         run_id=run_id,
         data_interval=(date, date),
         run_type=types.DagRunType.MANUAL,
-        state=DagRunState.QUEUED,
+        state=DagRunState.RUNNING,
         **dagrun_kwargs,
     )
     ti = TaskInstance(t, run_id=run_id)
@@ -187,7 +187,7 @@ def _create_test_dag_and_task(python_callable: Callable, scenario_name: str) ->
         run_id=run_id,
         data_interval=(date, date),
         run_type=types.DagRunType.MANUAL,
-        state=DagRunState.QUEUED,
+        state=DagRunState.RUNNING,
         **dagrun_kwargs,
     )
     task_instance = TaskInstance(t, run_id=run_id)
diff --git a/providers/tests/redis/log/test_redis_task_handler.py b/providers/tests/redis/log/test_redis_task_handler.py
index 604dcb1c731..2dde6fa402b 100644
--- a/providers/tests/redis/log/test_redis_task_handler.py
+++ b/providers/tests/redis/log/test_redis_task_handler.py
@@ -46,6 +46,7 @@ class TestRedisTaskHandler:
         else:
             dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")
 
+        dag_run.set_state(State.RUNNING)
         with create_session() as session:
             session.add(dag_run)
             session.commit()
diff --git a/providers/tests/standard/operators/test_bash.py b/providers/tests/standard/operators/test_bash.py
index 81c64048cba..13b3fad3ad4 100644
--- a/providers/tests/standard/operators/test_bash.py
+++ b/providers/tests/standard/operators/test_bash.py
@@ -273,7 +273,7 @@ class TestBashOperator:
             # Templated fields
             bash_command='echo "{{ dag_run.dag_id }}"',
             env={"FOO": "{{ ds }}"},
-            cwd="{{ dag_run.dag.folder }}",
+            cwd="{{ task.dag.folder }}",
             # Other parameters
             dag_id="test_templated_fields_dag",
             task_id="test_templated_fields_task",
diff --git a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
index 7d8bd25e959..d91ecf841d9 100644
--- a/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task_sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -130,8 +130,8 @@ class TIRescheduleStatePayload(BaseModel):
     """
 
     state: Annotated[Literal["up_for_reschedule"] | None, Field(title="State")] = "up_for_reschedule"
-    end_date: Annotated[datetime, Field(title="End Date")]
     reschedule_date: Annotated[datetime, Field(title="Reschedule Date")]
+    end_date: Annotated[datetime, Field(title="End Date")]
 
 
 class TITargetStatePayload(BaseModel):
@@ -223,6 +223,7 @@ class DagRun(BaseModel):
     end_date: Annotated[datetime | None, Field(title="End Date")] = None
     run_type: DagRunType
     conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
+    external_trigger: Annotated[bool, Field(title="External Trigger")] = False
 
 
 class HTTPValidationError(BaseModel):
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index 230f5414a87..c35a79e13c3 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -96,8 +96,6 @@ class RuntimeTaskInstance(TaskInstance):
             #   especially after removal of `conf` from Context.
             "ti": self,
             "outlet_events": OutletEventAccessors(),
-            # "expanded_ti_count": expanded_ti_count,
-            "expanded_ti_count": None,  # TODO: Implement this
             # "inlet_events": InletEventsAccessors(task.inlets, 
session=session),
             "macros": MacrosAccessor(),
             # "params": validated_params,
diff --git a/task_sdk/src/airflow/sdk/types.py b/task_sdk/src/airflow/sdk/types.py
index 1a886b45ed1..fd02104fb2f 100644
--- a/task_sdk/src/airflow/sdk/types.py
+++ b/task_sdk/src/airflow/sdk/types.py
@@ -43,6 +43,7 @@ class DagRunProtocol(Protocol):
     end_date: datetime | None
     run_type: Any
     conf: dict[str, Any] | None
+    external_trigger: bool
 
 
 class RuntimeTaskInstanceProtocol(Protocol):
diff --git a/task_sdk/tests/execution_time/test_task_runner.py b/task_sdk/tests/execution_time/test_task_runner.py
index 9889c192e90..17c0029844e 100644
--- a/task_sdk/tests/execution_time/test_task_runner.py
+++ b/task_sdk/tests/execution_time/test_task_runner.py
@@ -615,7 +615,6 @@ class TestRuntimeTaskInstance:
             },
             "conn": ConnectionAccessor(),
             "dag": runtime_ti.task.dag,
-            "expanded_ti_count": None,
             "inlets": task.inlets,
             "macros": MacrosAccessor(),
             "map_index_template": task.map_index_template,
@@ -675,7 +674,6 @@ class TestRuntimeTaskInstance:
             "logical_date": timezone.datetime(2024, 12, 1, 1, 0, 0),
             "ds": "2024-12-01",
             "ds_nodash": "20241201",
-            "expanded_ti_count": None,
             "task_instance_key_str": "basic_task__hello__20241201",
             "ts": "2024-12-01T01:00:00+00:00",
             "ts_nodash": "20241201T010000",
diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py b/tests/api_fastapi/execution_api/routes/test_task_instances.py
index c11fbb21bb2..e3aef1505bc 100644
--- a/tests/api_fastapi/execution_api/routes/test_task_instances.py
+++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py
@@ -85,6 +85,7 @@ class TestTIRunState:
                 "data_interval_end": instant_str,
                 "start_date": instant_str,
                 "end_date": None,
+                "external_trigger": False,
                 "run_type": "manual",
                 "conf": {},
             },

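For reference, the other recurring change above is the new external_trigger field on the DagRun data models (execution API and the generated Task SDK model). Because it defaults to False, existing payloads that omit the key keep validating; a trimmed-down sketch of that behaviour, assuming pydantic v2 and reduced to the fields relevant here:

from typing import Annotated

from pydantic import BaseModel, Field


class DagRun(BaseModel):
    # Trimmed illustration only; the real models define many more fields.
    dag_id: str
    run_id: str
    external_trigger: Annotated[bool, Field(title="External Trigger")] = False


# A payload without the new key still parses and falls back to the default.
run = DagRun.model_validate({"dag_id": "example_dag", "run_id": "manual__2025-01-22"})
print(run.external_trigger)  # False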