This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cf5715d2c94cfc04c910e96333e81c244c6148b0
Author: Daniel DylÄ…g <bi...@users.noreply.github.com>
AuthorDate: Sat Oct 14 17:56:24 2023 +0200

    Fix triggerer thread crash in daemon mode (#34931)
    
    * Fixes #34816
    
    Change the order of operations so that async child thread is created after 
forking when entering daemon context.
    
    This makes sure that the thread stays alive in the internal loop.
    
    ---------
    
    Co-authored-by: daniel.dylag <danieldylag1...@gmail.com>
    (cherry picked from commit 9c1e8c28307cc808739a3535e0d7901d0699dcf4)
---
 airflow/cli/commands/triggerer_command.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/triggerer_command.py 
b/airflow/cli/commands/triggerer_command.py
index 2288f1537f..5ddb4e23b6 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -58,7 +58,6 @@ def triggerer(args):
     settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
     triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
-    triggerer_job_runner = 
TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), 
capacity=args.capacity)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -77,10 +76,16 @@ def triggerer(args):
                 umask=int(settings.DAEMON_UMASK, 8),
             )
             with daemon_context, _serve_logs(args.skip_serve_logs):
+                triggerer_job_runner = TriggererJobRunner(
+                    job=Job(heartrate=triggerer_heartrate), 
capacity=args.capacity
+                )
                 run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
         with _serve_logs(args.skip_serve_logs):
+            triggerer_job_runner = TriggererJobRunner(
+                job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
+            )
             run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)

Reply via email to