This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7559c1cabed4fc3c7a7d5aa67ac1461915a1ead2 Author: Daniel DylÄ…g <bi...@users.noreply.github.com> AuthorDate: Sat Oct 14 17:56:24 2023 +0200 Fix triggerer thread crash in daemon mode (#34931) * Fixes #34816 Change the order of operations so that async child thread is created after forking when entering daemon context. This makes sure that the thread stays alive in the internal loop. --------- Co-authored-by: daniel.dylag <danieldylag1...@gmail.com> (cherry picked from commit 9c1e8c28307cc808739a3535e0d7901d0699dcf4) --- airflow/cli/commands/triggerer_command.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 2288f1537f..5ddb4e23b6 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -58,7 +58,6 @@ def triggerer(args): settings.MASK_SECRETS_IN_LOGS = True print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") - triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) if args.daemon: pid, stdout, stderr, log_file = setup_locations( @@ -77,10 +76,16 @@ def triggerer(args): umask=int(settings.DAEMON_UMASK, 8), ) with daemon_context, _serve_logs(args.skip_serve_logs): + triggerer_job_runner = TriggererJobRunner( + job=Job(heartrate=triggerer_heartrate), capacity=args.capacity + ) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) signal.signal(signal.SIGQUIT, sigquit_handler) with _serve_logs(args.skip_serve_logs): + triggerer_job_runner = TriggererJobRunner( + job=Job(heartrate=triggerer_heartrate), capacity=args.capacity + ) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)