GitHub user Sagargupta16 added a comment to the discussion: How to efficiently
test my DAGs on a per-task basis in Airflow 3?
Airflow 3 changed how task testing works. The `test` command and `DagBag`-based
unit testing patterns from 2.x still work but need adjustments.
## Per-task testing in Airflow 3
### Option 1: Use `dag.test()` method
Airflow 3 added a built-in `dag.test()` that runs the DAG in a single process:
```python
from airflow.models.dag import DAG
import importlib
def test_check_s3_task():
mod = importlib.import_module("dags.my_dag")
dag = mod.dag
# Run just the specific task
dag.test(task_id="check_s3")
```
### Option 2: Use `create_task_instance_of_operator` fixture (pytest)
For more granular testing, especially template rendering:
```python
import pytest
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
@pytest.mark.db_test
def test_s3_task(create_task_instance_of_operator):
ti = create_task_instance_of_operator(
S3ListOperator,
dag_id="test_dag",
task_id="check_s3",
bucket="my-bucket",
prefix="data/",
)
ti.render_templates()
assert ti.task.bucket == "my-bucket"
```
### Option 3: Direct operator execution
For `PythonOperator` and XCom assertions:
```python
from airflow.models import DagBag
def test_python_task_xcom():
dagbag = DagBag(dag_folder="dags/", include_examples=False)
dag = dagbag.get_dag("example")
# Run the task and check XCom
dr = dag.test()
ti = dr.get_task_instance("my_python_task")
result = ti.xcom_pull(task_ids="my_python_task", key="return_value")
assert result == expected_value
```
### For SQL tasks
Mock the database connection and verify the queries:
```python
from unittest.mock import patch, MagicMock
@patch("airflow.providers.common.sql.operators.sql.BaseSQLOperator.get_db_hook")
def test_sql_task(mock_hook):
mock_conn = MagicMock()
mock_hook.return_value = mock_conn
dag.test(task_id="run_query")
# Verify the SQL was executed
mock_conn.run.assert_called_once()
executed_sql = mock_conn.run.call_args[0][0]
assert "INSERT INTO" in executed_sql
```
The key change in Airflow 3 is that `dag.test()` returns a `DagRun` object,
making it much easier to inspect task results and XComs after execution.
GitHub link:
https://github.com/apache/airflow/discussions/63941#discussioncomment-16335021
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]