This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 095c5fe313 Ensure __exit__ is called in decorator context managers 
(#38383)
095c5fe313 is described below

commit 095c5fe3137e2cb6d45e8f3184bae149cb2850d1
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Mar 21 14:06:49 2024 -0700

    Ensure __exit__ is called in decorator context managers (#38383)
    
    In #36800 author fixed zombie scheduler issue arising from context manager 
exit not being called, thus sub process not getting terminated.  It was fixed 
by explicitly calling the `close` function on an ExitStack-managed context 
manager.  Simpler / better / cleaner / more standard solution is to "fix" the 
underlying context managers by wrapping the yield in a try / finally.
---
 airflow/cli/commands/celery_command.py         |  8 ++++---
 airflow/cli/commands/scheduler_command.py      | 32 +++++++++++---------------
 airflow/providers/celery/cli/celery_command.py |  8 ++++---
 tests/cli/commands/test_scheduler_command.py   |  3 ---
 4 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/airflow/cli/commands/celery_command.py 
b/airflow/cli/commands/celery_command.py
index ae8b9c1925..ed6c0dbbd8 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -91,9 +91,11 @@ def _serve_logs(skip_serve_logs: bool = False):
     if skip_serve_logs is False:
         sub_proc = Process(target=serve_logs)
         sub_proc.start()
-    yield
-    if sub_proc:
-        sub_proc.terminate()
+    try:
+        yield
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
 
 
 @after_setup_logger.connect()
diff --git a/airflow/cli/commands/scheduler_command.py 
b/airflow/cli/commands/scheduler_command.py
index 0b5cac8857..2a55ca2373 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 
 import logging
 from argparse import Namespace
-from contextlib import ExitStack, contextmanager
+from contextlib import contextmanager
 from multiprocessing import Process
 
 from airflow import settings
@@ -45,18 +45,8 @@ def _run_scheduler_job(args) -> None:
     
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
     InternalApiConfig.force_database_direct_access()
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
-    with ExitStack() as stack:
-        stack.enter_context(_serve_logs(args.skip_serve_logs))
-        stack.enter_context(_serve_health_check(enable_health_check))
-
-        try:
-            run_job(job=job_runner.job, execute_callable=job_runner._execute)
-        except Exception:
-            log.exception("Exception when running scheduler job")
-            raise
-        finally:
-            # Ensure that the contexts are closed
-            stack.close()
+    with _serve_logs(args.skip_serve_logs), 
_serve_health_check(enable_health_check):
+        run_job(job=job_runner.job, execute_callable=job_runner._execute)
 
 
 @cli_utils.action_cli
@@ -84,9 +74,11 @@ def _serve_logs(skip_serve_logs: bool = False):
         if skip_serve_logs is False:
             sub_proc = Process(target=serve_logs)
             sub_proc.start()
-    yield
-    if sub_proc:
-        sub_proc.terminate()
+    try:
+        yield
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
 
 
 @contextmanager
@@ -96,6 +88,8 @@ def _serve_health_check(enable_health_check: bool = False):
     if enable_health_check:
         sub_proc = Process(target=serve_health_check)
         sub_proc.start()
-    yield
-    if sub_proc:
-        sub_proc.terminate()
+    try:
+        yield
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
diff --git a/airflow/providers/celery/cli/celery_command.py 
b/airflow/providers/celery/cli/celery_command.py
index fff46090aa..f7682b9abf 100644
--- a/airflow/providers/celery/cli/celery_command.py
+++ b/airflow/providers/celery/cli/celery_command.py
@@ -107,9 +107,11 @@ def _serve_logs(skip_serve_logs: bool = False):
     if skip_serve_logs is False:
         sub_proc = Process(target=serve_logs)
         sub_proc.start()
-    yield
-    if sub_proc:
-        sub_proc.terminate()
+    try:
+        yield
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
 
 
 @after_setup_logger.connect()
diff --git a/tests/cli/commands/test_scheduler_command.py 
b/tests/cli/commands/test_scheduler_command.py
index 2853763563..b6d6a9d921 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -165,10 +165,8 @@ class TestSchedulerCommand:
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     @mock.patch("airflow.cli.commands.scheduler_command.run_job", 
side_effect=Exception("run_job failed"))
-    @mock.patch("airflow.cli.commands.scheduler_command.log")
     def test_run_job_exception_handling(
         self,
-        mock_log,
         mock_run_job,
         mock_process,
         mock_scheduler_job,
@@ -183,7 +181,6 @@ class TestSchedulerCommand:
             job=mock_scheduler_job().job,
             execute_callable=mock_scheduler_job()._execute,
         )
-        mock_log.exception.assert_called_once_with("Exception when running 
scheduler job")
         mock_process.assert_called_once_with(target=serve_logs)
         mock_process().terminate.assert_called_once_with()
 

Reply via email to