This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 02d83b0768f Refactor DAG.create_dagrun() arguments (#45370)
02d83b0768f is described below

commit 02d83b0768ff8ad3024b3a42cf8829789867861f
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Wed Jan 15 20:08:43 2025 +0800

    Refactor DAG.create_dagrun() arguments (#45370)
---
 airflow/api/common/trigger_dag.py                  |   7 +-
 .../api_connexion/endpoints/dag_run_endpoint.py    |   9 +-
 .../api_fastapi/core_api/routes/public/dag_run.py  |  20 +-
 .../cli/commands/remote_commands/task_command.py   |  10 +-
 airflow/jobs/scheduler_job_runner.py               |  39 ++--
 airflow/models/backfill.py                         |  23 ++-
 airflow/models/dag.py                              | 177 +++++++----------
 airflow/models/dagrun.py                           |  14 +-
 airflow/www/views.py                               |  22 ++-
 dev/perf/scheduler_dag_execution_timing.py         |  12 +-
 docs/apache-airflow/best-practices.rst             |  11 +-
 .../kubernetes/log_handlers/test_log_handlers.py   |  30 ++-
 providers/tests/common/sql/operators/test_sql.py   | 215 +++++++++------------
 .../tests/openlineage/plugins/test_execution.py    |  27 ++-
 .../tests/openlineage/plugins/test_listener.py     |  67 +++++--
 providers/tests/openlineage/plugins/test_utils.py  |  15 +-
 .../commands/remote_commands/test_task_command.py  |  57 +++++-
 tests/decorators/test_python.py                    |   6 +-
 tests/jobs/test_local_task_job.py                  |  12 ++
 tests/jobs/test_scheduler_job.py                   | 100 ++++++----
 tests/models/test_cleartasks.py                    |   1 +
 tests/models/test_dag.py                           | 156 +++++++--------
 tests/models/test_dagrun.py                        | 183 +++++-------------
 tests/models/test_taskinstance.py                  |   3 +
 tests/sensors/test_external_task_sensor.py         |  69 +++----
 tests/task/test_standard_task_runner.py            |  30 ++-
 tests/utils/test_log_handlers.py                   | 100 ++++------
 tests/utils/test_sqlalchemy.py                     |   3 +
 tests/utils/test_state.py                          |   5 +
 tests/utils/test_types.py                          |  74 +++----
 tests/www/test_utils.py                            |  12 +-
 tests/www/views/test_views.py                      |  34 +---
 tests/www/views/test_views_acl.py                  |   1 +
 tests/www/views/test_views_dagrun.py               |   4 +
 tests/www/views/test_views_decorators.py           |   2 +
 tests/www/views/test_views_extra_links.py          |   1 +
 tests/www/views/test_views_log.py                  |   2 +
 tests/www/views/test_views_rendered.py             |   2 +
 tests/www/views/test_views_tasks.py                |   3 +
 tests_common/pytest_plugin.py                      |  29 ++-
 40 files changed, 784 insertions(+), 803 deletions(-)

diff --git a/airflow/api/common/trigger_dag.py 
b/airflow/api/common/trigger_dag.py
index fcc335b8b58..d04a790c02f 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -101,12 +101,13 @@ def _trigger_dag(
     dag_run = dag.create_dagrun(
         run_id=run_id,
         logical_date=logical_date,
-        state=DagRunState.QUEUED,
+        data_interval=data_interval,
         conf=run_conf,
+        run_type=DagRunType.MANUAL,
+        triggered_by=triggered_by,
         external_trigger=True,
         dag_version=dag_version,
-        data_interval=data_interval,
-        triggered_by=triggered_by,
+        state=DagRunState.QUEUED,
         session=session,
     )
 
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 4574a7b77c7..0fd7f537a5e 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -347,18 +347,17 @@ def post_dag_run(*, dag_id: str, session: Session = 
NEW_SESSION) -> APIResponse:
                 )
             else:
                 data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-            dag_version = DagVersion.get_latest_version(dag.dag_id)
             dag_run = dag.create_dagrun(
-                run_type=DagRunType.MANUAL,
                 run_id=run_id,
                 logical_date=logical_date,
                 data_interval=data_interval,
-                state=DagRunState.QUEUED,
                 conf=post_body.get("conf"),
+                run_type=DagRunType.MANUAL,
+                triggered_by=DagRunTriggeredByType.REST_API,
                 external_trigger=True,
-                dag_version=dag_version,
+                dag_version=DagVersion.get_latest_version(dag.dag_id),
+                state=DagRunState.QUEUED,
                 session=session,
-                triggered_by=DagRunTriggeredByType.REST_API,
             )
             dag_run_note = post_body.get("note")
             if dag_run_note:
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 5c87ce14562..9b8cfd6727b 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -342,7 +342,6 @@ def trigger_dag_run(
             f"DAG with dag_id: '{dag_id}' has import errors and cannot be 
triggered",
         )
 
-    run_id = body.dag_run_id
     logical_date = pendulum.instance(body.logical_date)
 
     try:
@@ -355,18 +354,27 @@ def trigger_dag_run(
             )
         else:
             data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-        dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        if body.dag_run_id:
+            run_id = body.dag_run_id
+        else:
+            run_id = dag.timetable.generate_run_id(
+                run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+                data_interval=data_interval,
+            )
+
         dag_run = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
             run_id=run_id,
             logical_date=logical_date,
             data_interval=data_interval,
-            state=DagRunState.QUEUED,
             conf=body.conf,
+            run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.REST_API,
             external_trigger=True,
-            dag_version=dag_version,
+            dag_version=DagVersion.get_latest_version(dag.dag_id),
+            state=DagRunState.QUEUED,
             session=session,
-            triggered_by=DagRunTriggeredByType.REST_API,
         )
         dag_run_note = body.note
         if dag_run_note:
diff --git a/airflow/cli/commands/remote_commands/task_command.py 
b/airflow/cli/commands/remote_commands/task_command.py
index 2329ee25bcc..51198af7496 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -66,7 +66,7 @@ from airflow.utils.providers_configuration_loader import 
providers_configuration
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.task_instance_session import 
set_current_task_instance_session
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 if TYPE_CHECKING:
     from typing import Literal
@@ -180,12 +180,14 @@ def _get_dag_run(
         return dag_run, True
     elif create_if_necessary == "db":
         dag_run = dag.create_dagrun(
-            state=DagRunState.QUEUED,
-            logical_date=dag_run_logical_date,
             run_id=_generate_temporary_run_id(),
+            logical_date=dag_run_logical_date,
             
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
-            session=session,
+            run_type=DagRunType.MANUAL,
             triggered_by=DagRunTriggeredByType.CLI,
+            dag_version=None,
+            state=DagRunState.QUEUED,
+            session=session,
         )
         return dag_run, True
     raise ValueError(f"unknown create_if_necessary value: 
{create_if_necessary!r}")
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 4718b824830..f8a2eb32953 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1344,15 +1344,19 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
                 try:
                     dag.create_dagrun(
-                        run_type=DagRunType.SCHEDULED,
+                        run_id=dag.timetable.generate_run_id(
+                            run_type=DagRunType.SCHEDULED,
+                            logical_date=dag_model.next_dagrun,
+                            data_interval=data_interval,
+                        ),
                         logical_date=dag_model.next_dagrun,
-                        state=DagRunState.QUEUED,
                         data_interval=data_interval,
-                        external_trigger=False,
-                        session=session,
+                        run_type=DagRunType.SCHEDULED,
+                        triggered_by=DagRunTriggeredByType.TIMETABLE,
                         dag_version=latest_dag_version,
+                        state=DagRunState.QUEUED,
                         creating_job_id=self.job.id,
-                        triggered_by=DagRunTriggeredByType.TIMETABLE,
+                        session=session,
                     )
                     active_runs_of_dags[dag.dag_id] += 1
                 # Exceptions like ValueError, ParamValidationError, etc. are 
raised by
@@ -1448,25 +1452,22 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 ).all()
 
                 data_interval = 
dag.timetable.data_interval_for_events(logical_date, asset_events)
-                run_id = dag.timetable.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED,
-                    logical_date=logical_date,
-                    data_interval=data_interval,
-                    session=session,
-                    events=asset_events,
-                )
-
                 dag_run = dag.create_dagrun(
-                    run_id=run_id,
-                    run_type=DagRunType.ASSET_TRIGGERED,
+                    run_id=dag.timetable.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED,
+                        logical_date=logical_date,
+                        data_interval=data_interval,
+                        session=session,
+                        events=asset_events,
+                    ),
                     logical_date=logical_date,
                     data_interval=data_interval,
-                    state=DagRunState.QUEUED,
-                    external_trigger=False,
-                    session=session,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
                     dag_version=latest_dag_version,
+                    state=DagRunState.QUEUED,
                     creating_job_id=self.job.id,
-                    triggered_by=DagRunTriggeredByType.ASSET,
+                    session=session,
                 )
                 Stats.incr("asset.triggered_dagruns")
                 dag_run.consumed_asset_events.extend(asset_events)
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 39a28379a65..3003b1a5347 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -54,6 +54,8 @@ from airflow.utils.types import DagRunTriggeredByType, 
DagRunType
 if TYPE_CHECKING:
     from datetime import datetime
 
+    from airflow.models.dag import DAG
+    from airflow.timetables.base import DagRunInfo
 
 log = logging.getLogger(__name__)
 
