This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c1e8c2830 Fix triggerer thread crash in daemon mode (#34931)
9c1e8c2830 is described below

commit 9c1e8c28307cc808739a3535e0d7901d0699dcf4
Author: Daniel Dyląg <bi...@users.noreply.github.com>
AuthorDate: Sat Oct 14 17:56:24 2023 +0200

    Fix triggerer thread crash in daemon mode (#34931)
    
    * Fixes #34816
    
    Change the order of operations so that the async child thread is created after forking when entering the daemon context.
    
    This makes sure that the thread stays alive in the internal loop.
    
    ---------
    
    Co-authored-by: daniel.dylag <danieldylag1...@gmail.com>
---
 airflow/cli/commands/triggerer_command.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 2288f1537f..5ddb4e23b6 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -58,7 +58,6 @@ def triggerer(args):
     settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
     triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
-    triggerer_job_runner = 
TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), 
capacity=args.capacity)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -77,10 +76,16 @@ def triggerer(args):
                 umask=int(settings.DAEMON_UMASK, 8),
             )
             with daemon_context, _serve_logs(args.skip_serve_logs):
+                triggerer_job_runner = TriggererJobRunner(
+                    job=Job(heartrate=triggerer_heartrate), 
capacity=args.capacity
+                )
                 run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
         with _serve_logs(args.skip_serve_logs):
+            triggerer_job_runner = TriggererJobRunner(
+                job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
+            )
             run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)

Reply via email to