This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 7d1eae3 Refactor info command to use AirflowConsole (#14757) 7d1eae3 is described below commit 7d1eae34348d642437f2392cb5f49ac4f1e7b89b Author: Tomek Urbaszek <turbas...@apache.org> AuthorDate: Sun Mar 14 11:09:32 2021 +0100 Refactor info command to use AirflowConsole (#14757) This change unifies the way we render info output. This solves a few problems: - users can use the output flag - because of that users can use plain output which can be useful when working with docker --- airflow/cli/cli_parser.py | 1 + airflow/cli/commands/info_command.py | 315 ++++++++++++++------------------ airflow/cli/simple_table.py | 9 +- tests/cli/commands/test_info_command.py | 107 +++++------ 4 files changed, 197 insertions(+), 235 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index c39328f..a1c3370 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1551,6 +1551,7 @@ airflow_commands: List[CLICommand] = [ ARG_ANONYMIZE, ARG_FILE_IO, ARG_VERBOSE, + ARG_OUTPUT, ), ), ActionCommand( diff --git a/airflow/cli/commands/info_command.py b/airflow/cli/commands/info_command.py index aea6ea8..a0a65b6 100644 --- a/airflow/cli/commands/info_command.py +++ b/airflow/cli/commands/info_command.py @@ -22,14 +22,14 @@ import os import platform import subprocess import sys -from typing import Optional +from typing import List, Optional from urllib.parse import urlsplit, urlunsplit import requests import tenacity from airflow import configuration -from airflow.cli.simple_table import AirflowConsole, SimpleTable +from airflow.cli.simple_table import AirflowConsole from airflow.providers_manager import ProvidersManager from airflow.typing_compat import Protocol from airflow.utils.cli import suppress_logs_and_warning @@ -91,7 +91,6 @@ class PiiAnonymizer(Anonymizer): if url_parts.netloc: # unpack userinfo = None - host = None username = None password = None @@ -179,202 +178,162 @@ 
_MACHINE_TO_ARCHITECTURE = { } -class _BaseInfo: - def info(self, console: AirflowConsole) -> None: - """ - Print required information to provided console. - You should implement this function in custom classes. - """ - raise NotImplementedError() +class AirflowInfo: + """Renders information about Airflow instance""" - def show(self) -> None: - """Shows info""" - console = AirflowConsole() - self.info(console) + def __init__(self, anonymizer): + self.anonymizer = anonymizer - def render_text(self) -> str: - """Exports the info to string""" - console = AirflowConsole(record=True) - with console.capture(): - self.info(console) - return console.export_text() - - -class AirflowInfo(_BaseInfo): - """All information related to Airflow, system and other.""" - - def __init__(self, anonymizer: Anonymizer): - self.airflow_version = airflow_version - self.system = SystemInfo(anonymizer) - self.tools = ToolsInfo(anonymizer) - self.paths = PathsInfo(anonymizer) - self.config = ConfigInfo(anonymizer) - self.provider = ProvidersInfo() - - def info(self, console: AirflowConsole): - console.print( - f"[bold][green]Apache Airflow[/bold][/green]: {self.airflow_version}\n", highlight=False - ) - self.system.info(console) - self.tools.info(console) - self.paths.info(console) - self.config.info(console) - self.provider.info(console) - - -class SystemInfo(_BaseInfo): - """Basic system and python information""" - - def __init__(self, anonymizer: Anonymizer): - self.operating_system = OperatingSystem.get_current() - self.arch = Architecture.get_current() - self.uname = platform.uname() - self.locale = locale.getdefaultlocale() - self.python_location = anonymizer.process_path(sys.executable) - self.python_version = sys.version.replace("\n", " ") - - def info(self, console: AirflowConsole): - table = SimpleTable(title="System info") - table.add_column() - table.add_column(width=100) - table.add_row("OS", self.operating_system or "NOT AVAILABLE") - table.add_row("architecture", self.arch or 
"NOT AVAILABLE") - table.add_row("uname", str(self.uname)) - table.add_row("locale", str(self.locale)) - table.add_row("python_version", self.python_version) - table.add_row("python_location", self.python_location) - console.print(table) - - -class PathsInfo(_BaseInfo): - """Path information""" - - def __init__(self, anonymizer: Anonymizer): - system_path = os.environ.get("PATH", "").split(os.pathsep) - - self.airflow_home = anonymizer.process_path(configuration.get_airflow_home()) - self.system_path = [anonymizer.process_path(p) for p in system_path] - self.python_path = [anonymizer.process_path(p) for p in sys.path] - self.airflow_on_path = any( - os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in system_path - ) - - def info(self, console: AirflowConsole): - table = SimpleTable(title="Paths info") - table.add_column() - table.add_column(width=150) - table.add_row("airflow_home", self.airflow_home) - table.add_row("system_path", os.pathsep.join(self.system_path)) - table.add_row("python_path", os.pathsep.join(self.python_path)) - table.add_row("airflow_on_path", str(self.airflow_on_path)) - console.print(table) - - -class ProvidersInfo(_BaseInfo): - """providers information""" + @staticmethod + def _get_version(cmd: List[str], grep: Optional[bytes] = None): + """Return tools version.""" + try: + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + except OSError: + return "NOT AVAILABLE" + stdoutdata, _ = proc.communicate() + data = [f for f in stdoutdata.split(b"\n") if f] + if grep: + data = [line for line in data if grep in line] + if len(data) != 1: + return "NOT AVAILABLE" + else: + return data[0].decode() - def info(self, console: AirflowConsole): - table = SimpleTable(title="Providers info") - table.add_column() - table.add_column(width=150) - for provider_value in ProvidersManager().providers.values(): - table.add_row(provider_value.provider_info['package-name'], provider_value.version) - console.print(table) + 
@staticmethod + def _task_logging_handler(): + """Returns task logging handler.""" + def get_fullname(o): + module = o.__class__.__module__ + if module is None or module == str.__class__.__module__: + return o.__class__.__name__ # Avoid reporting __builtin__ + else: + return module + '.' + o.__class__.__name__ -class ConfigInfo(_BaseInfo): - """Most critical config properties""" + try: + handler_names = [get_fullname(handler) for handler in logging.getLogger('airflow.task').handlers] + return ", ".join(handler_names) + except Exception: # noqa pylint: disable=broad-except + return "NOT AVAILABLE" - def __init__(self, anonymizer: Anonymizer): - self.executor = configuration.conf.get("core", "executor") - self.sql_alchemy_conn = anonymizer.process_url( + @property + def _airflow_info(self): + executor = configuration.conf.get("core", "executor") + sql_alchemy_conn = self.anonymizer.process_url( configuration.conf.get("core", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE") ) - self.dags_folder = anonymizer.process_path( + dags_folder = self.anonymizer.process_path( configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE") ) - self.plugins_folder = anonymizer.process_path( + plugins_folder = self.anonymizer.process_path( configuration.conf.get("core", "plugins_folder", fallback="NOT AVAILABLE") ) - self.base_log_folder = anonymizer.process_path( + base_log_folder = self.anonymizer.process_path( configuration.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE") ) - self.remote_base_log_folder = anonymizer.process_path( + remote_base_log_folder = self.anonymizer.process_path( configuration.conf.get("logging", "remote_base_log_folder", fallback="NOT AVAILABLE") ) + return [ + ("version", airflow_version), + ("executor", executor), + ("task_logging_handler", self._task_logging_handler()), + ("sql_alchemy_conn", sql_alchemy_conn), + ("dags_folder", dags_folder), + ("plugins_folder", plugins_folder), + ("base_log_folder", base_log_folder), + 
("remote_base_log_folder", remote_base_log_folder), + ] + @property - def task_logging_handler(self): - """Returns task logging handler.""" + def _system_info(self): + operating_system = OperatingSystem.get_current() + arch = Architecture.get_current() + uname = platform.uname() + _locale = locale.getdefaultlocale() + python_location = self.anonymizer.process_path(sys.executable) + python_version = sys.version.replace("\n", " ") + + return [ + ("OS", operating_system or "NOT AVAILABLE"), + ("architecture", arch or "NOT AVAILABLE"), + ("uname", str(uname)), + ("locale", str(_locale)), + ("python_version", python_version), + ("python_location", python_location), + ] - def get_fullname(o): - module = o.__class__.__module__ - if module is None or module == str.__class__.__module__: - return o.__class__.__name__ # Avoid reporting __builtin__ - else: - return module + '.' + o.__class__.__name__ + @property + def _tools_info(self): + git_version = self._get_version(["git", "--version"]) + ssh_version = self._get_version(["ssh", "-V"]) + kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"]) + gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK") + cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"]) + mysql_version = self._get_version(["mysql", "--version"]) + sqlite3_version = self._get_version(["sqlite3", "--version"]) + psql_version = self._get_version(["psql", "--version"]) + + return [ + ("git", git_version), + ("ssh", ssh_version), + ("kubectl", kubectl_version), + ("gcloud", gcloud_version), + ("cloud_sql_proxy", cloud_sql_proxy_version), + ("mysql", mysql_version), + ("sqlite3", sqlite3_version), + ("psql", psql_version), + ] - try: - handler_names = [get_fullname(handler) for handler in logging.getLogger('airflow.task').handlers] - return ", ".join(handler_names) - except Exception: # noqa pylint: disable=broad-except - return "NOT AVAILABLE" + @property + def 
_paths_info(self): + system_path = os.environ.get("PATH", "").split(os.pathsep) + airflow_home = self.anonymizer.process_path(configuration.get_airflow_home()) + system_path = [self.anonymizer.process_path(p) for p in system_path] + python_path = [self.anonymizer.process_path(p) for p in sys.path] + airflow_on_path = any(os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in system_path) + + return [ + ("airflow_home", airflow_home), + ("system_path", os.pathsep.join(system_path)), + ("python_path", os.pathsep.join(python_path)), + ("airflow_on_path", str(airflow_on_path)), + ] - def info(self, console: AirflowConsole): - table = SimpleTable(title="Config info") - table.add_column() - table.add_column(width=150) - table.add_row("executor", self.executor) - table.add_row("task_logging_handler", self.task_logging_handler) - table.add_row("sql_alchemy_conn", self.sql_alchemy_conn) - table.add_row("dags_folder", self.dags_folder) - table.add_row("plugins_folder", self.plugins_folder) - table.add_row("base_log_folder", self.base_log_folder) - console.print(table) - - -class ToolsInfo(_BaseInfo): - """The versions of the tools that Airflow uses""" - - def __init__(self, anonymize: Anonymizer): - del anonymize # Nothing to anonymize here. 
- self.git_version = self._get_version(["git", "--version"]) - self.ssh_version = self._get_version(["ssh", "-V"]) - self.kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"]) - self.gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK") - self.cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"]) - self.mysql_version = self._get_version(["mysql", "--version"]) - self.sqlite3_version = self._get_version(["sqlite3", "--version"]) - self.psql_version = self._get_version(["psql", "--version"]) - - def _get_version(self, cmd, grep=None): - """Return tools version.""" - try: - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - except OSError: - return "NOT AVAILABLE" - stdoutdata, _ = proc.communicate() - data = [f for f in stdoutdata.split(b"\n") if f] - if grep: - data = [line for line in data if grep in line] - if len(data) != 1: - return "NOT AVAILABLE" + @property + def _providers_info(self): + return [(p.provider_info['package-name'], p.version) for p in ProvidersManager().providers.values()] + + def show(self, output: str, console: Optional[AirflowConsole] = None) -> None: + """Shows information about Airflow instance""" + all_info = { + "Apache Airflow": self._airflow_info, + "System info": self._system_info, + "Tools info": self._tools_info, + "Paths info": self._paths_info, + "Providers info": self._providers_info, + } + + console = console or AirflowConsole(show_header=False) + if output in ("table", "plain"): + # Show each info as table with key, value column + for key, info in all_info.items(): + console.print(f"\n[bold][green]{key}[/bold][/green]", highlight=False) + console.print_as(data=[{"key": k, "value": v} for k, v in info], output=output) else: - return data[0].decode() + # Render info in given format, change keys to snake_case + console.print_as( + data=[{k.lower().replace(" ", "_"): dict(v)} for k, v in all_info.items()], 
output=output + ) - def info(self, console: AirflowConsole): - table = SimpleTable(title="Tools info") - table.add_column() - table.add_column(width=150) - table.add_row("git", self.git_version) - table.add_row("ssh", self.ssh_version) - table.add_row("kubectl", self.kubectl_version) - table.add_row("gcloud", self.gcloud_version) - table.add_row("cloud_sql_proxy", self.cloud_sql_proxy_version) - table.add_row("mysql", self.mysql_version) - table.add_row("sqlite3", self.sqlite3_version) - table.add_row("psql", self.psql_version) - console.print(table) + def render_text(self, output: str) -> str: + """Exports the info to string""" + console = AirflowConsole(record=True) + with console.capture(): + self.show(output=output, console=console) + return console.export_text() class FileIoException(Exception): @@ -419,6 +378,6 @@ def show_info(args): anonymizer = PiiAnonymizer() if args.anonymize or args.file_io else NullAnonymizer() info = AirflowInfo(anonymizer) if args.file_io: - _send_report_to_fileio(info.render_text()) + _send_report_to_fileio(info.render_text(args.output)) else: - info.show() + info.show(args.output) diff --git a/airflow/cli/simple_table.py b/airflow/cli/simple_table.py index 7b47ecf..fbbb0c0 100644 --- a/airflow/cli/simple_table.py +++ b/airflow/cli/simple_table.py @@ -32,11 +32,14 @@ from airflow.utils.platform import is_tty class AirflowConsole(Console): """Airflow rich console""" - def __init__(self, *args, **kwargs): + def __init__(self, show_header: bool = True, *args, **kwargs): super().__init__(*args, **kwargs) # Set the width to constant to pipe whole output from console self._width = 200 if not is_tty() else self._width + # If show header in tables + self.show_header = show_header + def print_as_json(self, data: Dict): """Renders dict as json text representation""" json_content = json.dumps(data) @@ -53,9 +56,7 @@ class AirflowConsole(Console): self.print("No data found") return - table = SimpleTable( - show_header=True, - ) + table = 
SimpleTable(show_header=self.show_header) for col in data[0].keys(): table.add_column(col) diff --git a/tests/cli/commands/test_info_command.py b/tests/cli/commands/test_info_command.py index 3d4c60f..a503039 100644 --- a/tests/cli/commands/test_info_command.py +++ b/tests/cli/commands/test_info_command.py @@ -73,29 +73,23 @@ class TestPiiAnonymizer(unittest.TestCase): assert after == self.instance.process_url(before) -class TestAirflowInfo(unittest.TestCase): - def test_info(self): - instance = info_command.AirflowInfo(info_command.NullAnonymizer()) - text = capture_show_output(instance) - assert "Apache Airflow" in text - assert airflow_version in text - - -class TestSystemInfo(unittest.TestCase): - def test_info(self): - instance = info_command.SystemInfo(info_command.NullAnonymizer()) - text = capture_show_output(instance) - assert "System info" in text - +class TestAirflowInfo: + @classmethod + def setup_class(cls): + # pylint: disable=attribute-defined-outside-init + cls.parser = cli_parser.get_parser() -class TestPathsInfo(unittest.TestCase): - def test_info(self): - instance = info_command.PathsInfo(info_command.NullAnonymizer()) - text = capture_show_output(instance) - assert "Paths info" in text + @classmethod + def teardown_class(cls) -> None: + for handler_ref in logging._handlerList[:]: + logging._removeHandlerRef(handler_ref) + importlib.reload(airflow_local_settings) + configure_logging() + @staticmethod + def unique_items(items): + return {i[0] for i in items} -class TestConfigInfo(unittest.TestCase): @conf_vars( { ("core", "executor"): "TEST_EXECUTOR", @@ -103,43 +97,50 @@ class TestConfigInfo(unittest.TestCase): ("core", "plugins_folder"): "TEST_PLUGINS_FOLDER", ("logging", "base_log_folder"): "TEST_LOG_FOLDER", ('core', 'sql_alchemy_conn'): 'postgresql+psycopg2://postgres:airflow@postgres/airflow', + ('logging', 'remote_logging'): 'True', + ('logging', 'remote_base_log_folder'): 's3://logs-name', } ) - def test_should_read_config(self): - 
instance = info_command.ConfigInfo(info_command.NullAnonymizer()) - text = capture_show_output(instance) - assert "TEST_EXECUTOR" in text - assert "TEST_DAGS_FOLDER" in text - assert "TEST_PLUGINS_FOLDER" in text - assert "TEST_LOG_FOLDER" in text - assert "postgresql+psycopg2://postgres:airflow@postgres/airflow" in text - - -class TestConfigInfoLogging(unittest.TestCase): - def test_should_read_logging_configuration(self): - with conf_vars( - { - ('logging', 'remote_logging'): 'True', - ('logging', 'remote_base_log_folder'): 'stackdriver://logs-name', - } - ): - importlib.reload(airflow_local_settings) - configure_logging() - instance = info_command.ConfigInfo(info_command.NullAnonymizer()) - text = capture_show_output(instance) - assert "stackdriver" in text - - def tearDown(self) -> None: - for handler_ref in logging._handlerList[:]: - logging._removeHandlerRef(handler_ref) + def test_airflow_info(self): importlib.reload(airflow_local_settings) configure_logging() + instance = info_command.AirflowInfo(info_command.NullAnonymizer()) + expected = { + 'executor', + 'version', + 'task_logging_handler', + 'plugins_folder', + 'base_log_folder', + 'remote_base_log_folder', + 'dags_folder', + 'sql_alchemy_conn', + } + assert self.unique_items(instance._airflow_info) == expected -class TestShowInfo(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.parser = cli_parser.get_parser() + def test_system_info(self): + instance = info_command.AirflowInfo(info_command.NullAnonymizer()) + expected = {'uname', 'architecture', 'OS', 'python_location', 'locale', 'python_version'} + assert self.unique_items(instance._system_info) == expected + + def test_paths_info(self): + instance = info_command.AirflowInfo(info_command.NullAnonymizer()) + expected = {'airflow_on_path', 'airflow_home', 'system_path', 'python_path'} + assert self.unique_items(instance._paths_info) == expected + + def test_tools_info(self): + instance = 
info_command.AirflowInfo(info_command.NullAnonymizer()) + expected = { + 'cloud_sql_proxy', + 'gcloud', + 'git', + 'kubectl', + 'mysql', + 'psql', + 'sqlite3', + 'ssh', + } + assert self.unique_items(instance._tools_info) == expected @conf_vars( { @@ -151,7 +152,7 @@ class TestShowInfo(unittest.TestCase): info_command.show_info(self.parser.parse_args(["info"])) output = stdout.getvalue() - assert f"Apache Airflow: {airflow_version}" in output + assert airflow_version in output assert "postgresql+psycopg2://postgres:airflow@postgres/airflow" in output @conf_vars( @@ -164,7 +165,7 @@ class TestShowInfo(unittest.TestCase): info_command.show_info(self.parser.parse_args(["info", "--anonymize"])) output = stdout.getvalue() - assert f"Apache Airflow: {airflow_version}" in output + assert airflow_version in output assert "postgresql+psycopg2://p...s:PASSWORD@postgres/airflow" in output @conf_vars(