@@ -223,8 +225,8 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, 
reprocess_behavior, sess
 
 def _create_backfill_dag_run(
     *,
-    dag,
-    info,
+    dag: DAG,
+    info: DagRunInfo,
     reprocess_behavior: ReprocessBehavior,
     backfill_id,
     dag_run_conf,
@@ -257,18 +259,21 @@ def _create_backfill_dag_run(
                 return
         dag_version = DagVersion.get_latest_version(dag.dag_id, 
session=session)
         dr = dag.create_dagrun(
-            triggered_by=DagRunTriggeredByType.BACKFILL,
+            run_id=dag.timetable.generate_run_id(
+                run_type=DagRunType.BACKFILL_JOB,
+                logical_date=info.logical_date,
+                data_interval=info.data_interval,
+            ),
             logical_date=info.logical_date,
             data_interval=info.data_interval,
-            start_date=timezone.utcnow(),
-            state=DagRunState.QUEUED,
-            external_trigger=False,
             conf=dag_run_conf,
             run_type=DagRunType.BACKFILL_JOB,
-            creating_job_id=None,
-            session=session,
-            backfill_id=backfill_id,
+            triggered_by=DagRunTriggeredByType.BACKFILL,
             dag_version=dag_version,
+            state=DagRunState.QUEUED,
+            start_date=timezone.utcnow(),
+            backfill_id=backfill_id,
+            session=session,
         )
         session.add(
             BackfillDagRun(
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 0f7d4d4925c..9c3451dc637 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -246,31 +246,24 @@ def _triggerer_is_healthy():
 
 @provide_session
 def _create_orm_dagrun(
-    dag,
-    dag_id,
-    run_id,
-    logical_date,
-    start_date,
-    external_trigger,
-    conf,
-    state,
-    run_type,
-    dag_version,
-    creating_job_id,
-    data_interval,
-    backfill_id,
-    session,
-    triggered_by,
-):
-    bundle_version = session.scalar(
-        select(
-            DagModel.latest_bundle_version,
-        ).where(
-            DagModel.dag_id == dag.dag_id,
-        )
-    )
+    *,
+    dag: DAG,
+    run_id: str,
+    logical_date: datetime | None,
+    data_interval: DataInterval | None,
+    start_date: datetime | None,
+    external_trigger: bool,
+    conf: Any,
+    state: DagRunState | None,
+    run_type: DagRunType,
+    dag_version: DagVersion | None,
+    creating_job_id: int | None,
+    backfill_id: int | None,
+    triggered_by: DagRunTriggeredByType,
+    session: Session = NEW_SESSION,
+) -> DagRun:
     run = DagRun(
-        dag_id=dag_id,
+        dag_id=dag.dag_id,
         run_id=run_id,
         logical_date=logical_date,
         start_date=start_date,
@@ -283,7 +276,9 @@ def _create_orm_dagrun(
         data_interval=data_interval,
         triggered_by=triggered_by,
         backfill_id=backfill_id,
-        bundle_version=bundle_version,
+        bundle_version=session.scalar(
+            select(DagModel.latest_bundle_version).where(DagModel.dag_id == 
dag.dag_id)
+        ),
     )
     # Load defaults into the following two fields to ensure result can be 
serialized detached
     run.log_template_id = 
int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
@@ -1735,86 +1730,63 @@ class DAG(TaskSDKDag, LoggingMixin):
     @provide_session
     def create_dagrun(
         self,
-        state: DagRunState,
         *,
-        triggered_by: DagRunTriggeredByType | None,
-        logical_date: datetime | None = None,
-        run_id: str | None = None,
-        start_date: datetime | None = None,
-        external_trigger: bool | None = False,
+        run_id: str,
+        logical_date: datetime,
+        data_interval: tuple[datetime, datetime],
         conf: dict | None = None,
-        run_type: DagRunType | None = None,
-        session: Session = NEW_SESSION,
+        run_type: DagRunType,
+        triggered_by: DagRunTriggeredByType,
+        external_trigger: bool = False,
         dag_version: DagVersion | None = None,
+        state: DagRunState,
+        start_date: datetime | None = None,
         creating_job_id: int | None = None,
-        data_interval: tuple[datetime, datetime] | None = None,
         backfill_id: int | None = None,
-    ):
+        session: Session = NEW_SESSION,
+    ) -> DagRun:
         """
-        Create a dag run from this dag including the tasks associated with 
this dag.
+        Create a run for this DAG to run its tasks.
 
-        Returns the dag run.
-
-        :param state: the state of the dag run
-        :param triggered_by: The entity which triggers the DagRun
-        :param run_id: defines the run id for this dag run
-        :param run_type: type of DagRun
-        :param logical_date: the logical date of this dag run
         :param start_date: the date this dag run should be evaluated
-        :param external_trigger: whether this dag run is externally triggered
         :param conf: Dict containing configuration/parameters to pass to the 
DAG
-        :param creating_job_id: id of the job creating this DagRun
-        :param session: database session
-        :param dag_version: The DagVersion object for this run
-        :param data_interval: Data interval of the DagRun
-        :param backfill_id: id of the backfill run if one exists
+        :param creating_job_id: ID of the job creating this DagRun
+        :param backfill_id: ID of the backfill run if one exists
+        :return: The created DAG run.
+
+        :meta private:
         """
         logical_date = timezone.coerce_datetime(logical_date)
 
         if data_interval and not isinstance(data_interval, DataInterval):
             data_interval = DataInterval(*map(timezone.coerce_datetime, 
data_interval))
 
-        if data_interval is None and logical_date is not None:
-            raise ValueError(
-                "Calling `DAG.create_dagrun()` without an explicit data 
interval is not supported."
-            )
-
-        if run_type is None or isinstance(run_type, DagRunType):
+        if isinstance(run_type, DagRunType):
             pass
-        elif isinstance(run_type, str):  # Compatibility: run_type used to be 
a str.
+        elif isinstance(run_type, str):  # Ensure the input value is valid.
             run_type = DagRunType(run_type)
         else:
-            raise ValueError(f"`run_type` should be a DagRunType, not 
{type(run_type)}")
-
-        if run_id:  # Infer run_type from run_id if needed.
-            if not isinstance(run_id, str):
-                raise ValueError(f"`run_id` should be a str, not 
{type(run_id)}")
-            inferred_run_type = DagRunType.from_run_id(run_id)
-            if run_type is None:
-                # No explicit type given, use the inferred type.
-                run_type = inferred_run_type
-            elif run_type == DagRunType.MANUAL and inferred_run_type != 
DagRunType.MANUAL:
-                # Prevent a manual run from using an ID that looks like a 
scheduled run.
+            raise ValueError(f"run_type should be a DagRunType, not 
{type(run_type)}")
+
+        if not isinstance(run_id, str):
+            raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
+
+        # This is also done on the DagRun model class, but SQLAlchemy column
+        # validator does not work well for some reason.
+        if not re2.match(RUN_ID_REGEX, run_id):
+            regex = airflow_conf.get("scheduler", 
"allowed_run_id_pattern").strip()
+            if not regex or not re2.match(regex, run_id):
                 raise ValueError(
-                    f"A {run_type.value} DAG run cannot use ID {run_id!r} 
since it "
-                    f"is reserved for {inferred_run_type.value} runs"
+                    f"The run_id provided '{run_id}' does not match regex 
pattern "
+                    f"'{regex}' or '{RUN_ID_REGEX}'"
                 )
-        elif run_type and logical_date is not None:  # Generate run_id from 
run_type and logical_date.
-            run_id = self.timetable.generate_run_id(
-                run_type=run_type, logical_date=logical_date, 
data_interval=data_interval
-            )
-        else:
-            raise AirflowException(
-                "Creating DagRun needs either `run_id` or both `run_type` and 
`logical_date`"
-            )
-
-        regex = airflow_conf.get("scheduler", "allowed_run_id_pattern")
 
-        if run_id and not re2.match(RUN_ID_REGEX, run_id):
-            if not regex.strip() or not re2.match(regex.strip(), run_id):
-                raise AirflowException(
-                    f"The provided run ID '{run_id}' is invalid. It does not 
match either "
-                    f"the configured pattern: '{regex}' or the built-in 
pattern: '{RUN_ID_REGEX}'"
+        # Prevent a manual run from using an ID that looks like a scheduled 
run.
+        if run_type == DagRunType.MANUAL:
+            if (inferred_run_type := DagRunType.from_run_id(run_id)) != 
DagRunType.MANUAL:
+                raise ValueError(
+                    f"A {run_type.value} DAG run cannot use ID {run_id!r} 
since it "
+                    f"is reserved for {inferred_run_type.value} runs"
                 )
 
         # todo: AIP-78 add verification that if run type is backfill then we 
have a backfill id
@@ -1824,15 +1796,15 @@ class DAG(TaskSDKDag, LoggingMixin):
             assert self.params
         # create a copy of params before validating
         copied_params = copy.deepcopy(self.params)
-        copied_params.update(conf or {})
+        if conf:
+            copied_params.update(conf)
         copied_params.validate()
 
-        run = _create_orm_dagrun(
+        return _create_orm_dagrun(
             dag=self,
-            dag_id=self.dag_id,
             run_id=run_id,
             logical_date=logical_date,
-            start_date=start_date,
+            start_date=timezone.coerce_datetime(start_date),
             external_trigger=external_trigger,
             conf=conf,
             state=state,
@@ -1841,10 +1813,9 @@ class DAG(TaskSDKDag, LoggingMixin):
             creating_job_id=creating_job_id,
             backfill_id=backfill_id,
             data_interval=data_interval,
-            session=session,
             triggered_by=triggered_by,
+            session=session,
         )
-        return run
 
     @classmethod
     @provide_session
@@ -2487,14 +2458,15 @@ def _run_task(
 
 
 def _get_or_create_dagrun(
+    *,
     dag: DAG,
-    conf: dict[Any, Any] | None,
-    start_date: datetime,
-    logical_date: datetime,
     run_id: str,
-    session: Session,
+    logical_date: datetime,
+    data_interval: tuple[datetime, datetime],
+    conf: dict | None,
     triggered_by: DagRunTriggeredByType,
-    data_interval: tuple[datetime, datetime] | None = None,
+    start_date: datetime,
+    session: Session,
 ) -> DagRun:
     """
     Create a DAG run, replacing an existing instance if needed to prevent 
collisions.
@@ -2510,24 +2482,23 @@ def _get_or_create_dagrun(
 
     :return: The newly created DAG run.
     """
-    log.info("dagrun id: %s", dag.dag_id)
     dr: DagRun = session.scalar(
         select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.logical_date 
== logical_date)
     )
     if dr:
         session.delete(dr)
         session.commit()
-    dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
     dr = dag.create_dagrun(
-        state=DagRunState.RUNNING,
-        logical_date=logical_date,
         run_id=run_id,
-        start_date=start_date or logical_date,
-        session=session,
-        conf=conf,
+        logical_date=logical_date,
         data_interval=data_interval,
+        conf=conf,
+        run_type=DagRunType.MANUAL,
+        state=DagRunState.RUNNING,
         triggered_by=triggered_by,
-        dag_version=dag_version,
+        dag_version=DagVersion.get_latest_version(dag.dag_id, session=session),
+        start_date=start_date or logical_date,
+        session=session,
     )
     log.info("created dagrun %s", dr)
     return dr
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index e7aad18672d..8060901f948 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -278,12 +278,14 @@ class DagRun(Base, LoggingMixin):
     def validate_run_id(self, key: str, run_id: str) -> str | None:
         if not run_id:
             return None
-        regex = airflow_conf.get("scheduler", "allowed_run_id_pattern")
-        if not re2.match(regex, run_id) and not re2.match(RUN_ID_REGEX, 
run_id):
-            raise ValueError(
-                f"The run_id provided '{run_id}' does not match the pattern 
'{regex}' or '{RUN_ID_REGEX}'"
-            )
-        return run_id
+        if re2.match(RUN_ID_REGEX, run_id):
+            return run_id
+        regex = airflow_conf.get("scheduler", "allowed_run_id_pattern").strip()
+        if regex and re2.match(regex, run_id):
+            return run_id
+        raise ValueError(
+            f"The run_id provided '{run_id}' does not match regex pattern 
'{regex}' or '{RUN_ID_REGEX}'"
+        )
 
     @property
     def stats_tags(self) -> dict[str, str]:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 01b10b98aff..d2751d18046 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2221,18 +2221,26 @@ class Airflow(AirflowBaseView):
                     "warning",
                 )
 
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+        if not run_id:
+            run_id = dag.timetable.generate_run_id(
+                logical_date=logical_date,
+                data_interval=data_interval,
+                run_type=DagRunType.MANUAL,
+            )
+
         try:
-            dag_version = DagVersion.get_latest_version(dag.dag_id)
             dag_run = dag.create_dagrun(
-                run_type=DagRunType.MANUAL,
+                run_id=run_id,
                 logical_date=logical_date,
-                
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
-                state=DagRunState.QUEUED,
+                data_interval=data_interval,
                 conf=run_conf,
-                external_trigger=True,
-                dag_version=dag_version,
-                run_id=run_id,
+                run_type=DagRunType.MANUAL,
                 triggered_by=DagRunTriggeredByType.UI,
+                external_trigger=True,
+                dag_version=DagVersion.get_latest_version(dag.dag_id),
+                state=DagRunState.QUEUED,
+                session=session,
             )
         except (ValueError, ParamValidationError) as ve:
             flash(f"{ve}", "error")
diff --git a/dev/perf/scheduler_dag_execution_timing.py 
b/dev/perf/scheduler_dag_execution_timing.py
index f8760f4c03c..3c1fbd0395c 100755
--- a/dev/perf/scheduler_dag_execution_timing.py
+++ b/dev/perf/scheduler_dag_execution_timing.py
@@ -172,11 +172,14 @@ def create_dag_runs(dag, num_runs, session):
         dag.create_dagrun(
             run_id=f"{id_prefix}{logical_date.isoformat()}",
             logical_date=logical_date,
-            start_date=timezone.utcnow(),
-            state=DagRunState.RUNNING,
+            data_interval=(logical_date, logical_date),
+            run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.TEST,
             external_trigger=False,
+            dag_version=None,
+            state=DagRunState.RUNNING,
+            start_date=timezone.utcnow(),
             session=session,
-            triggered_by=DagRunTriggeredByType.TEST,
         )
         last_dagrun_data_interval = next_info.data_interval
 
@@ -292,7 +295,8 @@ def main(num_runs, repeat, pre_create_dag_runs, 
executor_class, dag_ids):
 
     # Need a lambda to refer to the _latest_ value for scheduler_job, not just
     # the initial one
-    code_to_test = lambda: run_job(job=job_runner.job, 
execute_callable=job_runner._execute)
+    def code_to_test():
+        run_job(job=job_runner.job, execute_callable=job_runner._execute)
 
     for count in range(repeat):
         if not count:
diff --git a/docs/apache-airflow/best-practices.rst 
b/docs/apache-airflow/best-practices.rst
index 0f4b77001f2..37092f31a6d 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -725,13 +725,14 @@ This is an example test want to verify the structure of a 
code-generated DAG aga
 
     from airflow import DAG
     from airflow.utils.state import DagRunState, TaskInstanceState
-    from airflow.utils.types import DagRunType
+    from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
     DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
     DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
 
     TEST_DAG_ID = "my_custom_operator_dag"
     TEST_TASK_ID = "my_custom_operator_task"
+    TEST_RUN_ID = "my_custom_operator_dag_run"
 
 
     @pytest.fixture()
@@ -750,11 +751,13 @@ This is an example test want to verify the structure of a 
code-generated DAG aga
 
     def test_my_custom_operator_execute_no_trigger(dag):
         dagrun = dag.create_dagrun(
-            state=DagRunState.RUNNING,
-            execution_date=DATA_INTERVAL_START,
+            run_id=TEST_RUN_ID,
+            logical_date=DATA_INTERVAL_START,
             data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
-            start_date=DATA_INTERVAL_END,
             run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.TIMETABLE,
+            state=DagRunState.RUNNING,
+            start_date=DATA_INTERVAL_END,
         )
         ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
         ti.task = dag.get_task(task_id=TEST_TASK_ID)
diff --git a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py 
b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
index f18a4b88eed..9cbebcf8df9 100644
--- a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
+++ b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py
@@ -122,23 +122,21 @@ class TestFileTaskLogHandler:
                 python_callable=task_callable,
                 executor_config={"pod_override": pod_override},
             )
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+
         if AIRFLOW_V_3_0_PLUS:
-            dagrun = dag.create_dagrun(
-                run_type=DagRunType.MANUAL,
-                state=State.RUNNING,
-                logical_date=DEFAULT_DATE,
-                
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs: dict = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dagrun = dag.create_dagrun(
-                run_type=DagRunType.MANUAL,
-                state=State.RUNNING,
-                execution_date=DEFAULT_DATE,
-                
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dagrun = dag.create_dagrun(
+            run_id="test",
+            run_type=DagRunType.MANUAL,
+            state=State.RUNNING,
+            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
         ti.try_number = 3
 
@@ -160,7 +158,7 @@ class TestFileTaskLogHandler:
             (
                 "dag_id=dag_for_testing_file_task_handler,"
                 "kubernetes_executor=True,"
-                "run_id=manual__2016-01-01T0000000000-2b88d1d57,"
+                "run_id=test,"
                 "task_id=task_for_testing_file_log_handler,"
                 "try_number=2,"
                 "airflow-worker"
diff --git a/providers/tests/common/sql/operators/test_sql.py 
b/providers/tests/common/sql/operators/test_sql.py
index 133d51ac757..b0eedb52c47 100644
--- a/providers/tests/common/sql/operators/test_sql.py
+++ b/providers/tests/common/sql/operators/test_sql.py
@@ -44,6 +44,7 @@ from airflow.providers.postgres.hooks.postgres import 
PostgresHook
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.providers import get_provider_min_airflow_version
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -1158,26 +1159,23 @@ class TestSqlBranch:
         self.branch_1.set_upstream(branch_op)
         self.branch_2.set_upstream(branch_op)
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
 
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
@@ -1211,25 +1209,22 @@ class TestSqlBranch:
         self.branch_1.set_upstream(branch_op)
         self.branch_2.set_upstream(branch_op)
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
@@ -1264,26 +1259,22 @@ class TestSqlBranch:
         self.branch_1.set_upstream(branch_op)
         self.branch_2.set_upstream(branch_op)
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
 
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
@@ -1319,26 +1310,22 @@ class TestSqlBranch:
         self.branch_3 = EmptyOperator(task_id="branch_3", dag=self.dag)
         self.branch_3.set_upstream(branch_op)
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
 
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
         mock_get_records.return_value = [["1"]]
@@ -1371,26 +1358,22 @@ class TestSqlBranch:
         self.branch_1.set_upstream(branch_op)
         self.branch_2.set_upstream(branch_op)
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
 
         if AIRFLOW_V_3_0_PLUS:
-            self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
@@ -1414,25 +1397,22 @@ class TestSqlBranch:
         branch_op >> self.branch_1 >> self.branch_2
         branch_op >> self.branch_2
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
@@ -1465,25 +1445,22 @@ class TestSqlBranch:
         branch_op >> self.branch_1 >> self.branch_2
         branch_op >> self.branch_2
         self.dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+
         if AIRFLOW_V_3_0_PLUS:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                logical_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {
+                "logical_date": DEFAULT_DATE,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
         else:
-            dr = self.dag.create_dagrun(
-                run_id="manual__",
-                start_date=timezone.utcnow(),
-                execution_date=DEFAULT_DATE,
-                state=State.RUNNING,
-                data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-                **triggered_by_kwargs,
-            )
+            dagrun_kwargs = {"execution_date": DEFAULT_DATE}
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            **dagrun_kwargs,
+        )
 
         mock_get_records = mock_get_db_hook.return_value.get_first
 
diff --git a/providers/tests/openlineage/plugins/test_execution.py 
b/providers/tests/openlineage/plugins/test_execution.py
index 039064e7053..6eb84a5d401 100644
--- a/providers/tests/openlineage/plugins/test_execution.py
+++ b/providers/tests/openlineage/plugins/test_execution.py
@@ -34,8 +34,10 @@ from airflow.providers.google.cloud.openlineage.utils import 
get_from_nullable_c
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.utils import timezone
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.db import clear_db_runs
 from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
@@ -72,6 +74,9 @@ with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
     @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 
2.10+")
     @pytest.mark.usefixtures("reset_logging_config")
     class TestOpenLineageExecution:
+        def teardown_method(self):
+            clear_db_runs()
+
         @pytest.fixture(autouse=True)
         def clean_listener_manager(self):
             get_listener_manager().clear()
@@ -93,13 +98,20 @@ with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
             dag = dagbag.dags.get("test_openlineage_execution")
             task = dag.get_task(task_name)
 
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {}
+            if AIRFLOW_V_3_0_PLUS:
+                dagrun_kwargs = {
+                    "logical_date": DEFAULT_DATE,
+                    "triggered_by": DagRunTriggeredByType.TEST,
+                }
+            else:
+                dagrun_kwargs = {"execution_date": DEFAULT_DATE}
             dag.create_dagrun(
                 run_id=run_id,
+                run_type=DagRunType.MANUAL,
                 data_interval=(DEFAULT_DATE, DEFAULT_DATE),
                 state=State.RUNNING,
                 start_date=DEFAULT_DATE,
-                **triggered_by_kwargs,
+                **dagrun_kwargs,
             )
             ti = TaskInstance(task=task, run_id=run_id)
             job = Job(id=random.randint(0, 23478197), dag_id=ti.dag_id)
@@ -192,13 +204,20 @@ with tempfile.TemporaryDirectory(prefix="venv") as 
tmp_dir:
             dag = dagbag.dags.get("test_openlineage_execution")
             task = dag.get_task("execute_long_stall")
 
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {}
+            if AIRFLOW_V_3_0_PLUS:
+                dagrun_kwargs = {
+                    "logical_date": DEFAULT_DATE,
+                    "triggered_by": DagRunTriggeredByType.TEST,
+                }
+            else:
+                dagrun_kwargs = {"execution_date": DEFAULT_DATE}
             dag.create_dagrun(
                 
run_id="test_long_stalled_task_is_killed_by_listener_overtime_if_ol_timeout_long_enough",
+                run_type=DagRunType.MANUAL,
                 data_interval=(DEFAULT_DATE, DEFAULT_DATE),
                 state=State.RUNNING,
                 start_date=DEFAULT_DATE,
-                **triggered_by_kwargs,
+                **dagrun_kwargs,
             )
             ti = TaskInstance(
                 task=task,
diff --git a/providers/tests/openlineage/plugins/test_listener.py 
b/providers/tests/openlineage/plugins/test_listener.py
index eeca49c7f8d..837873f439d 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -36,15 +36,14 @@ from airflow.providers.openlineage.plugins.adapter import 
OpenLineageAdapter
 from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.providers.openlineage.utils.selective_enable import 
disable_lineage, enable_lineage
+from airflow.utils import types
 from airflow.utils.state import DagRunState, State
 
 from tests_common.test_utils.compat import PythonOperator
 from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.db import clear_db_runs
 from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
-
 pytestmark = pytest.mark.db_test
 
 EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0
@@ -80,20 +79,30 @@ def regular_call(self, callable, callable_name, use_fork):
 def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
     render_mock.return_value = render_df()
 
+    date = dt.datetime(2022, 1, 1)
     dag = DAG(
         "test",
         schedule=None,
-        start_date=dt.datetime(2022, 1, 1),
+        start_date=date,
         user_defined_macros={"render_df": render_df},
         params={"df": {"col": [1, 2]}},
     )
     t = TemplateOperator(task_id="template_op", dag=dag, do_xcom_push=True, 
df=dag.param("df"))
     run_id = str(uuid.uuid1())
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+    if AIRFLOW_V_3_0_PLUS:
+        dagrun_kwargs = {
+            "dag_version": None,
+            "logical_date": date,
+            "triggered_by": types.DagRunTriggeredByType.TEST,
+        }
+    else:
+        dagrun_kwargs = {"execution_date": date}
     dag.create_dagrun(
-        state=State.NONE,
         run_id=run_id,
-        **triggered_by_kwargs,
+        data_interval=(date, date),
+        run_type=types.DagRunType.MANUAL,
+        state=DagRunState.QUEUED,
+        **dagrun_kwargs,
     )
     ti = TaskInstance(t, run_id=run_id)
     ti.check_and_change_state_before_execution()  # make listener hook on 
running event
@@ -146,8 +155,9 @@ def _create_test_dag_and_task(python_callable: Callable, 
scenario_name: str) ->
 
     :return: TaskInstance: The created TaskInstance object.
 
-    This function creates a DAG and a PythonOperator task with the provided 
python_callable. It generates a unique
-    run ID and creates a DAG run. This setup is useful for testing different 
scenarios in Airflow tasks.
+    This function creates a DAG and a PythonOperator task with the provided
+    python_callable. It generates a unique run ID and creates a DAG run. This
+    setup is useful for testing different scenarios in Airflow tasks.
 
     :Example:
 
@@ -157,18 +167,28 @@ def _create_test_dag_and_task(python_callable: Callable, 
scenario_name: str) ->
         task_instance = _create_test_dag_and_task(sample_callable, 
"sample_scenario")
         # Use task_instance to simulate running a task in a test.
     """
+    date = dt.datetime(2022, 1, 1)
     dag = DAG(
         f"test_{scenario_name}",
         schedule=None,
-        start_date=dt.datetime(2022, 1, 1),
+        start_date=date,
     )
     t = PythonOperator(task_id=f"test_task_{scenario_name}", dag=dag, 
python_callable=python_callable)
     run_id = str(uuid.uuid1())
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+    if AIRFLOW_V_3_0_PLUS:
+        dagrun_kwargs: dict = {
+            "dag_version": None,
+            "logical_date": date,
+            "triggered_by": types.DagRunTriggeredByType.TEST,
+        }
+    else:
+        dagrun_kwargs = {"execution_date": date}
     dagrun = dag.create_dagrun(
-        state=State.NONE,  # type: ignore
         run_id=run_id,
-        **triggered_by_kwargs,  # type: ignore
+        data_interval=(date, date),
+        run_type=types.DagRunType.MANUAL,
+        state=DagRunState.QUEUED,
+        **dagrun_kwargs,
     )
     task_instance = TaskInstance(t, run_id=run_id)
     return dagrun, task_instance
@@ -669,10 +689,11 @@ def test_listener_logs_failed_serialization():
 
 class TestOpenLineageSelectiveEnable:
     def setup_method(self):
+        date = dt.datetime(2022, 1, 1)
         self.dag = DAG(
             "test_selective_enable",
             schedule=None,
-            start_date=dt.datetime(2022, 1, 1),
+            start_date=date,
         )
 
         def simple_callable(**kwargs):
@@ -685,16 +706,28 @@ class TestOpenLineageSelectiveEnable:
             task_id="test_task_selective_enable_2", dag=self.dag, 
python_callable=simple_callable
         )
         run_id = str(uuid.uuid1())
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+        if AIRFLOW_V_3_0_PLUS:
+            dagrun_kwargs = {
+                "dag_version": None,
+                "logical_date": date,
+                "triggered_by": types.DagRunTriggeredByType.TEST,
+            }
+        else:
+            dagrun_kwargs = {"execution_date": date}
         self.dagrun = self.dag.create_dagrun(
-            state=State.NONE,
             run_id=run_id,
-            **triggered_by_kwargs,
+            data_interval=(date, date),
+            run_type=types.DagRunType.MANUAL,
+            state=DagRunState.QUEUED,
+            **dagrun_kwargs,
         )  # type: ignore
         self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, 
map_index=-1)
         self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, 
map_index=-1)
         self.task_instance_1.dag_run = self.task_instance_2.dag_run = 
self.dagrun
 
+    def teardown_method(self):
+        clear_db_runs()
+
     @pytest.mark.parametrize(
         "selective_enable, enable_dag, expected_call_count",
         [
diff --git a/providers/tests/openlineage/plugins/test_utils.py 
b/providers/tests/openlineage/plugins/test_utils.py
index 490061db4f5..36b8057cf01 100644
--- a/providers/tests/openlineage/plugins/test_utils.py
+++ b/providers/tests/openlineage/plugins/test_utils.py
@@ -47,6 +47,7 @@ from airflow.serialization.enums import DagAttributeTypes
 from airflow.utils import timezone
 from airflow.utils.log.secrets_masker import _secrets_masker
 from airflow.utils.state import State
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.compat import (
     BashOperator,
@@ -100,12 +101,20 @@ def test_get_dagrun_start_end(dag_maker):
     dag_model = DagModel.get_dagmodel(dag.dag_id)
 
     run_id = str(uuid.uuid1())
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+    data_interval = dag.get_next_data_interval(dag_model)
+    if AIRFLOW_V_3_0_PLUS:
+        dagrun_kwargs = {
+            "logical_date": data_interval.start,
+            "triggered_by": DagRunTriggeredByType.TEST,
+        }
+    else:
+        dagrun_kwargs = {"execution_date": data_interval.start}
     dagrun = dag.create_dagrun(
         state=State.NONE,
         run_id=run_id,
-        data_interval=dag.get_next_data_interval(dag_model),
-        **triggered_by_kwargs,
+        run_type=DagRunType.MANUAL,
+        data_interval=data_interval,
+        **dagrun_kwargs,
     )
     assert dagrun.data_interval_start is not None
     start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=timezone.utc)
diff --git a/tests/cli/commands/remote_commands/test_task_command.py 
b/tests/cli/commands/remote_commands/test_task_command.py
index c1e6b6b23d7..06a477e2831 100644
--- a/tests/cli/commands/remote_commands/test_task_command.py
+++ b/tests/cli/commands/remote_commands/test_task_command.py
@@ -104,14 +104,21 @@ class TestCliTasks:
         cls.dagbag = DagBag(read_dags_from_db=True)
         cls.dag = cls.dagbag.get_dag(cls.dag_id)
         data_interval = 
cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.CLI} if 
AIRFLOW_V_3_0_PLUS else {}
+        v3_kwargs = (
+            {
+                "dag_version": None,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
+            if AIRFLOW_V_3_0_PLUS
+            else {}
+        )
         cls.dag_run = cls.dag.create_dagrun(
             state=State.NONE,
             run_id=cls.run_id,
             run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE,
             data_interval=data_interval,
-            **triggered_by_kwargs,
+            **v3_kwargs,
         )
 
     @classmethod
@@ -168,7 +175,14 @@ class TestCliTasks:
 
         logical_date = pendulum.now("UTC")
         data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+        v3_kwargs = (
+            {
+                "dag_version": None,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
+            if AIRFLOW_V_3_0_PLUS
+            else {}
+        )
         dag.create_dagrun(
             state=State.NONE,
             run_id="abc123",
@@ -176,7 +190,7 @@ class TestCliTasks:
             logical_date=logical_date,
             data_interval=data_interval,
             session=session,
-            **triggered_by_kwargs,
+            **v3_kwargs,
         )
         session.commit()
 
@@ -633,14 +647,22 @@ class TestCliTasks:
         default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
         data_interval = 
dag2.timetable.infer_manual_data_interval(run_after=default_date2)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.CLI} if 
AIRFLOW_V_3_0_PLUS else {}
+        v3_kwargs = (
+            {
+                "dag_version": None,
+                "triggered_by": DagRunTriggeredByType.CLI,
+            }
+            if AIRFLOW_V_3_0_PLUS
+            else {}
+        )
         dagrun = dag2.create_dagrun(
+            run_id="test",
             state=State.RUNNING,
             logical_date=default_date2,
             data_interval=data_interval,
             run_type=DagRunType.MANUAL,
             external_trigger=True,
-            **triggered_by_kwargs,
+            **v3_kwargs,
         )
         ti2 = TaskInstance(task2, run_id=dagrun.run_id)
         ti2.set_state(State.SUCCESS)
@@ -714,7 +736,14 @@ class TestLogsfromTaskRunCommand:
 
         dag = DagBag().get_dag(self.dag_id)
         data_interval = 
dag.timetable.infer_manual_data_interval(run_after=self.logical_date)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+        v3_kwargs = (
+            {
+                "dag_version": None,
+                "triggered_by": DagRunTriggeredByType.TEST,
+            }
+            if AIRFLOW_V_3_0_PLUS
+            else {}
+        )
         self.dr = dag.create_dagrun(
             run_id=self.run_id,
             logical_date=self.logical_date,
@@ -722,7 +751,7 @@ class TestLogsfromTaskRunCommand:
             start_date=timezone.utcnow(),
             state=State.RUNNING,
             run_type=DagRunType.MANUAL,
-            **triggered_by_kwargs,
+            **v3_kwargs,
         )
         self.tis = self.dr.get_task_instances()
         assert len(self.tis) == 1
@@ -1019,7 +1048,15 @@ def test_context_with_run():
 
     dag = DagBag().get_dag(dag_id)
     data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
+
+    v3_kwargs = (
+        {
+            "dag_version": None,
+            "triggered_by": DagRunTriggeredByType.TEST,
+        }
+        if AIRFLOW_V_3_0_PLUS
+        else {}
+    )
     dag.create_dagrun(
         run_id=run_id,
         logical_date=logical_date,
@@ -1027,7 +1064,7 @@ def test_context_with_run():
         start_date=timezone.utcnow(),
         state=State.RUNNING,
         run_type=DagRunType.MANUAL,
-        **triggered_by_kwargs,
+        **v3_kwargs,
     )
     with conf_vars({("core", "dags_folder"): dag_path}):
         task_command.task_run(parser.parse_args(task_args))
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 377666427ab..6ffe935348e 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -444,7 +444,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
 
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = self.dag_non_serialized.create_dagrun(
-            run_id=DagRunType.MANUAL,
+            run_id="test",
+            run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             logical_date=DEFAULT_DATE,
             state=State.RUNNING,
@@ -508,7 +509,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
 
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = self.dag_non_serialized.create_dagrun(
-            run_id=DagRunType.MANUAL,
+            run_id="test",
+            run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             logical_date=DEFAULT_DATE,
             state=State.RUNNING,
diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py
index 3ca565cf5e2..d229abfe34f 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -310,6 +310,7 @@ class TestLocalTaskJob:
             triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {}
             dr = dag.create_dagrun(
                 run_id="test_heartbeat_failed_fast_run",
+                run_type=DagRunType.MANUAL,
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 start_date=DEFAULT_DATE,
@@ -347,6 +348,7 @@ class TestLocalTaskJob:
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
+            run_id="test",
             state=State.RUNNING,
             logical_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
@@ -382,6 +384,7 @@ class TestLocalTaskJob:
         dag.clear()
         dr = dag.create_dagrun(
             run_id="test",
+            run_type=DagRunType.MANUAL,
             state=State.SUCCESS,
             logical_date=DEFAULT_DATE,
             start_date=DEFAULT_DATE,
@@ -485,6 +488,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -520,6 +524,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 start_date=DEFAULT_DATE,
                 logical_date=DEFAULT_DATE,
@@ -555,6 +560,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -591,6 +597,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -627,6 +634,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -666,6 +674,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -705,6 +714,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dr = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -748,6 +758,7 @@ class TestLocalTaskJob:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         with create_session() as session:
             dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
@@ -1007,6 +1018,7 @@ class TestSigtermOnRunner:
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dag_run = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
             run_id=run_id,
             logical_date=logical_date,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 328ae1742fb..0ea27f97bb8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -133,6 +133,37 @@ def _loader_mock(mock_executors):
         yield
 
 
+@pytest.fixture
+def create_dagrun(session):
+    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+
+    def _create_dagrun(
+        dag: DAG,
+        *,
+        logical_date: datetime,
+        data_interval: DataInterval,
+        run_type: DagRunType,
+        state: DagRunState = DagRunState.RUNNING,
+        start_date: datetime | None = None,
+    ) -> DagRun:
+        run_id = dag.timetable.generate_run_id(
+            run_type=run_type,
+            logical_date=logical_date,
+            data_interval=data_interval,
+        )
+        return dag.create_dagrun(
+            run_id=run_id,
+            logical_date=logical_date,
+            data_interval=data_interval,
+            run_type=run_type,
+            state=state,
+            start_date=start_date,
+            **triggered_by_kwargs,
+        )
+
+    return _create_dagrun
+
+
 @patch.dict(
     ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
 )
@@ -2071,33 +2102,25 @@ class TestSchedulerJob:
             assert enqueued == 80
         session.rollback()
 
-    def test_adopt_or_reset_orphaned_tasks(self, dag_maker):
-        session = settings.Session()
-        with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag:
+    def test_adopt_or_reset_orphaned_tasks(self, dag_maker, session):
+        with dag_maker("test_execute_helper_reset_orphaned_tasks", session=session):
             op1 = EmptyOperator(task_id="op1")
 
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        dr = dag_maker.create_dagrun()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr2 = dag.create_dagrun(
-            run_type=DagRunType.BACKFILL_JOB,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE + datetime.timedelta(1),
-            start_date=DEFAULT_DATE,
-            session=session,
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
         scheduler_job = Job()
         session.add(scheduler_job)
-        session.commit()
+        session.flush()
+
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         ti.state = State.QUEUED
         ti.queued_by_job_id = scheduler_job.id
+        session.flush()
+
+        dr2 = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED)
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
         ti2.state = State.QUEUED
         ti2.queued_by_job_id = scheduler_job.id
-        session.commit()
+        session.flush()
 
         processor = mock.MagicMock()
 
@@ -3185,8 +3208,9 @@ class TestSchedulerJob:
             next_info = dag.next_dagrun_info(None)
             triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
             assert next_info is not None
-            for _ in range(30):
+            for i in range(30):
                 yield dag.create_dagrun(
+                    run_id=f"scheduled_{i}",
                     run_type=DagRunType.SCHEDULED,
                     logical_date=next_info.logical_date,
                     data_interval=next_info.data_interval,
@@ -4214,7 +4238,8 @@ class TestSchedulerJob:
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
-            state=State.RUNNING,
+            run_id="test",
+            state=DagRunState.RUNNING,
             logical_date=timezone.utcnow(),
             run_type=DagRunType.MANUAL,
             session=session,
@@ -4282,7 +4307,7 @@ class TestSchedulerJob:
         session.rollback()
 
     @conf_vars({("scheduler", "use_job_schedule"): "false"})
-    def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker):
+    def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session):
         """Test that tasks are set to a finished state when their DAG times out"""
 
         with dag_maker(
@@ -4297,11 +4322,11 @@ class TestSchedulerJob:
                 bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i";  done',
             )
 
-        session = settings.Session()
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dag_version = DagVersion.get_latest_version(dag.dag_id)
         run1 = dag.create_dagrun(
+            run_id="test1",
             run_type=DagRunType.SCHEDULED,
             logical_date=DEFAULT_DATE,
             state=State.RUNNING,
@@ -4316,6 +4341,7 @@ class TestSchedulerJob:
         run1_ti.state = State.RUNNING
 
         run2 = dag.create_dagrun(
+            run_id="test2",
             run_type=DagRunType.SCHEDULED,
             logical_date=DEFAULT_DATE + timedelta(seconds=10),
             state=State.QUEUED,
@@ -5612,20 +5638,18 @@ class TestSchedulerJob:
             self.job_runner._find_and_purge_zombies()
         executor.callback_sink.send.assert_not_called()
 
-    def test_find_and_purge_zombies(self, session, testing_dag_bundle):
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_find_and_purge_zombies(self, session, create_dagrun):
         dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
         dagbag = DagBag(dagfile)
         dag = dagbag.get_dag("example_branch_operator")
         DAG.bulk_write_to_db("testing", None, [dag])
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dag_run = dag.create_dagrun(
-            state=DagRunState.RUNNING,
+        dag_run = create_dagrun(
+            dag,
             logical_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
-            session=session,
             data_interval=data_interval,
-            **triggered_by_kwargs,
         )
 
         executor = MockExecutor()
@@ -5668,12 +5692,11 @@ class TestSchedulerJob:
         assert callback_request.ti.run_id == ti.run_id
         assert callback_request.ti.map_index == ti.map_index
 
-    def test_zombie_message(self, testing_dag_bundle, session):
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_zombie_message(self, session, create_dagrun):
         """
         Check that the zombie message comes out as expected
         """
-
-        dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
         dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
         dagbag = DagBag(dagfile)
         dag = dagbag.get_dag("example_branch_operator")
@@ -5682,14 +5705,11 @@ class TestSchedulerJob:
         session.query(Job).delete()
 
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dag_run = dag.create_dagrun(
-            state=DagRunState.RUNNING,
+        dag_run = create_dagrun(
+            dag,
             logical_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
-            session=session,
             data_interval=data_interval,
-            **triggered_by_kwargs,
         )
 
         scheduler_job = Job(executor=MockExecutor())
@@ -5733,8 +5753,10 @@ class TestSchedulerJob:
             "External Executor Id": "abcdefg",
         }
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
     def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor(
-        self, testing_dag_bundle
+        self,
+        create_dagrun,
     ):
         """
         Check that the same set of failure callback with zombies are passed to the dag
@@ -5749,14 +5771,12 @@ class TestSchedulerJob:
             dag = dagbag.get_dag("test_example_bash_operator")
             DAG.bulk_write_to_db("testing", None, [dag])
             data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-            dag_run = dag.create_dagrun(
+            dag_run = create_dagrun(
+                dag,
                 state=DagRunState.RUNNING,
                 logical_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
-                session=session,
                 data_interval=data_interval,
-                **triggered_by_kwargs,
             )
             task = dag.get_task(task_id="run_this_last")
 
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 9b5af0b62c5..1543fa347ec 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -644,6 +644,7 @@ class TestClearTasks:
 
             triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
             dr = dag.create_dagrun(
+                run_id=f"scheduled_{i}",
                 logical_date=DEFAULT_DATE,
                 state=State.RUNNING,
                 run_type=DagRunType.SCHEDULED,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9f599e7acd4..30495a4ccb6 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -112,6 +112,7 @@ if AIRFLOW_V_3_0_PLUS:
     from airflow.utils.types import DagRunTriggeredByType
 
 if TYPE_CHECKING:
+    from pendulum import DateTime
     from sqlalchemy.orm import Session
 
 pytestmark = pytest.mark.db_test
@@ -137,6 +138,32 @@ def clear_assets():
     clear_db_assets()
 
 
+def _create_dagrun(
+    dag: DAG,
+    *,
+    logical_date: DateTime,
+    data_interval: DataInterval,
+    run_type: DagRunType,
+    state: DagRunState = DagRunState.RUNNING,
+    start_date: datetime.datetime | None = None,
+) -> DagRun:
+    triggered_by_kwargs: dict = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+    run_id = dag.timetable.generate_run_id(
+        run_type=run_type,
+        logical_date=logical_date,
+        data_interval=data_interval,
+    )
+    return dag.create_dagrun(
+        run_id=run_id,
+        logical_date=logical_date,
+        data_interval=data_interval,
+        run_type=run_type,
+        state=state,
+        start_date=start_date,
+        **triggered_by_kwargs,
+    )
+
+
 class TestDag:
     def setup_method(self) -> None:
         clear_db_runs()
@@ -282,13 +309,11 @@ class TestDag:
                 task_id="empty_task",
                 weight_rule=cls(),
             )
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            state=None,
-            run_id="test",
+        dr = _create_dagrun(
+            dag,
             logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
+            run_type=DagRunType.MANUAL,
         )
         ti = dr.get_task_instance(task.task_id)
         assert ti.priority_weight == expected
@@ -299,44 +324,39 @@ class TestDag:
 
         test_dag = DAG(dag_id=test_dag_id, schedule=None, start_date=DEFAULT_DATE)
         test_task = EmptyOperator(task_id=test_task_id, dag=test_dag)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
 
-        dr1 = test_dag.create_dagrun(
-            state=None,
-            run_id="test1",
+        dr1 = _create_dagrun(
+            test_dag,
+            run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
         )
-        dr2 = test_dag.create_dagrun(
-            state=None,
-            run_id="test2",
+        dr2 = _create_dagrun(
+            test_dag,
+            run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE + datetime.timedelta(days=1),
             data_interval=(
                 DEFAULT_DATE + datetime.timedelta(days=1),
                 DEFAULT_DATE + datetime.timedelta(days=1),
             ),
-            **triggered_by_kwargs,
         )
-        dr3 = test_dag.create_dagrun(
-            state=None,
-            run_id="test3",
+        dr3 = _create_dagrun(
+            test_dag,
+            run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE + datetime.timedelta(days=2),
             data_interval=(
                 DEFAULT_DATE + datetime.timedelta(days=2),
                 DEFAULT_DATE + datetime.timedelta(days=2),
             ),
-            **triggered_by_kwargs,
         )
-        dr4 = test_dag.create_dagrun(
-            state=None,
-            run_id="test4",
+        dr4 = _create_dagrun(
+            test_dag,
+            run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE + datetime.timedelta(days=3),
             data_interval=(
                 DEFAULT_DATE + datetime.timedelta(days=2),
                 DEFAULT_DATE + datetime.timedelta(days=2),
             ),
-            **triggered_by_kwargs,
         )
 
         ti1 = TI(task=test_task, run_id=dr1.run_id)
@@ -407,6 +427,8 @@ class TestDag:
                 state=State.SUCCESS,
                 run_type=type,
                 run_id=f"test_{delta_h}",
+                logical_date=None,
+                data_interval=None,
                 session=session,
                 **triggered_by_kwargs,
             )
@@ -606,6 +628,7 @@ class TestDag:
         dag.add_task(BaseOperator(task_id="task_without_start_date"))
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dagrun = dag.create_dagrun(
+            run_id="test",
             state=State.RUNNING,
             run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE,
@@ -802,6 +825,7 @@ class TestDag:
 
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
+            run_id="test",
             state=state,
             logical_date=model.next_dagrun,
             run_type=DagRunType.SCHEDULED,
@@ -1089,6 +1113,7 @@ class TestDag:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
 
         dag_run = dag.create_dagrun(
+            run_id="test",
             run_type=DagRunType.SCHEDULED,
             logical_date=TEST_DATE,
             state=State.RUNNING,
@@ -1128,6 +1153,7 @@ class TestDag:
 
         with create_session() as session:
             dag_run = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=when,
                 run_type=DagRunType.MANUAL,
@@ -1166,6 +1192,7 @@ class TestDag:
         with create_session() as session:
             triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
             dag_run = dag.create_dagrun(
+                run_id="test",
                 state=State.RUNNING,
                 logical_date=TEST_DATE,
                 run_type=DagRunType.MANUAL,
@@ -1194,15 +1221,13 @@ class TestDag:
         dag_id = "test_schedule_dag_fake_scheduled_previous"
         dag = DAG(dag_id=dag_id, schedule=delta, start_date=DEFAULT_DATE)
         dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=DEFAULT_DATE))
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
 
-        dag.create_dagrun(
+        _create_dagrun(
+            dag,
             run_type=DagRunType.SCHEDULED,
             logical_date=DEFAULT_DATE,
             state=State.SUCCESS,
-            external_trigger=True,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
         )
         dag.sync_to_db()
         with create_session() as session:
@@ -1228,13 +1253,12 @@ class TestDag:
         # Sync once to create the DagModel
         dag.sync_to_db()
 
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dag.create_dagrun(
+        _create_dagrun(
+            dag,
             run_type=DagRunType.SCHEDULED,
             logical_date=TEST_DATE,
             state=State.SUCCESS,
             data_interval=(TEST_DATE, TEST_DATE),
-            **triggered_by_kwargs,
         )
 
         # Then sync again after creating the dag run -- this should update next_dagrun
@@ -1256,15 +1280,13 @@ class TestDag:
 
         start_date = timezone.utcnow()
 
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        run = dag.create_dagrun(
-            run_id="test_" + start_date.isoformat(),
+        run = _create_dagrun(
+            dag,
+            run_type=DagRunType.MANUAL,
             logical_date=start_date,
             start_date=start_date,
             state=State.RUNNING,
-            external_trigger=False,
             data_interval=(start_date, start_date),
-            **triggered_by_kwargs,
         )
 
         run.refresh_from_db()
@@ -1389,37 +1411,15 @@ class TestDag:
         assert dag.timetable == timetable
         assert dag.timetable.description == expected_description
 
-    def test_create_dagrun_run_id_is_generated(self):
-        dag = DAG(dag_id="run_id_is_generated", schedule=None)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            logical_date=DEFAULT_DATE,
-            state=State.NONE,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
-        assert dr.run_id == f"manual__{DEFAULT_DATE.isoformat()}"
-
-    def test_create_dagrun_run_type_is_obtained_from_run_id(self):
-        dag = DAG(dag_id="run_type_is_obtained_from_run_id", schedule=None)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(run_id="scheduled__", state=State.NONE, **triggered_by_kwargs)
-        assert dr.run_type == DagRunType.SCHEDULED
-
-        dr = dag.create_dagrun(
-            run_id="custom_is_set_to_manual",
-            state=State.NONE,
-            **triggered_by_kwargs,
-        )
-        assert dr.run_type == DagRunType.MANUAL
-
     def test_create_dagrun_job_id_is_set(self):
         job_id = 42
         dag = DAG(dag_id="test_create_dagrun_job_id_is_set", schedule=None)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
             run_id="test_create_dagrun_job_id_is_set",
+            logical_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            run_type=DagRunType.MANUAL,
             state=State.NONE,
             creating_job_id=job_id,
             **triggered_by_kwargs,
@@ -1485,14 +1485,13 @@ class TestDag:
         t_1 = EmptyOperator(task_id=task_id, dag=dag)
 
         session = settings.Session()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dagrun_1 = dag.create_dagrun(
+        dagrun_1 = _create_dagrun(
+            dag,
             run_type=DagRunType.BACKFILL_JOB,
             state=State.FAILED,
             start_date=DEFAULT_DATE,
             logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
         )
         session.merge(dagrun_1)
 
@@ -1536,6 +1535,7 @@ class TestDag:
         session = settings.Session()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dagrun_1 = dag.create_dagrun(
+            run_id="backfill",
             run_type=DagRunType.BACKFILL_JOB,
             state=State.FAILED,
             start_date=DEFAULT_DATE,
@@ -1723,6 +1723,7 @@ my_postgres_conn:
         session = settings.Session()  # type: ignore
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dagrun_1 = dag.create_dagrun(
+            run_id="backfill",
             run_type=DagRunType.BACKFILL_JOB,
             state=DagRunState.RUNNING,
             start_date=DEFAULT_DATE,
@@ -1733,6 +1734,8 @@ my_postgres_conn:
         session.merge(dagrun_1)
 
         task_instance_1 = dagrun_1.get_task_instance(task_id)
+        if TYPE_CHECKING:
+            assert task_instance_1
         task_instance_1.state = ti_state_begin
         task_instance_1.job_id = 123
         session.merge(task_instance_1)
@@ -2037,6 +2040,7 @@ my_postgres_conn:
         with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):
             dag.create_dagrun(
                 run_id="test_dagrun_missing_param",
+                run_type=DagRunType.MANUAL,
                 state=State.RUNNING,
                 logical_date=TEST_DATE,
                 data_interval=(TEST_DATE, TEST_DATE),
@@ -2049,6 +2053,7 @@ my_postgres_conn:
         ):
             dag.create_dagrun(
                 run_id="test_dagrun_missing_param",
+                run_type=DagRunType.MANUAL,
                 state=State.RUNNING,
                 logical_date=TEST_DATE,
                 conf={"param1": None},
@@ -2059,6 +2064,7 @@ my_postgres_conn:
         dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
         dag.create_dagrun(
             run_id="test_dagrun_missing_param",
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
             logical_date=TEST_DATE,
             conf={"param1": "hello"},
@@ -2521,6 +2527,7 @@ class TestQueries:
         with assert_queries_count(4):
             dag.create_dagrun(
                 run_id="test_dagrun_query_count",
+                run_type=DagRunType.MANUAL,
                 state=State.RUNNING,
                 logical_date=TEST_DATE,
                 data_interval=(TEST_DATE, TEST_DATE),
@@ -2595,7 +2602,8 @@ class TestDagDecorator:
 
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
-            run_id=DagRunType.MANUAL.value,
+            run_id="test",
+            run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             logical_date=self.DEFAULT_DATE,
             data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE),
@@ -2625,7 +2633,8 @@ class TestDagDecorator:
         new_value = 52
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
-            run_id=DagRunType.MANUAL.value,
+            run_id="test",
+            run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             logical_date=self.DEFAULT_DATE,
             data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE),
@@ -3119,25 +3128,6 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker
     assert dag1_model.get_asset_triggered_next_run_info(session=session) is None
 
 
-def test_dag_uses_timetable_for_run_id(session):
-    class CustomRunIdTimetable(Timetable):
-        def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str:
-            return "abc"
-
-    dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule=CustomRunIdTimetable())
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-
-    dag_run = dag.create_dagrun(
-        run_type=DagRunType.MANUAL,
-        state=DagRunState.QUEUED,
-        logical_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        **triggered_by_kwargs,
-    )
-
-    assert dag_run.run_id == "abc"
-
-
 @pytest.mark.parametrize(
     "run_id_type",
     [DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, DagRunType.ASSET_TRIGGERED],
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index f398be8ad88..96c06d279bc 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -30,7 +30,6 @@ import pytest
 from airflow import settings
 from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.decorators import setup, task, task_group, teardown
-from airflow.exceptions import AirflowException
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagrun import DagRun, DagRunNote
@@ -111,6 +110,11 @@ class TestDagRun:
             run_type = DagRunType.MANUAL
             data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
         dag_run = dag.create_dagrun(
+            run_id=dag.timetable.generate_run_id(
+                run_type=run_type,
+                logical_date=logical_date,
+                data_interval=data_interval,
+            ),
             run_type=run_type,
             logical_date=logical_date,
             data_interval=data_interval,
@@ -123,6 +127,8 @@ class TestDagRun:
         if task_states is not None:
             for task_id, task_state in task_states.items():
                 ti = dag_run.get_task_instance(task_id)
+                if TYPE_CHECKING:
+                    assert ti
                 ti.set_state(task_state, session)
             session.flush()
 
@@ -279,18 +285,11 @@ class TestDagRun:
         dag_run.update_state()
         assert dag_run.state == DagRunState.SUCCESS
 
-    def test_dagrun_success_conditions(self, session):
-        dag = DAG(
-            "test_dagrun_success_conditions",
-            schedule=datetime.timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            default_args={"owner": "owner1"},
-        )
-
+    def test_dagrun_success_conditions(self, dag_maker, session):
         # A -> B
         # A -> C -> D
         # ordered: B, D, C, A or D, B, C, A or D, C, B, A
-        with dag:
+        with dag_maker(schedule=datetime.timedelta(days=1), session=session):
             op1 = EmptyOperator(task_id="A")
             op2 = EmptyOperator(task_id="B")
             op3 = EmptyOperator(task_id="C")
@@ -298,18 +297,7 @@ class TestDagRun:
             op1.set_upstream([op2, op3])
             op3.set_upstream(op4)
 
-        dag.clear()
-
-        now = pendulum.now("UTC")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_id="test_dagrun_success_conditions",
-            state=DagRunState.RUNNING,
-            logical_date=now,
-            data_interval=dag.timetable.infer_manual_data_interval(run_after=now),
-            start_date=now,
-            **triggered_by_kwargs,
-        )
+        dr = dag_maker.create_dagrun()
 
         # op1 = root
         ti_op1 = dr.get_task_instance(task_id=op1.task_id)
@@ -330,32 +318,14 @@ class TestDagRun:
         dr.update_state()
         assert dr.state == DagRunState.SUCCESS
 
-    def test_dagrun_deadlock(self, session):
-        dag = DAG(
-            "text_dagrun_deadlock",
-            schedule=datetime.timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            default_args={"owner": "owner1"},
-        )
-
-        with dag:
+    def test_dagrun_deadlock(self, dag_maker, session):
+        with dag_maker(schedule=datetime.timedelta(days=1), session=session):
             op1 = EmptyOperator(task_id="A")
             op2 = EmptyOperator(task_id="B")
             op2.trigger_rule = TriggerRule.ONE_FAILED
             op2.set_upstream(op1)
 
-        dag.clear()
-        now = pendulum.now("UTC")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_id="test_dagrun_deadlock",
-            state=DagRunState.RUNNING,
-            logical_date=now,
-            data_interval=dag.timetable.infer_manual_data_interval(run_after=now),
-            start_date=now,
-            session=session,
-            **triggered_by_kwargs,
-        )
+        dr = dag_maker.create_dagrun()
 
         ti_op1: TI = dr.get_task_instance(task_id=op1.task_id, session=session)
         ti_op2: TI = dr.get_task_instance(task_id=op2.task_id, session=session)
@@ -370,56 +340,32 @@ class TestDagRun:
         dr.update_state(session=session)
         assert dr.state == DagRunState.FAILED
 
-    def test_dagrun_no_deadlock_with_restarting(self, session):
-        dag = DAG(
-            "test_dagrun_no_deadlock_with_restarting",
-            schedule=datetime.timedelta(days=1),
-            start_date=DEFAULT_DATE,
-        )
-        with dag:
+    def test_dagrun_no_deadlock_with_restarting(self, dag_maker, session):
+        with dag_maker(schedule=datetime.timedelta(days=1)):
             op1 = EmptyOperator(task_id="upstream_task")
             op2 = EmptyOperator(task_id="downstream_task")
             op2.set_upstream(op1)
 
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_id="test_dagrun_no_deadlock_with_shutdown",
-            state=DagRunState.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-            start_date=DEFAULT_DATE,
-            **triggered_by_kwargs,
-        )
+        dr = dag_maker.create_dagrun()
         upstream_ti = dr.get_task_instance(task_id="upstream_task")
         upstream_ti.set_state(TaskInstanceState.RESTARTING, session=session)
 
         dr.update_state()
         assert dr.state == DagRunState.RUNNING
 
-    def test_dagrun_no_deadlock_with_depends_on_past(self, session):
-        dag = DAG("test_dagrun_no_deadlock", 
schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE)
-        with dag:
+    def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):
+        with dag_maker(schedule=datetime.timedelta(days=1)):
             EmptyOperator(task_id="dop", depends_on_past=True)
             EmptyOperator(task_id="tc", max_active_tis_per_dag=1)
 
-        dag.clear()
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
+        dr = dag_maker.create_dagrun(
             run_id="test_dagrun_no_deadlock_1",
-            state=DagRunState.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
             start_date=DEFAULT_DATE,
-            **triggered_by_kwargs,
         )
-        next_date = DEFAULT_DATE + datetime.timedelta(days=1)
-        dr2 = dag.create_dagrun(
+        dr2 = dag_maker.create_dagrun_after(
+            dr,
             run_id="test_dagrun_no_deadlock_2",
-            state=DagRunState.RUNNING,
-            logical_date=next_date,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=next_date),
-            start_date=next_date,
-            **triggered_by_kwargs,
+            start_date=DEFAULT_DATE + datetime.timedelta(days=1),
         )
         ti1_op1 = dr.get_task_instance(task_id="dop")
         dr2.get_task_instance(task_id="dop")
@@ -594,26 +540,11 @@ class TestDagRun:
             msg="task_failure",
         )
 
-    def test_dagrun_set_state_end_date(self, session):
-        dag = DAG(
-            "test_dagrun_set_state_end_date",
-            schedule=datetime.timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            default_args={"owner": "owner1"},
-        )
-
-        dag.clear()
+    def test_dagrun_set_state_end_date(self, dag_maker, session):
+        with dag_maker(schedule=datetime.timedelta(days=1), 
start_date=DEFAULT_DATE):
+            pass
 
-        now = pendulum.now("UTC")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_id="test_dagrun_set_state_end_date",
-            state=DagRunState.RUNNING,
-            logical_date=now,
-            data_interval=dag.timetable.infer_manual_data_interval(now),
-            start_date=now,
-            **triggered_by_kwargs,
-        )
+        dr = dag_maker.create_dagrun()
 
         # Initial end_date should be NULL
         # DagRunState.SUCCESS and DagRunState.FAILED are all ending state and 
should set end_date
@@ -626,7 +557,7 @@ class TestDagRun:
         session.merge(dr)
         session.commit()
 
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_set_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
         assert dr_database.end_date is not None
         assert dr.end_date == dr_database.end_date
 
@@ -634,44 +565,26 @@ class TestDagRun:
         session.merge(dr)
         session.commit()
 
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_set_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
 
         assert dr_database.end_date is None
 
         dr.set_state(DagRunState.FAILED)
         session.merge(dr)
         session.commit()
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_set_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
 
         assert dr_database.end_date is not None
         assert dr.end_date == dr_database.end_date
 
-    def test_dagrun_update_state_end_date(self, session):
-        dag = DAG(
-            "test_dagrun_update_state_end_date",
-            schedule=datetime.timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            default_args={"owner": "owner1"},
-        )
-
+    def test_dagrun_update_state_end_date(self, dag_maker, session):
         # A -> B
-        with dag:
+        with dag_maker(schedule=datetime.timedelta(days=1)):
             op1 = EmptyOperator(task_id="A")
             op2 = EmptyOperator(task_id="B")
             op1.set_upstream(op2)
 
-        dag.clear()
-
-        now = pendulum.now("UTC")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_id="test_dagrun_update_state_end_date",
-            state=DagRunState.RUNNING,
-            logical_date=now,
-            data_interval=dag.timetable.infer_manual_data_interval(now),
-            start_date=now,
-            **triggered_by_kwargs,
-        )
+        dr = dag_maker.create_dagrun()
 
         # Initial end_date should be NULL
         # DagRunState.SUCCESS and DagRunState.FAILED are all ending state and 
should set end_date
@@ -687,7 +600,7 @@ class TestDagRun:
 
         dr.update_state()
 
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_update_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
         assert dr_database.end_date is not None
         assert dr.end_date == dr_database.end_date
 
@@ -695,7 +608,7 @@ class TestDagRun:
         ti_op2.set_state(state=TaskInstanceState.RUNNING, session=session)
         dr.update_state()
 
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_update_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
 
         assert dr._state == DagRunState.RUNNING
         assert dr.end_date is None
@@ -705,7 +618,7 @@ class TestDagRun:
         ti_op2.set_state(state=TaskInstanceState.FAILED, session=session)
         dr.update_state()
 
-        dr_database = session.query(DagRun).filter(DagRun.run_id == 
"test_dagrun_update_state_end_date").one()
+        dr_database = session.query(DagRun).filter(DagRun.run_id == 
dr.run_id).one()
 
         assert dr_database.end_date is not None
         assert dr.end_date == dr_database.end_date
@@ -925,6 +838,11 @@ class TestDagRun:
         session.flush()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = dag.create_dagrun(
+            run_id=dag.timetable.generate_run_id(
+                run_type=DagRunType.SCHEDULED,
+                logical_date=DEFAULT_DATE,
+                data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+            ),
             run_type=DagRunType.SCHEDULED,
             state=state,
             logical_date=DEFAULT_DATE,
@@ -998,6 +916,11 @@ class TestDagRun:
             session.flush()
             triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {}
             dag_run = dag.create_dagrun(
+                run_id=dag.timetable.generate_run_id(
+                    run_type=DagRunType.SCHEDULED,
+                    logical_date=dag.start_date,
+                    
data_interval=dag.infer_automated_data_interval(dag.start_date),
+                ),
                 run_type=DagRunType.SCHEDULED,
                 state=DagRunState.SUCCESS,
                 logical_date=dag.start_date,
@@ -1065,12 +988,7 @@ class TestDagRun:
         with dag_maker(session=session) as dag:
             PythonOperator(task_id="t1", python_callable=lambda: print)
             PythonOperator(task_id="t2", python_callable=lambda: print)
-        dr = dag.create_dagrun(
-            state=DagRunState.FAILED,
-            triggered_by=DagRunTriggeredByType.TEST,
-            run_id="abc123",
-            session=session,
-        )
+        dr = dag_maker.create_dagrun(state=DagRunState.FAILED)
         for ti in dr.get_task_instances(session=session):
             ti.state = TaskInstanceState.FAILED
         session.commit()
@@ -2821,9 +2739,10 @@ def test_tis_considered_for_state(dag_maker, session, 
input, expected):
 def test_dag_run_id_config(session, dag_maker, pattern, run_id, result):
     with conf_vars({("scheduler", "allowed_run_id_pattern"): pattern}):
         with dag_maker():
-            ...
+            pass
+        run_type = DagRunType.from_run_id(run_id)
         if result:
-            dag_maker.create_dagrun(run_id=run_id)
+            dag_maker.create_dagrun(run_id=run_id, run_type=run_type)
         else:
-            with pytest.raises(AirflowException):
-                dag_maker.create_dagrun(run_id=run_id)
+            with pytest.raises(ValueError):
+                dag_maker.create_dagrun(run_id=run_id, run_type=run_type)
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 0422ceb3994..29506c66761 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1748,6 +1748,8 @@ class TestTaskInstance:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = ti.task.dag.create_dagrun(
             run_id="test2",
+            run_type=DagRunType.MANUAL,
+            logical_date=exec_date,
             data_interval=(exec_date, exec_date),
             state=None,
             **triggered_by_kwargs,
@@ -2022,6 +2024,7 @@ class TestTaskInstance:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dr = ti1.task.dag.create_dagrun(
             logical_date=logical_date,
+            run_type=DagRunType.MANUAL,
             state=None,
             run_id="2",
             session=session,
diff --git a/tests/sensors/test_external_task_sensor.py 
b/tests/sensors/test_external_task_sensor.py
index c73980ca24b..d70c63263b9 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -45,11 +45,12 @@ from airflow.providers.standard.sensors.external_task 
import (
 from airflow.providers.standard.sensors.time import TimeSensor
 from airflow.providers.standard.triggers.external_task import WorkflowTrigger
 from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.timetables.base import DataInterval
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
-from airflow.utils.timezone import datetime
+from airflow.utils.timezone import coerce_datetime, datetime
 from airflow.utils.types import DagRunType
 
 from tests.models import TEST_DAGS_FOLDER
@@ -438,17 +439,11 @@ class TestExternalTaskSensor:
             f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... "
         ) in caplog.messages
 
-    def test_external_dag_sensor(self):
-        other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        other_dag.create_dagrun(
-            run_id="test",
-            start_date=DEFAULT_DATE,
-            logical_date=DEFAULT_DATE,
-            state=State.SUCCESS,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
+    def test_external_dag_sensor(self, dag_maker):
+        with dag_maker("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once"):
+            pass
+        dag_maker.create_dagrun(state=DagRunState.SUCCESS)
+
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
             external_dag_id="other_dag",
@@ -457,17 +452,10 @@ class TestExternalTaskSensor:
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
-    def test_external_dag_sensor_log(self, caplog):
-        other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        other_dag.create_dagrun(
-            run_id="test",
-            start_date=DEFAULT_DATE,
-            logical_date=DEFAULT_DATE,
-            state=State.SUCCESS,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
+    def test_external_dag_sensor_log(self, caplog, dag_maker):
+        with dag_maker("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once"):
+            pass
+        dag_maker.create_dagrun(state=DagRunState.SUCCESS)
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
             external_dag_id="other_dag",
@@ -476,17 +464,10 @@ class TestExternalTaskSensor:
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
         assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... 
") in caplog.messages
 
-    def test_external_dag_sensor_soft_fail_as_skipped(self):
-        other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        other_dag.create_dagrun(
-            run_id="test",
-            start_date=DEFAULT_DATE,
-            logical_date=DEFAULT_DATE,
-            state=State.SUCCESS,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
+    def test_external_dag_sensor_soft_fail_as_skipped(self, dag_maker, 
session):
+        with dag_maker("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once"):
+            pass
+        dag_maker.create_dagrun(state=DagRunState.SUCCESS)
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
             external_dag_id="other_dag",
@@ -501,7 +482,6 @@ class TestExternalTaskSensor:
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         # then
-        session = settings.Session()
         TI = TaskInstance
         task_instances: list[TI] = session.query(TI).filter(TI.task_id == 
op.task_id).all()
         assert len(task_instances) == 1, "Unexpected number of task instances"
@@ -1252,19 +1232,24 @@ def run_tasks(
     """
     runs: dict[str, DagRun] = {}
     tis: dict[str, TaskInstance] = {}
-    triggered_by = DagRunTriggeredByType.TEST if AIRFLOW_V_3_0_PLUS else None
 
     for dag in dag_bag.dags.values():
-        dagrun = dag.create_dagrun(
-            state=DagRunState.RUNNING,
+        data_interval = DataInterval(coerce_datetime(logical_date), 
coerce_datetime(logical_date))
+        runs[dag.dag_id] = dagrun = dag.create_dagrun(
+            run_id=dag.timetable.generate_run_id(
+                run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+                data_interval=data_interval,
+            ),
             logical_date=logical_date,
-            start_date=logical_date,
+            data_interval=data_interval,
             run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.TEST,
+            dag_version=None,
+            state=DagRunState.RUNNING,
+            start_date=logical_date,
             session=session,
-            data_interval=(logical_date, logical_date),
-            triggered_by=triggered_by,
         )
-        runs[dag.dag_id] = dagrun
         # we use sorting by task_id here because for the test DAG structure of 
ours
         # this is equivalent to topological sort. It would not work in general 
case
         # but it works for our case because we specifically constructed test 
DAGS
diff --git a/tests/task/test_standard_task_runner.py 
b/tests/task/test_standard_task_runner.py
index 5e9eee15b07..51c5f9f8fa3 100644
--- a/tests/task/test_standard_task_runner.py
+++ b/tests/task/test_standard_task_runner.py
@@ -40,6 +40,7 @@ from airflow.utils.log.file_task_handler import 
FileTaskHandler
 from airflow.utils.platform import getuser
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
+from airflow.utils.types import DagRunType
 
 from tests.listeners import xcom_listener
 from tests.listeners.file_write_listener import FileWriteListener
@@ -173,7 +174,7 @@ class TestStandardTaskRunner:
         mock_read_task_utilization.assert_called()
 
     @pytest.mark.db_test
-    def test_notifies_about_start_and_stop(self, tmp_path):
+    def test_notifies_about_start_and_stop(self, tmp_path, session):
         path_listener_writer = tmp_path / "test_notifies_about_start_and_stop"
 
         lm = get_listener_manager()
@@ -185,14 +186,19 @@ class TestStandardTaskRunner:
         )
         dag = dagbag.dags.get("test_example_bash_operator")
         task = dag.get_task("runme_1")
+        current_time = timezone.utcnow()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
             run_id="test",
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            logical_date=current_time,
+            data_interval=(current_time, current_time),
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
-            start_date=DEFAULT_DATE,
+            start_date=current_time,
+            session=session,
             **triggered_by_kwargs,
         )
+        session.commit()
         ti = TaskInstance(task=task, run_id="test")
         job = Job(dag_id=ti.dag_id)
         job_runner = LocalTaskJobRunner(job=job, task_instance=ti, 
ignore_ti_state=True)
@@ -228,12 +234,15 @@ class TestStandardTaskRunner:
         )
         dag = dagbag.dags.get("test_failing_bash_operator")
         task = dag.get_task("failing_task")
+        current_time = timezone.utcnow()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
             run_id="test",
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            logical_date=current_time,
+            data_interval=(current_time, current_time),
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
-            start_date=DEFAULT_DATE,
+            start_date=current_time,
             **triggered_by_kwargs,
         )
         ti = TaskInstance(task=task, run_id="test")
@@ -275,12 +284,15 @@ class TestStandardTaskRunner:
         )
         dag = dagbag.dags.get("test_dag_xcom_openlineage")
         task = dag.get_task("push_and_pull")
+        current_time = timezone.utcnow()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
             run_id="test",
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            logical_date=current_time,
+            data_interval=(current_time, current_time),
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
-            start_date=DEFAULT_DATE,
+            start_date=current_time,
             **triggered_by_kwargs,
         )
 
@@ -408,7 +420,9 @@ class TestStandardTaskRunner:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
             run_id="test",
+            logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
             start_date=DEFAULT_DATE,
             **triggered_by_kwargs,
@@ -463,7 +477,9 @@ class TestStandardTaskRunner:
 
         dag.create_dagrun(
             run_id="test",
+            logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            run_type=DagRunType.MANUAL,
             state=State.RUNNING,
             start_date=DEFAULT_DATE,
             **triggered_by_kwargs,
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 19a432bb737..454af48d667 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -36,7 +36,6 @@ from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONF
 from airflow.executors import executor_loader
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.trigger import Trigger
@@ -56,10 +55,6 @@ from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
 
 pytestmark = pytest.mark.db_test
 
@@ -91,24 +86,17 @@ class TestFileTaskLogHandler:
         handler = handlers[0]
         assert handler.name == FILE_TASK_HANDLER
 
-    def test_file_task_handler_when_ti_value_is_invalid(self):
+    def test_file_task_handler_when_ti_value_is_invalid(self, dag_maker):
         def task_callable(ti):
             ti.log.info("test")
 
-        dag = DAG("dag_for_testing_file_task_handler", schedule=None, 
start_date=DEFAULT_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dagrun = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
-        task = PythonOperator(
-            task_id="task_for_testing_file_log_handler",
-            dag=dag,
-            python_callable=task_callable,
-        )
+        with dag_maker("dag_for_testing_file_task_handler", schedule=None):
+            task = PythonOperator(
+                task_id="task_for_testing_file_log_handler",
+                python_callable=task_callable,
+            )
+
+        dagrun = dag_maker.create_dagrun()
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
 
         logger = ti.log
@@ -146,26 +134,22 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    def test_file_task_handler(self):
+    def test_file_task_handler(self, dag_maker, session):
         def task_callable(ti):
             ti.log.info("test")
 
-        dag = DAG("dag_for_testing_file_task_handler", schedule=None, 
start_date=DEFAULT_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dagrun = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
-        task = PythonOperator(
-            task_id="task_for_testing_file_log_handler",
-            dag=dag,
-            python_callable=task_callable,
-        )
-        ti = TaskInstance(task=task, run_id=dagrun.run_id)
+        with dag_maker("dag_for_testing_file_task_handler", schedule=None, 
session=session):
+            PythonOperator(
+                task_id="task_for_testing_file_log_handler",
+                python_callable=task_callable,
+            )
+
+        dagrun = dag_maker.create_dagrun()
+
+        (ti,) = dagrun.get_task_instances()
         ti.try_number += 1
+        session.merge(ti)
+        session.flush()
         logger = ti.log
         ti.log.disabled = False
 
@@ -203,24 +187,16 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    def test_file_task_handler_running(self):
+    def test_file_task_handler_running(self, dag_maker):
         def task_callable(ti):
             ti.log.info("test")
 
-        dag = DAG("dag_for_testing_file_task_handler", schedule=None, 
start_date=DEFAULT_DATE)
-        task = PythonOperator(
-            task_id="task_for_testing_file_log_handler",
-            python_callable=task_callable,
-            dag=dag,
-        )
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dagrun = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
+        with dag_maker("dag_for_testing_file_task_handler", schedule=None):
+            task = PythonOperator(
+                task_id="task_for_testing_file_log_handler",
+                python_callable=task_callable,
+            )
+        dagrun = dag_maker.create_dagrun()
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
 
         ti.try_number = 2
@@ -256,7 +232,7 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    def test_file_task_handler_rotate_size_limit(self):
+    def test_file_task_handler_rotate_size_limit(self, dag_maker):
         def reset_log_config(update_conf):
             import logging.config
 
@@ -270,20 +246,12 @@ class TestFileTaskLogHandler:
         max_bytes_size = 60000
         update_conf = {"handlers": {"task": {"max_bytes": max_bytes_size, 
"backup_count": 1}}}
         reset_log_config(update_conf)
-        dag = DAG("dag_for_testing_file_task_handler_rotate_size_limit", 
start_date=DEFAULT_DATE)
-        task = PythonOperator(
-            task_id="task_for_testing_file_log_handler_rotate_size_limit",
-            python_callable=task_callable,
-            dag=dag,
-        )
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dagrun = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-            **triggered_by_kwargs,
-        )
+        with dag_maker("dag_for_testing_file_task_handler_rotate_size_limit"):
+            task = PythonOperator(
+                task_id="task_for_testing_file_log_handler_rotate_size_limit",
+                python_callable=task_callable,
+            )
+        dagrun = dag_maker.create_dagrun()
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
 
         ti.try_number = 1
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 0e791de1955..5b30595e4a3 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -42,6 +42,7 @@ from airflow.utils.sqlalchemy import (
 )
 from airflow.utils.state import State
 from airflow.utils.timezone import utcnow
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
@@ -81,6 +82,7 @@ class TestSqlAlchemyUtils:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         run = dag.create_dagrun(
             run_id=iso_date,
+            run_type=DagRunType.MANUAL,
             state=State.NONE,
             logical_date=logical_date,
             start_date=start_date,
@@ -115,6 +117,7 @@ class TestSqlAlchemyUtils:
         with pytest.raises((ValueError, StatementError)):
             dag.create_dagrun(
                 run_id=start_date.isoformat,
+                run_type=DagRunType.MANUAL,
                 state=State.NONE,
                 logical_date=start_date,
                 start_date=start_date,
diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py
index 5ad9f4a7044..e99bc0bd083 100644
--- a/tests/utils/test_state.py
+++ b/tests/utils/test_state.py
@@ -44,6 +44,11 @@ def test_dagrun_state_enum_escape():
         dag = DAG(dag_id="test_dagrun_state_enum_escape", 
schedule=timedelta(days=1), start_date=DEFAULT_DATE)
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
+            run_id=dag.timetable.generate_run_id(
+                run_type=DagRunType.SCHEDULED,
+                logical_date=DEFAULT_DATE,
+                
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
+            ),
             run_type=DagRunType.SCHEDULED,
             state=DagRunState.QUEUED,
             logical_date=DEFAULT_DATE,
diff --git a/tests/utils/test_types.py b/tests/utils/test_types.py
index c66c5807988..4a6831f4035 100644
--- a/tests/utils/test_types.py
+++ b/tests/utils/test_types.py
@@ -20,60 +20,42 @@ from datetime import timedelta
 
 import pytest
 
-from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
-from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
-from tests.models import DEFAULT_DATE
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
-
 pytestmark = pytest.mark.db_test
 
 
-def test_runtype_enum_escape():
+def test_runtype_enum_escape(dag_maker, session):
     """
     Make sure DagRunType.SCHEDULE is converted to string 'scheduled' when
     referenced in DB query
     """
-    with create_session() as session:
-        dag = DAG(dag_id="test_enum_dags", schedule=timedelta(days=1), 
start_date=DEFAULT_DATE)
-        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            logical_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
-
-        query = session.query(
-            DagRun.dag_id,
-            DagRun.state,
-            DagRun.run_type,
-        ).filter(
-            DagRun.dag_id == dag.dag_id,
-            # make sure enum value can be used in filter queries
-            DagRun.run_type == DagRunType.SCHEDULED,
-        )
-        assert str(query.statement.compile(compile_kwargs={"literal_binds": 
True})) == (
-            "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n"
-            "FROM dag_run \n"
-            "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 
'scheduled'"
-        )
-
-        rows = query.all()
-        assert len(rows) == 1
-        assert rows[0].dag_id == dag.dag_id
-        assert rows[0].state == State.RUNNING
-        # make sure value in db is stored as `scheduled`, not 
`DagRunType.SCHEDULED`
-        assert rows[0].run_type == "scheduled"
-
-        session.rollback()
+    with dag_maker(dag_id="test_enum_dags", schedule=timedelta(days=1), 
session=session):
+        pass
+    dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+    query = session.query(
+        DagRun.dag_id,
+        DagRun.state,
+        DagRun.run_type,
+    ).filter(
+        DagRun.dag_id == "test_enum_dags",
+        # make sure enum value can be used in filter queries
+        DagRun.run_type == DagRunType.SCHEDULED,
+    )
+    assert str(query.statement.compile(compile_kwargs={"literal_binds": 
True})) == (
+        "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n"
+        "FROM dag_run \n"
+        "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 
'scheduled'"
+    )
+
+    rows = query.all()
+    assert len(rows) == 1
+    assert rows[0].dag_id == "test_enum_dags"
+    assert rows[0].state == State.RUNNING
+    # make sure value in db is stored as `scheduled`, not 
`DagRunType.SCHEDULED`
+    assert rows[0].run_type == "scheduled"
+
+    session.rollback()
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index c87848fdf0e..2e652f7811a 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -616,16 +616,10 @@ def 
test_dag_run_custom_sqla_interface_delete_no_collateral_damage(dag_maker, se
     interface = DagRunCustomSQLAInterface(obj=DagRun, session=session)
     dag_ids = (f"test_dag_{x}" for x in range(1, 4))
     dates = (pendulum.datetime(2023, 1, x) for x in range(1, 4))
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     for dag_id, date in itertools.product(dag_ids, dates):
-        with dag_maker(dag_id=dag_id) as dag:
-            dag.create_dagrun(
-                logical_date=date,
-                state="running",
-                run_type="scheduled",
-                data_interval=(date, date),
-                **triggered_by_kwargs,
-            )
+        with dag_maker(dag_id=dag_id):
+            pass
+        dag_maker.create_dagrun(logical_date=date, state="running", 
run_type="scheduled")
     dag_runs = session.query(DagRun).all()
     assert len(dag_runs) == 9
     assert len(set(x.run_id for x in dag_runs)) == 3
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index c138e24125a..6ac7039cebd 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -43,12 +43,8 @@ from airflow.www.views import (
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 from tests_common.test_utils.www import check_content_in_response, 
check_content_not_in_response
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
-
 pytestmark = pytest.mark.db_test
 
 
@@ -312,13 +308,12 @@ def test_app():
     return app.create_app(testing=True)
 
 
-def test_mark_task_instance_state(test_app):
+def test_mark_task_instance_state(test_app, dag_maker):
     """
     Test that _mark_task_instance_state() does all three things:
     - Marks the given TaskInstance as SUCCESS;
     - Clears downstream TaskInstances in FAILED/UPSTREAM_FAILED state;
     """
-    from airflow.models.dag import DAG
     from airflow.models.dagbag import DagBag
     from airflow.models.taskinstance import TaskInstance
     from airflow.operators.empty import EmptyOperator
@@ -332,7 +327,7 @@ def test_mark_task_instance_state(test_app):
 
     clear_db_runs()
     start_date = datetime(2020, 1, 1)
-    with DAG("test_mark_task_instance_state", start_date=start_date, 
schedule="0 0 * * *") as dag:
+    with dag_maker("test_mark_task_instance_state", start_date=start_date, 
schedule="0 0 * * *") as dag:
         task_1 = EmptyOperator(task_id="task_1")
         task_2 = EmptyOperator(task_id="task_2")
         task_3 = EmptyOperator(task_id="task_3")
@@ -341,15 +336,7 @@ def test_mark_task_instance_state(test_app):
 
         task_1 >> [task_2, task_3, task_4, task_5]
 
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-    dagrun = dag.create_dagrun(
-        start_date=start_date,
-        logical_date=start_date,
-        data_interval=(start_date, start_date),
-        state=State.FAILED,
-        run_type=DagRunType.SCHEDULED,
-        **triggered_by_kwargs,
-    )
+    dagrun = dag_maker.create_dagrun(state=State.FAILED, 
run_type=DagRunType.SCHEDULED)
 
     def get_task_instance(session, task):
         return (
@@ -405,14 +392,13 @@ def test_mark_task_instance_state(test_app):
         assert dagrun.get_state() == State.QUEUED
 
 
-def test_mark_task_group_state(test_app):
+def test_mark_task_group_state(test_app, dag_maker):
     """
     Test that _mark_task_group_state() does all three things:
     - Marks the given TaskGroup as SUCCESS;
     - Clears downstream TaskInstances in FAILED/UPSTREAM_FAILED state;
     - Set DagRun to QUEUED.
     """
-    from airflow.models.dag import DAG
     from airflow.models.dagbag import DagBag
     from airflow.models.taskinstance import TaskInstance
     from airflow.operators.empty import EmptyOperator
@@ -426,7 +412,7 @@ def test_mark_task_group_state(test_app):
 
     clear_db_runs()
     start_date = datetime(2020, 1, 1)
-    with DAG("test_mark_task_group_state", start_date=start_date, schedule="0 
0 * * *") as dag:
+    with dag_maker("test_mark_task_group_state", start_date=start_date, 
schedule="0 0 * * *") as dag:
         start = EmptyOperator(task_id="start")
 
         with TaskGroup("section_1", tooltip="Tasks for section_1") as 
section_1:
@@ -444,15 +430,7 @@ def test_mark_task_group_state(test_app):
 
         start >> section_1 >> [task_4, task_5, task_6, task_7, task_8]
 
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-    dagrun = dag.create_dagrun(
-        start_date=start_date,
-        logical_date=start_date,
-        data_interval=(start_date, start_date),
-        state=State.FAILED,
-        run_type=DagRunType.SCHEDULED,
-        **triggered_by_kwargs,
-    )
+    dagrun = dag_maker.create_dagrun(state=State.FAILED, 
run_type=DagRunType.SCHEDULED)
 
     def get_task_instance(session, task):
         return (
diff --git a/tests/www/views/test_views_acl.py 
b/tests/www/views/test_views_acl.py
index 21379550dd1..c28026973eb 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -159,6 +159,7 @@ def _init_dagruns(acl_app, _reset_dagruns):
         **triggered_by_kwargs,
     )
     acl_app.dag_bag.get_dag("example_python_operator").create_dagrun(
+        run_id=DEFAULT_RUN_ID,
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         start_date=timezone.utcnow(),
diff --git a/tests/www/views/test_views_dagrun.py 
b/tests/www/views/test_views_dagrun.py
index c7f860d6246..12f375b870e 100644
--- a/tests/www/views/test_views_dagrun.py
+++ b/tests/www/views/test_views_dagrun.py
@@ -23,6 +23,7 @@ from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session
+from airflow.utils.types import DagRunType
 from airflow.www.views import DagRunModelView
 
 from providers.tests.fab.auth_manager.api_endpoints.api_connexion_utils import 
(
@@ -146,6 +147,7 @@ def running_dag_run(session):
         logical_date=logical_date,
         data_interval=(logical_date, logical_date),
         run_id="test_dag_runs_action",
+        run_type=DagRunType.MANUAL,
         session=session,
         **triggered_by_kwargs,
     )
@@ -169,6 +171,7 @@ def completed_dag_run_with_missing_task(session):
         logical_date=logical_date,
         data_interval=(logical_date, logical_date),
         run_id="test_dag_runs_action",
+        run_type=DagRunType.MANUAL,
         session=session,
         **triggered_by_kwargs,
     )
@@ -324,6 +327,7 @@ def dag_run_with_all_done_task(session):
         logical_date=logical_date,
         data_interval=(logical_date, logical_date),
         run_id="test_dagrun_failed",
+        run_type=DagRunType.MANUAL,
         session=session,
         **triggered_by_kwargs,
     )
diff --git a/tests/www/views/test_views_decorators.py 
b/tests/www/views/test_views_decorators.py
index 6c0a0a7d781..02be0ebd368 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -62,6 +62,7 @@ def xcom_dag(dagbag):
 def dagruns(bash_dag, xcom_dag):
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     bash_dagrun = bash_dag.create_dagrun(
+        run_id="test_bash",
         run_type=DagRunType.SCHEDULED,
         logical_date=EXAMPLE_DAG_DEFAULT_DATE,
         data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE),
@@ -71,6 +72,7 @@ def dagruns(bash_dag, xcom_dag):
     )
 
     xcom_dagrun = xcom_dag.create_dagrun(
+        run_id="test_xcom",
         run_type=DagRunType.SCHEDULED,
         logical_date=EXAMPLE_DAG_DEFAULT_DATE,
         data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE),
diff --git a/tests/www/views/test_views_extra_links.py 
b/tests/www/views/test_views_extra_links.py
index 6ea977da7fa..8ae2c63b818 100644
--- a/tests/www/views/test_views_extra_links.py
+++ b/tests/www/views/test_views_extra_links.py
@@ -90,6 +90,7 @@ def create_dag_run(dag):
     def _create_dag_run(*, logical_date, session):
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         return dag.create_dagrun(
+            run_id=f"manual__{logical_date.isoformat()}",
             state=DagRunState.RUNNING,
             logical_date=logical_date,
             data_interval=(logical_date, logical_date),
diff --git a/tests/www/views/test_views_log.py 
b/tests/www/views/test_views_log.py
index 735c9a569e0..ae264d3f638 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -160,6 +160,7 @@ def tis(dags, session):
     dag, dag_removed = dags
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     dagrun = dag.create_dagrun(
+        run_id=f"scheduled__{DEFAULT_DATE.isoformat()}",
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         data_interval=(DEFAULT_DATE, DEFAULT_DATE),
@@ -172,6 +173,7 @@ def tis(dags, session):
     ti.try_number = 1
     ti.hostname = "localhost"
     dagrun_removed = dag_removed.create_dagrun(
+        run_id=f"scheduled__{DEFAULT_DATE.isoformat()}",
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         data_interval=(DEFAULT_DATE, DEFAULT_DATE),
diff --git a/tests/www/views/test_views_rendered.py 
b/tests/www/views/test_views_rendered.py
index 0af32512c4d..5f4b8063d00 100644
--- a/tests/www/views/test_views_rendered.py
+++ b/tests/www/views/test_views_rendered.py
@@ -147,6 +147,7 @@ def create_dag_run(dag, task1, task2, task3, task4, 
task_secret):
     def _create_dag_run(*, logical_date, session):
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag_run = dag.create_dagrun(
+            run_id="test",
             state=DagRunState.RUNNING,
             logical_date=logical_date,
             data_interval=(logical_date, logical_date),
@@ -343,6 +344,7 @@ def test_rendered_task_detail_env_secret(patch_app, 
admin_client, request, env,
     with create_session() as session:
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
         dag.create_dagrun(
+            run_id="test",
             state=DagRunState.RUNNING,
             logical_date=DEFAULT_DATE,
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index 10f42aca6c4..44c43160581 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -397,6 +397,7 @@ def test_rendered_k8s_without_k8s(admin_client):
 def test_tree_trigger_origin_tree_view(app, admin_client):
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+        run_id="test",
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         data_interval=(DEFAULT_DATE, DEFAULT_DATE),
@@ -415,6 +416,7 @@ def test_tree_trigger_origin_tree_view(app, admin_client):
 def test_graph_trigger_origin_grid_view(app, admin_client):
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+        run_id="test",
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         data_interval=(DEFAULT_DATE, DEFAULT_DATE),
@@ -433,6 +435,7 @@ def test_graph_trigger_origin_grid_view(app, admin_client):
 def test_gantt_trigger_origin_grid_view(app, admin_client):
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
     app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+        run_id="test",
         run_type=DagRunType.SCHEDULED,
         logical_date=DEFAULT_DATE,
         data_interval=(DEFAULT_DATE, DEFAULT_DATE),
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index b4d3c089a37..0516f8431fe 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -889,31 +889,42 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
                 "session": self.session,
                 **kwargs,
             }
-            # Need to provide run_id if the user does not either provide one
-            # explicitly, or pass run_type for inference in 
dag.create_dagrun().
-            if "run_id" not in kwargs and "run_type" not in kwargs:
-                kwargs["run_id"] = "test"
 
-            if "run_type" not in kwargs:
-                kwargs["run_type"] = DagRunType.from_run_id(kwargs["run_id"])
+            run_type = kwargs.get("run_type", DagRunType.MANUAL)
+            if not isinstance(run_type, DagRunType):
+                run_type = DagRunType(run_type)
 
             if logical_date is None:
-                if kwargs["run_type"] == DagRunType.MANUAL:
+                if run_type == DagRunType.MANUAL:
                     logical_date = self.start_date
                 else:
                     logical_date = dag.next_dagrun_info(None).logical_date
             logical_date = timezone.coerce_datetime(logical_date)
 
-            if "data_interval" not in kwargs:
-                if kwargs["run_type"] == DagRunType.MANUAL:
+            try:
+                data_interval = kwargs["data_interval"]
+            except KeyError:
+                if run_type == DagRunType.MANUAL:
                     data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
                 else:
                     data_interval = 
dag.infer_automated_data_interval(logical_date)
                 kwargs["data_interval"] = data_interval
 
+            if "run_id" not in kwargs:
+                if "run_type" not in kwargs:
+                    kwargs["run_id"] = "test"
+                else:
+                    kwargs["run_id"] = dag.timetable.generate_run_id(
+                        run_type=run_type,
+                        logical_date=logical_date,
+                        data_interval=data_interval,
+                    )
+            kwargs["run_type"] = run_type
+
             if AIRFLOW_V_3_0_PLUS:
                 kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
                 kwargs["logical_date"] = logical_date
+                kwargs["dag_version"] = None
             else:
                 kwargs.pop("triggered_by", None)
                 kwargs["execution_date"] = logical_date


Reply via email to