GitHub user Sagargupta16 added a comment to the discussion: How to efficiently 
test my DAGs on a per-task basis in Airflow 3?

Airflow 3 changed how task testing works. The `test` command and `DagBag`-based 
unit testing patterns from 2.x still work but need adjustments.

## Per-task testing in Airflow 3

### Option 1: Use `dag.test()` method

Airflow 3 added a built-in `dag.test()` that runs the DAG in a single process:

```python
from airflow.models.dag import DAG
import importlib

def test_check_s3_task():
    mod = importlib.import_module("dags.my_dag")
    dag = mod.dag
    # Run just the specific task
    dag.test(task_id="check_s3")
```

### Option 2: Use `create_task_instance_of_operator` fixture (pytest)

For more granular testing, especially template rendering:

```python
import pytest
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

@pytest.mark.db_test
def test_s3_task(create_task_instance_of_operator):
    ti = create_task_instance_of_operator(
        S3ListOperator,
        dag_id="test_dag",
        task_id="check_s3",
        bucket="my-bucket",
        prefix="data/",
    )
    ti.render_templates()
    assert ti.task.bucket == "my-bucket"
```

### Option 3: Direct operator execution

For `PythonOperator` and XCom assertions:

```python
from airflow.models import DagBag

def test_python_task_xcom():
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dagbag.get_dag("example")
    
    # Run the task and check XCom
    dr = dag.test()
    ti = dr.get_task_instance("my_python_task")
    result = ti.xcom_pull(task_ids="my_python_task", key="return_value")
    assert result == expected_value
```

### For SQL tasks

Mock the database connection and verify the queries:

```python
from unittest.mock import patch, MagicMock

@patch("airflow.providers.common.sql.operators.sql.BaseSQLOperator.get_db_hook")
def test_sql_task(mock_hook):
    mock_conn = MagicMock()
    mock_hook.return_value = mock_conn
    
    dag.test(task_id="run_query")
    
    # Verify the SQL was executed
    mock_conn.run.assert_called_once()
    executed_sql = mock_conn.run.call_args[0][0]
    assert "INSERT INTO" in executed_sql
```

The key change in Airflow 3 is that `dag.test()` returns a `DagRun` object, 
making it much easier to inspect task results and XComs after execution.

GitHub link: 
https://github.com/apache/airflow/discussions/63941#discussioncomment-16335021

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to