This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new a76e0fe16e Add `airflow db drop-archived` command (#29309) a76e0fe16e is described below commit a76e0fe16ef12749c3fea1b68d82936b238fafbb Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Thu Feb 9 22:26:29 2023 +0100 Add `airflow db drop-archived` command (#29309) * Add `airflow db drop-archived` command This command drops the archive tables directly As part of this, the _confirm_drop_archives function was made more interactive * fixup! Add `airflow db drop-archived` command * Fix test and add doc * Apply suggestions from code review Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> --- airflow/cli/cli_parser.py | 6 ++ airflow/cli/commands/db_command.py | 11 +++- airflow/utils/db_cleanup.py | 58 ++++++++++++++---- docs/apache-airflow/howto/usage-cli.rst | 10 ++++ tests/cli/commands/test_db_command.py | 28 +++++++++ tests/utils/test_db_cleanup.py | 100 +++++++++++++++++++++++++++++--- 6 files changed, 191 insertions(+), 22 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 9a82d2f06e..bf2e98d0f3 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1626,6 +1626,12 @@ DB_COMMANDS = ( ARG_DB_TABLES, ), ), + ActionCommand( + name="drop-archived", + help="Drop archived tables created through the db clean command", + func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"), + args=(ARG_DB_TABLES, ARG_YES), + ), ) CONNECTIONS_COMMANDS = ( ActionCommand( diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py index 468aa3d87f..72ee55c86d 100644 --- a/airflow/cli/commands/db_command.py +++ b/airflow/cli/commands/db_command.py @@ -27,7 +27,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.utils import cli as cli_utils, db from airflow.utils.db import REVISION_HEADS_MAP -from airflow.utils.db_cleanup import config_dict, export_cleaned_records, run_cleanup +from airflow.utils.db_cleanup import config_dict, drop_archived_tables, export_cleaned_records, run_cleanup from airflow.utils.process_utils import execute_interactive @@ -218,3 +218,12 @@ def export_cleaned(args): table_names=args.tables, drop_archives=args.drop_archives, ) + + +@cli_utils.action_cli(check_db=False) +def drop_archived(args): + """Drops archived tables from metadata database.""" + drop_archived_tables( + table_names=args.tables, + needs_confirm=not args.yes, + ) diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index a9d12c7aa3..41b89931f5 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -39,6 +39,7 @@ from airflow.cli.simple_table import AirflowConsole from airflow.models import Base from airflow.utils import timezone from airflow.utils.db import reflect_tables +from airflow.utils.helpers import ask_yesno from airflow.utils.session import NEW_SESSION, provide_session logger = logging.getLogger(__file__) @@ -301,13 +302,21 @@ def _confirm_delete(*, date: DateTime, tables: list[str]): def _confirm_drop_archives(*, tables: list[str]): + # if length of tables is greater than 3, show the total count + if len(tables) > 3: + text_ = f"{len(tables)} archived tables prefixed with {ARCHIVE_TABLE_PREFIX}" + else: + text_ = f"the following archived tables {tables}" question = ( - f"You have requested that we drop archived records for tables {tables!r}.\n" - f"This is irreversible. Consider backing up the tables first \n" - f"Enter 'drop archived tables' (without quotes) to proceed." + f"You have requested that we drop {text_}.\n" + f"This is irreversible. Consider backing up the tables first \n" ) print(question) - answer = input().strip() + if len(tables) > 3: + show_tables = ask_yesno("Show tables? (y/n): ") + if show_tables: + print(tables, "\n") + answer = input("Enter 'drop archived tables' (without quotes) to proceed.\n").strip() if not answer == "drop archived tables": raise SystemExit("User did not confirm; exiting.") @@ -347,6 +356,19 @@ def _effective_table_names(*, table_names: list[str] | None): return effective_table_names, effective_config_dict +def _get_archived_table_names(table_names, session): + inspector = inspect(session.bind) + db_table_names = [x for x in inspector.get_table_names() if x.startswith(ARCHIVE_TABLE_PREFIX)] + effective_table_names, _ = _effective_table_names(table_names=table_names) + # Filter out tables that don't start with the archive prefix + archived_table_names = [ + table_name + for table_name in db_table_names + if any("__" + x + "__" in table_name for x in effective_table_names) + ] + return archived_table_names + + @provide_session def run_cleanup( *, @@ -410,16 +432,14 @@ def export_cleaned_records( export_format, output_path, table_names=None, drop_archives=False, session: Session = NEW_SESSION ): """Export cleaned data to the given output path in the given format.""" - effective_table_names, _ = _effective_table_names(table_names=table_names) - if drop_archives: - _confirm_drop_archives(tables=sorted(effective_table_names)) - inspector = inspect(session.bind) - db_table_names = [x for x in inspector.get_table_names() if x.startswith(ARCHIVE_TABLE_PREFIX)] + archived_table_names = _get_archived_table_names(table_names, session) + # If user chose to drop archives, check there are archive tables that exists + # before asking for confirmation + if drop_archives and archived_table_names: + _confirm_drop_archives(tables=sorted(archived_table_names)) export_count = 0 dropped_count = 0 - for table_name in db_table_names: - if not any("__" + x + "__" in table_name for x in effective_table_names): - continue + for table_name in archived_table_names: logger.info("Exporting table %s", table_name) _dump_table_to_file( target_table=table_name, @@ -433,3 +453,17 @@ def export_cleaned_records( session.execute(text(f"DROP TABLE {table_name}")) dropped_count += 1 logger.info("Total exported tables: %s, Total dropped tables: %s", export_count, dropped_count) + + +@provide_session +def drop_archived_tables(table_names, needs_confirm, session): + """Drop archived tables.""" + archived_table_names = _get_archived_table_names(table_names, session) + if needs_confirm and archived_table_names: + _confirm_drop_archives(tables=sorted(archived_table_names)) + dropped_count = 0 + for table_name in archived_table_names: + logger.info("Dropping archived table %s", table_name) + session.execute(text(f"DROP TABLE {table_name}")) + dropped_count += 1 + logger.info("Total dropped tables: %s", dropped_count) diff --git a/docs/apache-airflow/howto/usage-cli.rst b/docs/apache-airflow/howto/usage-cli.rst index 976b0b938f..cd66d7e0cb 100644 --- a/docs/apache-airflow/howto/usage-cli.rst +++ b/docs/apache-airflow/howto/usage-cli.rst @@ -232,6 +232,16 @@ location must exist. Other options include: ``--tables`` to specify the tables to export, ``--drop-archives`` to drop the archive tables after exporting. +Dropping the archived tables +---------------------------- + +If during the ``db clean`` process, you did not use the ``--skip-archive`` option which drops the archived table, you can +still drop the archive tables using the ``db drop-archived`` command. This operation is irreversible and you are encouraged +to use the ``db export-cleaned`` command to backup the tables to disk before dropping them. + +You can specify the tables to drop using the ``--tables`` option. If no tables are specified, all archive tables will be +dropped. + Beware cascading deletes ^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py index 78f873b65c..f91a7767ad 100644 --- a/tests/cli/commands/test_db_command.py +++ b/tests/cli/commands/test_db_command.py @@ -507,3 +507,31 @@ class TestCLIDBClean: export_archived_mock.assert_called_once_with( export_format="csv", output_path="path", table_names=None, drop_archives=expected ) + + @pytest.mark.parametrize( + "extra_args, expected", [(["--tables", "hello, goodbye"], ["hello", "goodbye"]), ([], None)] + ) + @patch("airflow.cli.commands.db_command.drop_archived_tables") + def test_tables_in_drop_archived_records_command(self, mock_drop_archived_records, extra_args, expected): + args = self.parser.parse_args( + [ + "db", + "drop-archived", + *extra_args, + ] + ) + db_command.drop_archived(args) + mock_drop_archived_records.assert_called_once_with(table_names=expected, needs_confirm=True) + + @pytest.mark.parametrize("extra_args, expected", [(["-y"], False), ([], True)]) + @patch("airflow.cli.commands.db_command.drop_archived_tables") + def test_confirm_in_drop_archived_records_command(self, mock_drop_archived_records, extra_args, expected): + args = self.parser.parse_args( + [ + "db", + "drop-archived", + *extra_args, + ] + ) + db_command.drop_archived(args) + mock_drop_archived_records.assert_called_once_with(table_names=None, needs_confirm=expected) diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index f72630888a..7d36e37ced 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -42,6 +42,7 @@ from airflow.utils.db_cleanup import ( _confirm_drop_archives, _dump_table_to_file, config_dict, + drop_archived_tables, export_cleaned_records, run_cleanup, ) @@ -336,27 +337,53 @@ class TestDBCleanup: "drop_archive", [True, False], ) + @patch("airflow.utils.db_cleanup._dump_table_to_file") @patch("airflow.utils.db_cleanup._confirm_drop_archives") - def test_confirm_drop_called_when_drop_archives_is_true(self, confirm_drop_mock, drop_archive): + @patch("airflow.utils.db_cleanup.inspect") + def test_confirm_drop_called_when_drop_archives_is_true_and_archive_exists( + self, inspect_mock, confirm_drop_mock, _dump_table_to_file_mock, drop_archive + ): """test that drop confirmation input is called when appropriate""" - export_cleaned_records(export_format="csv", output_path="path", drop_archives=drop_archive) + inspector = inspect_mock.return_value + inspector.get_table_names.return_value = [f"{ARCHIVE_TABLE_PREFIX}dag_run__233"] + export_cleaned_records( + export_format="csv", output_path="path", drop_archives=drop_archive, session=MagicMock() + ) if drop_archive: confirm_drop_mock.assert_called() else: confirm_drop_mock.assert_not_called() - def test_confirm_drop_archives(self): - tables = ["table1", "table2"] + @pytest.mark.parametrize( + "tables", + [ + ["table1", "table2"], + ["table1", "table2", "table3"], + ["table1", "table2", "table3", "table4"], + ], + ) + @patch("airflow.utils.db_cleanup.ask_yesno") + def test_confirm_drop_archives(self, mock_ask_yesno, tables): + expected = ( + f"You have requested that we drop the following archived tables {tables}.\n" + "This is irreversible. Consider backing up the tables first" + ) + if len(tables) > 3: + expected = ( + f"You have requested that we drop {len(tables)} archived tables prefixed with " + f"_airflow_deleted__.\n" + "This is irreversible. Consider backing up the tables first \n" + "\n" + f"{tables}" + ) + + mock_ask_yesno.return_value = True with patch("sys.stdout", new=StringIO()) as fake_out, patch( "builtins.input", side_effect=["drop archived tables"] ): _confirm_drop_archives(tables=tables) output = fake_out.getvalue().strip() - expected = ( - f"You have requested that we drop archived records for tables {tables}.\n" - "This is irreversible. Consider backing up the tables first \n" - "Enter 'drop archived tables' (without quotes) to proceed." - ) + assert output == expected def test_user_did_not_confirm(self): @@ -394,6 +421,26 @@ class TestDBCleanup: else: assert "Total exported tables: 1, Total dropped tables: 0" in caplog.text + @pytest.mark.parametrize("drop_archive", [True, False]) + @patch("airflow.utils.db_cleanup._dump_table_to_file") + @patch("airflow.utils.db_cleanup.inspect") + @patch("airflow.utils.db_cleanup._confirm_drop_archives") + @patch("builtins.input", side_effect=["drop archived tables"]) + def test_export_cleaned_no_confirm_if_no_tables( + self, mock_input, mock_confirm, inspect_mock, dump_mock, caplog, drop_archive + ): + """Test no confirmation if no archived tables found""" + session_mock = MagicMock() + inspector = inspect_mock.return_value + # No tables with the archive prefix + inspector.get_table_names.return_value = ["dag_run", "task_instance"] + export_cleaned_records( + export_format="csv", output_path="path", drop_archives=drop_archive, session=session_mock + ) + mock_confirm.assert_not_called() + dump_mock.assert_not_called() + assert "Total exported tables: 0, Total dropped tables: 0" in caplog.text + @patch("airflow.utils.db_cleanup.csv") def test_dump_table_to_file_function_for_csv(self, mock_csv): mockopen = mock_open() @@ -417,6 +464,41 @@ class TestDBCleanup: ) assert "Export format json is not supported" in str(exc_info.value) + @pytest.mark.parametrize("tables", [["log", "dag"], ["dag_run", "task_instance"]]) + @patch("airflow.utils.db_cleanup._confirm_drop_archives") + @patch("airflow.utils.db_cleanup.inspect") + def test_drop_archived_tables_no_confirm_if_no_archived_tables( + self, inspect_mock, mock_confirm, tables, caplog + ): + """ + Test no confirmation if no archived tables found. + Archived tables starts with a prefix defined in ARCHIVE_TABLE_PREFIX. + """ + inspector = inspect_mock.return_value + inspector.get_table_names.return_value = tables + drop_archived_tables(tables, needs_confirm=True, session=MagicMock()) + mock_confirm.assert_not_called() + assert "Total dropped tables: 0" in caplog.text + + @pytest.mark.parametrize("confirm", [True, False]) + @patch("airflow.utils.db_cleanup.inspect") + @patch("airflow.utils.db_cleanup._confirm_drop_archives") + @patch("builtins.input", side_effect=["drop archived tables"]) + def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, caplog, confirm): + """Test drop_archived_tables""" + archived_table = f"{ARCHIVE_TABLE_PREFIX}dag_run__233" + normal_table = "dag_run" + inspector = inspect_mock.return_value + inspector.get_table_names.return_value = [archived_table, normal_table] + drop_archived_tables([normal_table], needs_confirm=confirm, session=MagicMock()) + assert f"Dropping archived table {archived_table}" in caplog.text + assert f"Dropping archived table {normal_table}" not in caplog.text + assert "Total dropped tables: 1" in caplog.text + if confirm: + confirm_mock.assert_called() + else: + confirm_mock.assert_not_called() + def create_tis(base_date, num_tis, external_trigger=False): with create_session() as session: