amoghrajesh commented on PR #54449:
URL: https://github.com/apache/airflow/pull/54449#issuecomment-3209421767
Some peace of mind tests that I ran to gain confidence in the PR:
DAG used for testing some cases:
```
from __future__ import annotations
import logging
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable, Connection
def my_function() -> None:
conn = Connection.get("my_connection")
logging.getLogger(__name__).info(conn.password)
print("via print", conn.password)
with DAG("subprocess_dag") as dag:
start = EmptyOperator(task_id="start")
py_func = PythonOperator(task_id="py_func", python_callable=my_function)
end = EmptyOperator(task_id="end")
start >> py_func >> end
```
Env set:
```
export
AIRFLOW_CONN_MY_CONNECTION="mysql://testuser:testpassword123@localhost:3306/testdb
```
1. Running a task through CLI: `airflow tasks test subprocess_dag py_fu`
```
[2025-08-21T07:52:11.216+0000] {dag.py:2329} INFO - created dagrun <DagRun
subprocess_dag @ None:
__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__, state:running,
queued_at: None. run_type: manual>
[2025-08-21T07:52:11.222+0000] {dag.py:1293} INFO - [DAG TEST] starting
task_id=py_func map_index=-1
[2025-08-21T07:52:11.680+0000] {dag.py:1296} INFO - [DAG TEST] running task
<TaskInstance: subprocess_dag.py_func
__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__ [None]>
2025-08-21 07:52:12 [debug ] Starting task instance run
hostname=e2cc5b7dc023 pid=450 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
unixname=root
2025-08-21 07:52:12 [debug ] Retrieved task instance details
dag_id=subprocess_dag state=queued task_id=py_func
ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info ] Task started
hostname=e2cc5b7dc023 previous_state=queued
ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [info ] Task instance state updated
rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.497+0000] {_client.py:1025} INFO - HTTP Request: PATCH
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/run
"HTTP/1.1 200 OK"
2025-08-21 07:52:12 [debug ] Sending request
msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': [],
'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [debug ] Received message from task runner
[supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None,
'op_args': [], 'op_kwargs': {}}, type='SetRenderedFields')
2025-08-21 07:52:12 [info ] Updating RenderedTaskInstanceFields
field_count=3 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug ] RenderedTaskInstanceFields updated
successfully ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.509+0000] {_client.py:1025} INFO - HTTP Request: PUT
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/rtif
"HTTP/1.1 201 Created"
2025-08-21 07:52:12 [debug ] Sending request
msg=MaskSecret(value='***', name=None, type='MaskSecret')
2025-08-21 07:52:12 [debug ] Received message from task runner (body
omitted) [supervisor] msg=<class 'airflow.sdk.execution_time.comms.MaskSecret'>
[2025-08-21T07:52:12.513+0000] {subprocess_mask_var.py:13} INFO - ***
via print ***
[2025-08-21T07:52:12.513+0000] {python.py:218} INFO - Done. Returned value
was: None
2025-08-21 07:52:12 [debug ] Sending request
msg=SucceedTask(state='success', end_date=datetime.datetime(2025, 8, 21, 7, 52,
12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[], outlet_events=[],
rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug ] Received message from task runner
[supervisor] msg=SucceedTask(state='success', end_date=datetime.datetime(2025,
8, 21, 7, 52, 12, 513546, tzinfo=datetime.timezone.utc), task_outlets=[],
outlet_events=[], rendered_map_index=None, type='SucceedTask')
2025-08-21 07:52:12 [debug ] Updating task instance state
new_state=success ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
2025-08-21 07:52:12 [debug ] Retrieved current task instance state
max_tries=0 previous_state=running ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
try_number=0
2025-08-21 07:52:12 [info ] Task instance state updated
new_state=success rows_affected=1 ti_id=0198cb9d-28d2-7800-82b0-7161dc72f022
[2025-08-21T07:52:12.522+0000] {_client.py:1025} INFO - HTTP Request: PATCH
http://in-process.invalid./task-instances/0198cb9d-28d2-7800-82b0-7161dc72f022/state
"HTTP/1.1 204 No Content"
2025-08-21 07:52:12 [debug ] Running finalizers [task]
ti=RuntimeTaskInstance(id=UUID('0198cb9d-28d2-7800-82b0-7161dc72f022'),
task_id='py_func', dag_id='subprocess_dag',
run_id='__airflow_temporary_run_2025-08-21T07:52:11.207333+00:00__',
try_number=0, dag_version_id=UUID('0198cb9b-d3d6-74f3-a561-083d897ed8cd'),
map_index=-1, hostname='e2cc5b7dc023', context_carrier=None,
task=<Task(PythonOperator): py_func>, max_tries=0,
start_date=datetime.datetime(2025, 8, 21, 7, 52, 11, 880733,
tzinfo=datetime.timezone.utc), end_date=datetime.datetime(2025, 8, 21, 7, 52,
12, 513546, tzinfo=datetime.timezone.utc), state=<TaskInstanceState.SUCCESS:
'success'>, is_mapped=False, rendered_map_index=None, log_url=None)
```
This is specifically to test the recent issue:
https://github.com/apache/airflow/commit/1f4c55c0e38fae6f46b71255f0a4dae2157f0991
2. Running a trigger with a dag that will log using dag processor, root
logger, structlog etc
Trigger:
```
from __future__ import annotations
import asyncio
import logging
import structlog
from airflow.sdk import Connection, Variable
from airflow.sdk.log import mask_secret
from airflow.triggers.base import BaseTrigger, TriggerEvent
print(f"{__file__=} loaded")
class CustomTrigger(BaseTrigger):
async def run(self, **args):
from asgiref.sync import sync_to_async
await sync_to_async(Variable.set)("my_api_key", "password1")
x = await sync_to_async(Variable.get)("my_api_key")
logging.getLogger(__name__).info("my_api_key=%s", x)
secret = "some-secret-value"
await sync_to_async(mask_secret)(secret)
await sync_to_async(mask_secret)("some-secret-value")
logging.getLogger(__name__).info("after manual mask %s", secret)
await structlog.get_logger().ainfo("Testing structlog", val=secret,
api_key="abcdef", x=x)
yield TriggerEvent({"Hi": "from trigger"})
def serialize(self):
return (
f"{type(self).__module__}.{type(self).__qualname__}",
{},
)
```
DAG:
```
from airflow.exceptions import AirflowRescheduleException, TaskDeferred
from airflow.sdk import dag, task
from airflow.sdk import Variable
import logging
x = Variable.get("toplevel_api_key", default="secret_api")
print(f"{x=}")
logging.root.info("toplevel=%s", x)
@dag()
def trigger_a_gag():
@task
def trigger(event=None) -> None:
if event:
print(event)
else:
from triggera import CustomTrigger
raise TaskDeferred(trigger=CustomTrigger(),
method_name="execute")
trigger()
trigger_a_gag()
```
<img width="1704" height="247" alt="image"
src="https://github.com/user-attachments/assets/2842464e-becd-44f6-abee-5bf67c8071f7"
/>
Logs:
```
[2025-08-21, 13:24:55] INFO - __file__='/files/plugins/triggera.py' loaded:
chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - DAG bundles loaded: dags-folder:
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:24:55] INFO - Filling up the DagBag from
/files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:24:55] INFO - toplevel=***: source="root"
[2025-08-21, 13:24:55] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:24:55] INFO - Pausing task as DEFERRED. :
dag_id="trigger_a_gag": task_id="trigger":
run_id="manual__2025-08-21T07:54:54.321189+00:00": source="task"
[2025-08-21, 13:24:57] INFO - trigger
trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID 1)
starting
[2025-08-21, 13:24:59] INFO - my_api_key=***: source="triggera"
[2025-08-21, 13:24:59] INFO - after manual mask ***: source="triggera"
[2025-08-21, 13:24:59] INFO - Testing structlog: val="***": api_key="***":
x="***"
[2025-08-21, 13:24:59] INFO - Trigger fired event:
name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID
1)": result="TriggerEvent<{'Hi': 'from trigger'}>"
[2025-08-21, 13:24:59] INFO - trigger completed:
name="trigger_a_gag/manual__2025-08-21T07:54:54.321189+00:00/trigger/-1/1 (ID
1)"
[2025-08-21, 13:25:00] INFO - __file__='/files/plugins/triggera.py' loaded:
chan="stdout": source="task"
[2025-08-21, 13:25:00] INFO - DAG bundles loaded: dags-folder:
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-21, 13:25:00] INFO - Filling up the DagBag from
/files/dags/reschedule-dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-21, 13:25:00] INFO - toplevel=***: source="root"
[2025-08-21, 13:25:00] INFO - x='***': chan="stdout": source="task"
[2025-08-21, 13:25:00] ERROR - Task failed with exception: source="task"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]