This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 933b0039f66 Replacing GunicornMonitor with uvicorn.run() (#45103)
933b0039f66 is described below
commit 933b0039f6652937006d24796b6a5a69d24e850a
Author: vatsrahul1001 <[email protected]>
AuthorDate: Fri Jan 24 14:33:08 2025 +0530
Replacing GunicornMonitor with uvicorn.run() (#45103)
* replace gunicorn with uvicorn.run()
* fixing tests
* Daemonized fastapi server
* fixing setproctitle format
* updating setproctitle
---
.../commands/local_commands/fastapi_api_command.py | 149 +++------------------
.../local_commands/test_fastapi_api_command.py | 115 ++--------------
2 files changed, 33 insertions(+), 231 deletions(-)
diff --git a/airflow/cli/commands/local_commands/fastapi_api_command.py
b/airflow/cli/commands/local_commands/fastapi_api_command.py
index e9fdd870916..cc66940648b 100644
--- a/airflow/cli/commands/local_commands/fastapi_api_command.py
+++ b/airflow/cli/commands/local_commands/fastapi_api_command.py
@@ -20,25 +20,16 @@ from __future__ import annotations
import logging
import os
-import signal
import subprocess
-import sys
import textwrap
-from contextlib import suppress
-from pathlib import Path
-from time import sleep
-from typing import NoReturn
-import psutil
-from lockfile.pidlockfile import read_pid_from_pidfile
-from uvicorn.workers import UvicornWorker
+import uvicorn
+from gunicorn.util import daemonize
+from setproctitle import setproctitle
from airflow import settings
-from airflow.cli.commands.local_commands.daemon_utils import
run_command_with_daemon_option
-from airflow.cli.commands.local_commands.webserver_command import
GunicornMonitor
from airflow.exceptions import AirflowConfigException
from airflow.utils import cli as cli_utils
-from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
log = logging.getLogger(__name__)
@@ -47,8 +38,6 @@ log = logging.getLogger(__name__)
# This shouldn't be necessary but there seems to be an issue in uvloop that
causes bad file descriptor
# errors when shutting down workers. Despite the 'closed' status of the issue
it is not solved,
# more info here:
https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
-AirflowUvicornWorker = UvicornWorker
-AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}
@cli_utils.action_cli
@@ -59,18 +48,13 @@ def fastapi_api(args):
apps = args.apps
access_logfile = args.access_logfile or "-"
- error_logfile = args.error_logfile or "-"
access_logformat = args.access_logformat
num_workers = args.workers
worker_timeout = args.worker_timeout
- worker_class =
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"
-
- from airflow.api_fastapi.app import create_app
-
if args.debug:
print(f"Starting the FastAPI API server on port {args.port} and host
{args.hostname} debug.")
- log.warning("Running in dev mode, ignoring gunicorn args")
+ log.warning("Running in dev mode, ignoring uvicorn args")
run_args = [
"fastapi",
@@ -93,124 +77,35 @@ def fastapi_api(args):
process.wait()
os.environ.pop("AIRFLOW_API_APPS")
else:
+ if args.daemon:
+ daemonize()
+ log.info("Daemonized the FastAPI API server process PID: %s",
os.getpid())
+
log.info(
textwrap.dedent(
f"""\
- Running the Gunicorn Server with:
+ Running the uvicorn with:
Apps: {apps}
- Workers: {num_workers} {worker_class}
+ Workers: {num_workers}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
- Logfiles: {access_logfile} {error_logfile}
+ Logfiles: {access_logfile}
Access Logformat: {access_logformat}
================================================================="""
)
)
-
- pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)
-
- run_args = [
- sys.executable,
- "-m",
- "gunicorn",
- "--workers",
- str(num_workers),
- "--worker-class",
- str(worker_class),
- "--timeout",
- str(worker_timeout),
- "--bind",
- args.hostname + ":" + str(args.port),
- "--name",
- "airflow-fastapi-api",
- "--pid",
- pid_file,
- "--access-logfile",
- str(access_logfile),
- "--error-logfile",
- str(error_logfile),
- "--config",
- "python:airflow.api_fastapi.gunicorn_config",
- ]
-
ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
- if ssl_cert and ssl_key:
- run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]
-
- if args.access_logformat and args.access_logformat.strip():
- run_args += ["--access-logformat", str(args.access_logformat)]
-
- if args.daemon:
- run_args += ["--daemon"]
-
- run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]
-
- # To prevent different workers creating the web app and
- # all writing to the database at the same time, we use the --preload
option.
- # With the preload option, the app is loaded before the workers are
forked, and each worker will
- # then have a copy of the app
- run_args += ["--preload"]
-
- def kill_proc(signum: int, gunicorn_master_proc: psutil.Process |
subprocess.Popen) -> NoReturn:
- log.info("Received signal: %s. Closing gunicorn.", signum)
- gunicorn_master_proc.terminate()
- with suppress(TimeoutError):
- gunicorn_master_proc.wait(timeout=30)
- if isinstance(gunicorn_master_proc, subprocess.Popen):
- still_running = gunicorn_master_proc.poll() is not None
- else:
- still_running = gunicorn_master_proc.is_running()
- if still_running:
- gunicorn_master_proc.kill()
- sys.exit(0)
-
- def monitor_gunicorn(gunicorn_master_proc: psutil.Process |
subprocess.Popen) -> NoReturn:
- # Register signal handlers
- signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum,
gunicorn_master_proc))
- signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum,
gunicorn_master_proc))
-
- # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
- GunicornMonitor(
- gunicorn_master_pid=gunicorn_master_proc.pid,
- num_workers_expected=num_workers,
- master_timeout=120,
- worker_refresh_interval=30,
- worker_refresh_batch_size=1,
- reload_on_plugin_change=False,
- ).start()
-
- def start_and_monitor_gunicorn(args):
- if args.daemon:
- subprocess.Popen(run_args, close_fds=True)
-
- # Reading pid of gunicorn master as it will be different that
- # the one of process spawned above.
- gunicorn_master_proc_pid = None
- while not gunicorn_master_proc_pid:
- sleep(0.1)
- gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)
-
- # Run Gunicorn monitor
- gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
- monitor_gunicorn(gunicorn_master_proc)
- else:
- with subprocess.Popen(run_args, close_fds=True) as
gunicorn_master_proc:
- monitor_gunicorn(gunicorn_master_proc)
-
- if args.daemon:
- # This makes possible errors get reported before daemonization
- os.environ["SKIP_DAGS_PARSING"] = "True"
- create_app(apps)
- os.environ.pop("SKIP_DAGS_PARSING")
-
- pid_file_path = Path(pid_file)
- monitor_pid_file =
str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
- run_command_with_daemon_option(
- args=args,
- process_name="fastapi-api",
- callback=lambda: start_and_monitor_gunicorn(args),
- should_setup_logging=True,
- pid_file=monitor_pid_file,
+ setproctitle(f"airflow fastapi_api -- host:{args.hostname}
port:{args.port}")
+ uvicorn.run(
+ "airflow.api_fastapi.main:app",
+ host=args.hostname,
+ port=args.port,
+ workers=num_workers,
+ timeout_keep_alive=worker_timeout,
+ timeout_graceful_shutdown=worker_timeout,
+ ssl_keyfile=ssl_key,
+ ssl_certfile=ssl_cert,
+ access_log=access_logfile,
)
diff --git a/tests/cli/commands/local_commands/test_fastapi_api_command.py
b/tests/cli/commands/local_commands/test_fastapi_api_command.py
index ffa0b8c98b5..ea5339cfb3e 100644
--- a/tests/cli/commands/local_commands/test_fastapi_api_command.py
+++ b/tests/cli/commands/local_commands/test_fastapi_api_command.py
@@ -16,10 +16,6 @@
# under the License.
from __future__ import annotations
-import os
-import subprocess
-import sys
-import time
from unittest import mock
import pytest
@@ -37,73 +33,9 @@ console = Console(width=400, color_system="standard")
class TestCliFastAPI(_CommonCLIGunicornTestClass):
main_process_regexp = r"airflow fastapi-api"
- @pytest.mark.execution_timeout(210)
- def test_cli_fastapi_api_background(self, tmp_path):
- parent_path = tmp_path / "gunicorn"
- parent_path.mkdir()
- pidfile_fastapi_api = parent_path / "pidflow-fastapi-api.pid"
- pidfile_monitor = parent_path / "pidflow-fastapi-api-monitor.pid"
- stdout = parent_path / "airflow-fastapi-api.out"
- stderr = parent_path / "airflow-fastapi-api.err"
- logfile = parent_path / "airflow-fastapi-api.log"
- try:
- # Run fastapi-api as daemon in background. Note that the wait
method is not called.
- console.print("[magenta]Starting airflow fastapi-api --daemon")
- env = os.environ.copy()
- proc = subprocess.Popen(
- [
- "airflow",
- "fastapi-api",
- "--daemon",
- "--pid",
- os.fspath(pidfile_fastapi_api),
- "--stdout",
- os.fspath(stdout),
- "--stderr",
- os.fspath(stderr),
- "--log-file",
- os.fspath(logfile),
- ],
- env=env,
- )
- assert proc.poll() is None
-
- pid_monitor = self._wait_pidfile(pidfile_monitor)
- console.print(f"[blue]Monitor started at {pid_monitor}")
- pid_fastapi_api = self._wait_pidfile(pidfile_fastapi_api)
- console.print(f"[blue]FastAPI API started at {pid_fastapi_api}")
- console.print("[blue]Running airflow fastapi-api process:")
- # Assert that the fastapi-api and gunicorn processes are running
(by name rather than pid).
- assert self._find_process(r"airflow fastapi-api --daemon",
print_found_process=True)
- console.print("[blue]Waiting for gunicorn processes:")
- # wait for gunicorn to start
- for _ in range(30):
- if self._find_process(r"^gunicorn"):
- break
- console.print("[blue]Waiting for gunicorn to start ...")
- time.sleep(1)
- console.print("[blue]Running gunicorn processes:")
- assert self._find_all_processes("^gunicorn",
print_found_process=True)
- console.print("[magenta]fastapi-api process started successfully.")
- console.print(
- "[magenta]Terminating monitor process and expect "
- "fastapi-api and gunicorn processes to terminate as well"
- )
- self._terminate_multiple_process([pid_fastapi_api, pid_monitor])
- self._check_processes(ignore_running=False)
- console.print("[magenta]All fastapi-api and gunicorn processes are
terminated.")
- except Exception:
- console.print("[red]Exception occurred. Dumping all logs.")
- # Dump all logs
- for file in parent_path.glob("*"):
- console.print(f"Dumping {file} (size: {file.stat().st_size})")
- console.print(file.read_text())
- raise
-
def test_cli_fastapi_api_debug(self, app):
with (
mock.patch("subprocess.Popen") as Popen,
- mock.patch.object(fastapi_api_command, "GunicornMonitor"),
):
port = "9092"
hostname = "somehost"
@@ -130,7 +62,6 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
"""
with (
mock.patch("subprocess.Popen") as Popen,
- mock.patch.object(fastapi_api_command, "GunicornMonitor"),
mock.patch("os.environ", autospec=True) as mock_environ,
):
apps_value = "core,execution"
@@ -172,8 +103,7 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
cert_path, key_path = ssl_cert_and_key
with (
- mock.patch("subprocess.Popen") as Popen,
- mock.patch.object(fastapi_api_command, "GunicornMonitor"),
+ mock.patch("uvicorn.run") as mock_run,
):
args = self.parser.parse_args(
[
@@ -192,39 +122,16 @@ class TestCliFastAPI(_CommonCLIGunicornTestClass):
)
fastapi_api_command.fastapi_api(args)
- Popen.assert_called_with(
- [
- sys.executable,
- "-m",
- "gunicorn",
- "--workers",
- "4",
- "--worker-class",
-
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker",
- "--timeout",
- "120",
- "--bind",
- "0.0.0.0:9091",
- "--name",
- "airflow-fastapi-api",
- "--pid",
- "/tmp/x.pid",
- "--access-logfile",
- "-",
- "--error-logfile",
- "-",
- "--config",
- "python:airflow.api_fastapi.gunicorn_config",
- "--certfile",
- str(cert_path),
- "--keyfile",
- str(key_path),
- "--access-logformat",
- "custom_log_format",
- "airflow.api_fastapi.app:cached_app(apps='core')",
- "--preload",
- ],
- close_fds=True,
+ mock_run.assert_called_with(
+ "airflow.api_fastapi.main:app",
+ host="0.0.0.0",
+ port=9091,
+ workers=4,
+ timeout_keep_alive=120,
+ timeout_graceful_shutdown=120,
+ ssl_keyfile=str(key_path),
+ ssl_certfile=str(cert_path),
+ access_log="-",
)
@pytest.mark.parametrize(