amoghrajesh commented on code in PR #45486:
URL: https://github.com/apache/airflow/pull/45486#discussion_r1908842129
##########
task_sdk/tests/execution_time/conftest.py:
##########
@@ -40,3 +49,95 @@ def mock_supervisor_comms():
"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
) as supervisor_comms:
yield supervisor_comms
+
+
[email protected]
+def mocked_parse(spy_agency):
+ """
+ Fixture to set up an inline DAG and use it in a stubbed `parse` function.
Use this fixture if you
+ want to isolate and test `parse` or `run` logic without having to define a
DAG file.
+
+ This fixture returns a helper function `set_dag` that:
+ 1. Creates an in line DAG with the given `dag_id` and `task` (limited to
one task)
+ 2. Constructs a `RuntimeTaskInstance` based on the provided
`StartupDetails` and task.
+ 3. Stubs the `parse` function using `spy_agency`, to return the mocked
`RuntimeTaskInstance`.
+
+ After adding the fixture in your test function signature, you can use it
like this ::
+
+ mocked_parse(
+ StartupDetails(
+ ti=TaskInstance(id=uuid7(), task_id="hello",
dag_id="super_basic_run", run_id="c", try_number=1),
+ file="",
+ requests_fd=0,
+ ),
+ "example_dag_id",
+ CustomOperator(task_id="hello"),
+ )
+ """
+
+ def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) ->
RuntimeTaskInstance:
+ from airflow.sdk.definitions.dag import DAG
+ from airflow.sdk.execution_time.task_runner import
RuntimeTaskInstance, parse
+ from airflow.utils import timezone
+
+ dag = DAG(dag_id=dag_id, start_date=timezone.datetime(2024, 12, 3))
+ task.dag = dag
+ t = dag.task_dict[task.task_id]
+ ti =
RuntimeTaskInstance.model_construct(**what.ti.model_dump(exclude_unset=True),
task=t)
+ spy_agency.spy_on(parse, call_fake=lambda _: ti)
+ return ti
+
+ return set_dag
+
+
[email protected]
+def create_runtime_ti(
+ mocked_parse: Callable[[StartupDetails, str, BaseOperator],
RuntimeTaskInstance],
+ make_ti_context: Callable[..., TIRunContext],
+) -> Callable[[BaseOperator, TIRunContext | None, StartupDetails | None],
RuntimeTaskInstance]:
+ """
+ Fixture to create a Runtime TaskInstance for testing purposes without
defining a dag file.
+
+ This fixture sets up a `RuntimeTaskInstance` with default or custom
`TIRunContext` and `StartupDetails`,
+ making it easy to simulate task execution scenarios in tests.
+
+ Example usage: ::
+
+ def test_custom_task_instance(create_runtime_ti):
+ class MyTaskOperator(BaseOperator):
+ def execute(self, context):
+ assert context["dag_run"].run_id == "test_run"
+
+ task = MyTaskOperator(task_id="test_task")
+ ti = create_runtime_ti(task,
context_from_server=make_ti_context(run_id="test_run"))
+ # Further test logic...
Review Comment:
Amazing!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]