This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 9b466bd13d Introduce Heartbeat Parameter to Allow Per-LocalTaskJob Configuration (#32313) 9b466bd13d is described below commit 9b466bd13dd34d2a37b49687241f54f4d2df3b18 Author: Sai Pragna Etikyala <pragnared...@gmail.com> AuthorDate: Fri Jul 14 23:56:29 2023 -0700 Introduce Heartbeat Parameter to Allow Per-LocalTaskJob Configuration (#32313) * Refactor Job Heartbeat Parameter to Allow Per-Job Configuration This pull request introduces changes to allow users to set heartbeat expectations separately for LocalTaskJob. This resolves a current limitation where all job types share a single configuration parameter for expected heartbeat time. Related to: https://github.com/apache/airflow/issues/30908 --- airflow/config_templates/config.yml | 9 +++++++++ airflow/config_templates/default_airflow.cfg | 5 +++++ airflow/jobs/local_task_job_runner.py | 7 +++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 015da426c4..b3161f49b7 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2302,6 +2302,15 @@ scheduler: type: integer example: ~ default: "5" + local_task_job_heartbeat_sec: + description: | + The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the + scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default + to the value of scheduler_zombie_task_threshold. 
+ version_added: 2.7.0 + type: integer + example: ~ + default: "0" num_runs: description: | The number of times to try to schedule each DAG file diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index cff56508df..a71bd3ccaa 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1195,6 +1195,11 @@ job_heartbeat_sec = 5 # how often the scheduler should run (in seconds). scheduler_heartbeat_sec = 5 +# The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the +# scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default +# to the value of scheduler_zombie_task_threshold. +local_task_job_heartbeat_sec = 0 + # The number of times to try to schedule each DAG file # -1 indicates unlimited number num_runs = -1 diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index fd234e4150..6184a3e7fc 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -157,8 +157,11 @@ class LocalTaskJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin): return_code = None try: self.task_runner.start() - - heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold") + local_task_job_heartbeat_sec = conf.getint("scheduler", "local_task_job_heartbeat_sec") + if local_task_job_heartbeat_sec < 1: + heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold") + else: + heartbeat_time_limit = local_task_job_heartbeat_sec # LocalTaskJob should not run callbacks, which are handled by TaskInstance._run_raw_task # 1, LocalTaskJob does not parse DAG, thus cannot run callbacks