Repository: incubator-airflow Updated Branches: refs/heads/master e4494f85e -> a9b20a04b
[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process. Closes #2208 from sekikn/AIRFLOW-1004 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9b20a04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9b20a04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9b20a04 Branch: refs/heads/master Commit: a9b20a04b052e9479dbb79fd46124293085610e9 Parents: e4494f8 Author: Kengo Seki <sek...@apache.org> Authored: Tue Apr 4 08:32:44 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Tue Apr 4 08:32:44 2017 +0200 ---------------------------------------------------------------------- airflow/bin/cli.py | 64 ++++++++++++++++++++++++++++++++++++++++--------- tests/core.py | 56 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index e9c54e6..e4755c7 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -753,7 +753,12 @@ def webserver(args): app.run(debug=True, port=args.port, host=args.hostname, ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None) else: - pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid) + pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file) + if args.daemon: + handle = setup_logging(log_file) + stdout = open(stdout, 'w+') + stderr = open(stderr, 'w+') + print( textwrap.dedent('''\ Running the Gunicorn Server with: @@ -771,7 +776,6 @@ def webserver(args): '-t', str(worker_timeout), '-b', args.hostname + ':' + str(args.port), '-n', 'airflow-webserver', - '-p', str(pid), '-c', 'airflow.www.gunicorn_config' ] @@ -782,28 +786,66 @@ def webserver(args): run_args += ['--error-logfile', str(args.error_logfile)] if args.daemon: - run_args += ["-D"] + run_args += ['-D', '-p', str(pid)] + if ssl_cert: run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key] run_args += ["airflow.www.app:cached_app()"] - gunicorn_master_proc = subprocess.Popen(run_args) + gunicorn_master_proc = None def kill_proc(dummy_signum, dummy_frame): gunicorn_master_proc.terminate() gunicorn_master_proc.wait() sys.exit(0) - signal.signal(signal.SIGINT, kill_proc) - signal.signal(signal.SIGTERM, kill_proc) + def monitor_gunicorn(gunicorn_master_proc): + # These run forever until SIG{INT, TERM, KILL, ...} signal is sent + if conf.getint('webserver', 'worker_refresh_interval') > 0: + restart_workers(gunicorn_master_proc, num_workers) + else: + while True: + time.sleep(1) - # These run forever until SIG{INT, TERM, KILL, ...} signal is sent - if conf.getint('webserver', 'worker_refresh_interval') > 0: - restart_workers(gunicorn_master_proc, num_workers) + if args.daemon: + base, ext = os.path.splitext(pid) + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1), + files_preserve=[handle], + stdout=stdout, + stderr=stderr, + signal_map={ + signal.SIGINT: kill_proc, + signal.SIGTERM: kill_proc + }, + ) + with ctx: + subprocess.Popen(run_args) + + # Reading pid file directly, since Popen#pid doesn't + # seem to return the right value with DaemonContext. + while True: + try: + with open(pid) as f: + gunicorn_master_proc_pid = int(f.read()) + break + except IOError: + logging.debug("Waiting for gunicorn's pid file to be created.") + time.sleep(0.1) + + gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) + monitor_gunicorn(gunicorn_master_proc) + + stdout.close() + stderr.close() else: - while True: - time.sleep(1) + gunicorn_master_proc = subprocess.Popen(run_args) + + signal.signal(signal.SIGINT, kill_proc) + signal.signal(signal.SIGTERM, kill_proc) + + monitor_gunicorn(gunicorn_master_proc) def scheduler(args): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 997bb42..7da08e1 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1427,6 +1427,62 @@ class CliTests(unittest.TestCase): os.remove('variables1.json') os.remove('variables2.json') + def test_cli_webserver_foreground(self): + import subprocess + + # Confirm that webserver hasn't been launched. + # pgrep returns exit status 1 if no process matched. + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + + # Run webserver in foreground and terminate it. + p = subprocess.Popen(["airflow", "webserver"]) + p.terminate() + p.wait() + + # Assert that no process remains. + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + + @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]), + "Skipping test due to lack of required file permission") + def test_cli_webserver_background(self): + import subprocess + import psutil + + def wait_pidfile(pidfile): + while True: + try: + with open(pidfile) as f: + return int(f.read()) + except IOError: + sleep(1) + + # Confirm that webserver hasn't been launched. + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + + # Run webserver in background. + subprocess.Popen(["airflow", "webserver", "-D"]) + pidfile = cli.setup_locations("webserver")[0] + wait_pidfile(pidfile) + + # Assert that gunicorn and its monitor are launched. + self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) + self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + + # Terminate monitor process. + pidfile = cli.setup_locations("webserver-monitor")[0] + pid = wait_pidfile(pidfile) + p = psutil.Process(pid) + p.terminate() + p.wait() + + # Assert that no process remains. + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) + self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + + class SecurityTests(unittest.TestCase): def setUp(self): configuration.load_test_config()