This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b466bd13d Introduce Heartbeat Parameter to Allow Per-LocalTaskJob 
Configuration (#32313)
9b466bd13d is described below

commit 9b466bd13dd34d2a37b49687241f54f4d2df3b18
Author: Sai Pragna Etikyala <pragnared...@gmail.com>
AuthorDate: Fri Jul 14 23:56:29 2023 -0700

    Introduce Heartbeat Parameter to Allow Per-LocalTaskJob Configuration 
(#32313)
    
    * Refactor Job Heartbeat Parameter to Allow Per-Job Configuration
    
    This pull request introduces changes to allow users to set heartbeat 
expectations separately for LocalTaskJob. This resolves a current limitation 
where all job types share a single configuration parameter for expected 
heartbeat time.
    
    
    related to: https://github.com/apache/airflow/issues/30908
---
 airflow/config_templates/config.yml          | 9 +++++++++
 airflow/config_templates/default_airflow.cfg | 5 +++++
 airflow/jobs/local_task_job_runner.py        | 7 +++++--
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 015da426c4..b3161f49b7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2302,6 +2302,15 @@ scheduler:
       type: integer
       example: ~
       default: "5"
+    local_task_job_heartbeat_sec:
+      description: |
+        The frequency (in seconds) at which the LocalTaskJob should send 
heartbeat signals to the
+        scheduler to notify it's still alive. If this value is set to 0, the 
heartbeat interval will default
+        to the value of scheduler_zombie_task_threshold.
+      version_added: 2.7.0
+      type: integer
+      example: ~
+      default: "0"
     num_runs:
       description: |
         The number of times to try to schedule each DAG file
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index cff56508df..a71bd3ccaa 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1195,6 +1195,11 @@ job_heartbeat_sec = 5
 # how often the scheduler should run (in seconds).
 scheduler_heartbeat_sec = 5
 
+# The frequency (in seconds) at which the LocalTaskJob should send heartbeat 
signals to the
+# scheduler to notify it's still alive. If this value is set to 0, the 
heartbeat interval will default
+# to the value of scheduler_zombie_task_threshold.
+local_task_job_heartbeat_sec = 0
+
 # The number of times to try to schedule each DAG file
 # -1 indicates unlimited number
 num_runs = -1
diff --git a/airflow/jobs/local_task_job_runner.py 
b/airflow/jobs/local_task_job_runner.py
index fd234e4150..6184a3e7fc 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -157,8 +157,11 @@ class LocalTaskJobRunner(BaseJobRunner["Job | 
JobPydantic"], LoggingMixin):
         return_code = None
         try:
             self.task_runner.start()
-
-            heartbeat_time_limit = conf.getint("scheduler", 
"scheduler_zombie_task_threshold")
+            local_task_job_heartbeat_sec = conf.getint("scheduler", 
"local_task_job_heartbeat_sec")
+            if local_task_job_heartbeat_sec < 1:
+                heartbeat_time_limit = conf.getint("scheduler", 
"scheduler_zombie_task_threshold")
+            else:
+                heartbeat_time_limit = local_task_job_heartbeat_sec
 
             # LocalTaskJob should not run callbacks, which are handled by 
TaskInstance._run_raw_task
             # 1, LocalTaskJob does not parse DAG, thus cannot run callbacks

Reply via email to