jason810496 commented on issue #58676:
URL: https://github.com/apache/airflow/issues/58676#issuecomment-3845092376
> Any insights or examples you can share? Thanks!
I used the same Dag and the same setup in Breeze today, but I'm unable to
reproduce it right now 😥
**Breeze setup**:
- Add the dag to `${AIRFLOW_REPO}/files/dags`
- `breeze start-airflow --use-airflow-version 3.1.6 --backend postgres`
- Add `/files/dags` to `PYTHONPATH` (or `cp /files/dags/<new-dag>.py /opt/airflow` as a quick workaround)
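As a quick sanity check for the last step, here is a minimal sketch that reports whether a module name resolves on the current `sys.path`. It assumes the module name matches the Dag file name (`no_logs_during_deferrable`, as used in `serialize()` below); the helper `can_import` is a hypothetical name for illustration, not part of Airflow or Breeze.

```python
import importlib.util
import sys


def can_import(module_name: str) -> bool:
    """Return True if a top-level module resolves on the current sys.path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Assumed module name: the Dag file name without the .py suffix.
    name = "no_logs_during_deferrable"
    print(f"{name} importable: {can_import(name)}")
    print("sys.path entries mentioning 'dags':", [p for p in sys.path if "dags" in p])
```

Running this inside the Breeze container with the same environment as the triggerer should show whether the `PYTHONPATH` change took effect.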
**The Dag I use**:
```python
from __future__ import annotations

import asyncio
import os
import signal
import subprocess
from typing import Any

from airflow.sdk import dag, task, teardown
from airflow.sdk.bases.operator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ProcessWaitTrigger(BaseTrigger):
    """Trigger that waits for a process to finish."""

    def __init__(self, pid: int):
        super().__init__()
        self.pid = pid

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "no_logs_during_deferrable.ProcessWaitTrigger",
            {"pid": self.pid},
        )

    async def run(self):
        while True:
            try:
                # Check if process exists (signal 0 does not kill)
                # os.kill is synchronous, but fast.
                os.kill(self.pid, 0)
                self.log.info("Process %d still running", self.pid)
                await asyncio.sleep(1)
            except OSError:
                # Process dead
                yield TriggerEvent({"status": "success"})
                return


class ProcessWaitOperator(BaseOperator):
    """Operator that defers until a process finishes."""

    template_fields = ("pid",)

    def __init__(self, pid: int, **kwargs):
        super().__init__(**kwargs)
        self.pid = pid

    def execute(self, context):
        print(f">> Deferring process {self.pid} wait")
        self.defer(
            trigger=ProcessWaitTrigger(pid=int(self.pid)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        print(f"Process {self.pid} finished")
        return


@dag(
    dag_id="no_logs_during_deferrable",
    schedule=None,
    catchup=False,
)
def no_logs_during_deferrable():
    @task(task_id="submit_job")
    def submit_job(**context):
        # Run sleep 600 in background
        process = subprocess.Popen(["sleep", "600"], close_fds=True)
        print(f"Submitted process {process.pid}")
        return process.pid

    @teardown
    @task(task_id="cancel_job")
    def cancel_job(pid: int):
        print(f"Killing process {pid}")
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            print(f"Process {pid} already gone")

    pid = submit_job()
    wait_job_finish = ProcessWaitOperator(task_id="wait_job_finish", pid=pid)
    wait_job_finish >> cancel_job(pid)


no_logs_during_deferrable()
```
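The polling logic in `ProcessWaitTrigger.run` can also be exercised outside Airflow. Below is a minimal sketch of the same signal-0 liveness check on a Unix-like system; `process_alive`, `wait_for_exit`, and `demo` are hypothetical helper names for illustration. Unlike the trigger above, which treats any `OSError` as "process dead", this sketch treats `PermissionError` as "process exists but belongs to another user".

```python
import asyncio
import os
import subprocess
import sys


def process_alive(pid: int) -> bool:
    """Check whether a process exists by sending signal 0 (nothing is delivered)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True  # process exists, but we may not signal it
    return True


async def wait_for_exit(pid: int, poll_interval: float = 0.1) -> int:
    """Poll until the process is gone; return how many polls saw it alive."""
    polls = 0
    while process_alive(pid):
        polls += 1
        await asyncio.sleep(poll_interval)
    return polls


def demo() -> tuple[bool, bool]:
    """Spawn a short-lived child and check liveness before and after it exits."""
    proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    alive_before = process_alive(proc.pid)
    proc.terminate()
    # Reap the child: an unreaped (zombie) child still passes the signal-0 check.
    proc.wait()
    alive_after = process_alive(proc.pid)
    return alive_before, alive_after


if __name__ == "__main__":
    print(demo())
```

Note that this only mirrors the liveness check; it says nothing about how the triggerer routes the trigger's `self.log` output, which is the behaviour under discussion here.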
**Today's logs**:
```
Log message source details
sources=["/root/airflow/logs/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log","/root/airflow/logs/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log.trigger.3.log","http://630c65abc04d:8794/log/dag_id=no_logs_during_deferrable/run_id=manual__2026-02-04T03:37:09+00:00/task_id=wait_job_finish/attempt=1.log.trigger.3.log"]
[2026-02-04 11:37:13] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:179
[2026-02-04 11:37:13] INFO - Filling up the DagBag from /files/dags/no_logs_during_deferrable.py source=airflow.models.dagbag.DagBag loc=dagbag.py:593
[2026-02-04 11:37:13] INFO - >> Deferring process 942 wait source=task.stdout
[2026-02-04 11:37:13] INFO - Pausing task as DEFERRED. dag_id=no_logs_during_deferrable task_id=wait_job_finish run_id=manual__2026-02-04T03:37:09+00:00 source=task loc=task_runner.py:928
[2026-02-04 11:37:14] INFO - trigger no_logs_during_deferrable/manual__2026-02-04T03:37:09+00:00/wait_job_finish/-1/1 (ID 3) starting loc=triggerer_job_runner.py:1117
[2026-02-04 11:37:14] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
[2026-02-04 11:37:15] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
[2026-02-04 11:37:16] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
[2026-02-04 11:37:17] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
[2026-02-04 11:37:18] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
[2026-02-04 11:37:19] INFO - Process 942 still running source=no_logs_during_deferrable.ProcessWaitTrigger loc=no_logs_during_deferrable.py:33
```
However, yesterday none of the `Process xxx still running source=no_logs_during_deferrable.ProcessWaitTrigger` logs were shown, either in the UI or on the local filesystem. I ran `cat path/to/trigger/logs` several times, and the only trigger log line that appeared was `trigger {ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID {trigger_id}) starting loc=triggerer_job_runner.py:1117`, see:
https://github.com/apache/airflow/blob/fe0633d729c85131ca96aa41a8c56282a407b7d5/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L1183
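To compare runs without eyeballing `cat` output, something like the following could count how many trigger messages actually reached a log file. This is only a sketch: `count_matching_lines` is a hypothetical helper, and the path passed to it is whatever trigger log file is being inspected (for example one of the `attempt=1.log.trigger.<id>.log` files listed under `sources` above).

```python
import sys


def count_matching_lines(log_path: str, needle: str = "still running") -> int:
    """Count the lines in a log file that contain the given substring."""
    with open(log_path, encoding="utf-8", errors="replace") as fh:
        return sum(1 for line in fh if needle in line)


if __name__ == "__main__":
    # Usage: python count_lines.py path/to/trigger/logs
    print(count_matching_lines(sys.argv[1]))
```

A count of 0 would correspond to yesterday's behaviour; today's run would give one match per second of polling.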