This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57eed58152 add "enable_tracemalloc" to log memory usage in scheduler (#42304)
57eed58152 is described below

commit 57eed581523c3af5b6e266935643ace81329c9a3
Author: Wei Lee <[email protected]>
AuthorDate: Tue Sep 24 04:20:46 2024 -0700

    add "enable_tracemalloc" to log memory usage in scheduler (#42304)
---
 airflow/config_templates/config.yml  | 10 ++++++++++
 airflow/executors/base_executor.py   |  4 ++--
 airflow/jobs/scheduler_job_runner.py | 26 ++++++++++++++++++++++++++
 docs/spelling_wordlist.txt           |  1 +
 4 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 068b19df07..bc749ad6b7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2566,6 +2566,16 @@ scheduler:
       example: ~
       default: "True"
       see_also: ":ref:`Differences between the two cron timetables`"
+    enable_tracemalloc:
+      description: |
+        Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow will start
+        tracing memory allocations and, upon receiving the SIGUSR1 signal, log the top 10 allocation
+        sites (by size) at the ERROR log level.
+        This is an expensive operation and should generally be used only for debugging purposes.
+      version_added: 3.0.0
+      type: boolean
+      example: ~
+      default: "False"
 triggerer:
   description: ~
   options:
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index ad7690b3f6..87f496fb05 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -537,11 +537,11 @@ class BaseExecutor(LoggingMixin):
 
     def end(self) -> None:  # pragma: no cover
         """Wait synchronously for the previously submitted job to complete."""
-        raise NotImplementedError()
+        raise NotImplementedError  # raising the class is equivalent to raising NotImplementedError()
 
     def terminate(self):
         """Get called when the daemon receives a SIGTERM."""
-        raise NotImplementedError()
+        raise NotImplementedError  # Python instantiates the class with no arguments when raising it
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
         """
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 6b4a730358..eb0abbc296 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -181,6 +181,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         self.do_pickle = do_pickle
 
+        self._enable_tracemalloc = conf.getboolean("scheduler", 
"enable_tracemalloc")
+        if self._enable_tracemalloc:
+            import tracemalloc
+
+            tracemalloc.start()
+
         if log:
             self._log = log
 
@@ -202,17 +208,37 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         signal.signal(signal.SIGTERM, self._exit_gracefully)
         signal.signal(signal.SIGUSR2, self._debug_dump)
 
+        if self._enable_tracemalloc:
+            signal.signal(signal.SIGUSR1, self._log_memory_usage)
+
     def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
         """Clean up processor_agent to avoid leaving orphan processes."""
         if not _is_parent_process():
             # Only the parent process should perform the cleanup.
             return
 
+        if self._enable_tracemalloc:  # tracemalloc.start() was called in __init__ in this case
+            import tracemalloc
+
+            tracemalloc.stop()  # stop tracing and clear collected traces before exiting
+
         self.log.info("Exiting gracefully upon receiving signal %s", signum)
         if self.processor_agent:
             self.processor_agent.end()
         sys.exit(os.EX_OK)
 
+    def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
+        """Log the top 10 memory allocation sites traced by tracemalloc (SIGUSR1 handler)."""
+        import tracemalloc
+
+        if not tracemalloc.is_tracing():  # take_snapshot() raises RuntimeError when not tracing
+            return
+        # Group by source line; tracemalloc sorts the statistics from biggest to smallest size.
+        top_stats = tracemalloc.take_snapshot().statistics("lineno")[:10]
+        self.log.error(
+            "scheduler memory usage:\n Top %d\n\t%s", len(top_stats), "\n\t".join(map(str, top_stats))
+        )
+
     def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
         if not _is_parent_process():
             # Only the parent process should perform the debug dump.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b80f2b0872..b834ccc9b2 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1684,6 +1684,7 @@ tooltip
 tooltips
 traceback
 tracebacks
+tracemalloc
 TrainingPipeline
 travis
 triage

Reply via email to