This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a76e0fe16e Add `airflow db drop-archived` command (#29309)
a76e0fe16e is described below

commit a76e0fe16ef12749c3fea1b68d82936b238fafbb
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Feb 9 22:26:29 2023 +0100

    Add `airflow db drop-archived` command (#29309)
    
    * Add `airflow db drop-archived` command
    
    This command drops the archive tables directly
    
    As part of this, the _confirm_drop_archives function was made more interactive
    
    * fixup! Add `airflow db drop-archived` command
    
    * Fix test and add doc
    
    * Apply suggestions from code review
    
    Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
---
 airflow/cli/cli_parser.py               |   6 ++
 airflow/cli/commands/db_command.py      |  11 +++-
 airflow/utils/db_cleanup.py             |  58 ++++++++++++++----
 docs/apache-airflow/howto/usage-cli.rst |  10 ++++
 tests/cli/commands/test_db_command.py   |  28 +++++++++
 tests/utils/test_db_cleanup.py          | 100 +++++++++++++++++++++++++++++---
 6 files changed, 191 insertions(+), 22 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 9a82d2f06e..bf2e98d0f3 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1626,6 +1626,12 @@ DB_COMMANDS = (
             ARG_DB_TABLES,
         ),
     ),
+    ActionCommand(
+        name="drop-archived",
+        help="Drop archived tables created through the db clean command",
+        
func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"),
+        args=(ARG_DB_TABLES, ARG_YES),
+    ),
 )
 CONNECTIONS_COMMANDS = (
     ActionCommand(
diff --git a/airflow/cli/commands/db_command.py 
b/airflow/cli/commands/db_command.py
index 468aa3d87f..72ee55c86d 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -27,7 +27,7 @@ from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.utils import cli as cli_utils, db
 from airflow.utils.db import REVISION_HEADS_MAP
-from airflow.utils.db_cleanup import config_dict, export_cleaned_records, 
run_cleanup
+from airflow.utils.db_cleanup import config_dict, drop_archived_tables, 
export_cleaned_records, run_cleanup
 from airflow.utils.process_utils import execute_interactive
 
 
@@ -218,3 +218,12 @@ def export_cleaned(args):
         table_names=args.tables,
         drop_archives=args.drop_archives,
     )
+
+
+@cli_utils.action_cli(check_db=False)
+def drop_archived(args):
+    """Drops archived tables from metadata database."""
+    drop_archived_tables(
+        table_names=args.tables,
+        needs_confirm=not args.yes,
+    )
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index a9d12c7aa3..41b89931f5 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -39,6 +39,7 @@ from airflow.cli.simple_table import AirflowConsole
 from airflow.models import Base
 from airflow.utils import timezone
 from airflow.utils.db import reflect_tables
+from airflow.utils.helpers import ask_yesno
 from airflow.utils.session import NEW_SESSION, provide_session
 
 logger = logging.getLogger(__file__)
@@ -301,13 +302,21 @@ def _confirm_delete(*, date: DateTime, tables: list[str]):
 
 
 def _confirm_drop_archives(*, tables: list[str]):
+    # if length of tables is greater than 3, show the total count
+    if len(tables) > 3:
+        text_ = f"{len(tables)} archived tables prefixed with 
{ARCHIVE_TABLE_PREFIX}"
+    else:
+        text_ = f"the following archived tables {tables}"
     question = (
-        f"You have requested that we drop archived records for tables 
{tables!r}.\n"
-        f"This is irreversible.  Consider backing up the tables first \n"
-        f"Enter 'drop archived tables' (without quotes) to proceed."
+        f"You have requested that we drop {text_}.\n"
+        f"This is irreversible. Consider backing up the tables first \n"
     )
     print(question)
-    answer = input().strip()
+    if len(tables) > 3:
+        show_tables = ask_yesno("Show tables? (y/n): ")
+        if show_tables:
+            print(tables, "\n")
+    answer = input("Enter 'drop archived tables' (without quotes) to 
proceed.\n").strip()
     if not answer == "drop archived tables":
         raise SystemExit("User did not confirm; exiting.")
 
@@ -347,6 +356,19 @@ def _effective_table_names(*, table_names: list[str] | 
None):
     return effective_table_names, effective_config_dict
 
 
+def _get_archived_table_names(table_names, session):
+    inspector = inspect(session.bind)
+    db_table_names = [x for x in inspector.get_table_names() if 
x.startswith(ARCHIVE_TABLE_PREFIX)]
+    effective_table_names, _ = _effective_table_names(table_names=table_names)
+    # Keep only the archive tables that correspond to one of the requested tables
+    archived_table_names = [
+        table_name
+        for table_name in db_table_names
+        if any("__" + x + "__" in table_name for x in effective_table_names)
+    ]
+    return archived_table_names
+
+
 @provide_session
 def run_cleanup(
     *,
@@ -410,16 +432,14 @@ def export_cleaned_records(
     export_format, output_path, table_names=None, drop_archives=False, 
session: Session = NEW_SESSION
 ):
     """Export cleaned data to the given output path in the given format."""
-    effective_table_names, _ = _effective_table_names(table_names=table_names)
-    if drop_archives:
-        _confirm_drop_archives(tables=sorted(effective_table_names))
-    inspector = inspect(session.bind)
-    db_table_names = [x for x in inspector.get_table_names() if 
x.startswith(ARCHIVE_TABLE_PREFIX)]
+    archived_table_names = _get_archived_table_names(table_names, session)
+    # If the user chose to drop archives, check that matching archive tables exist
+    # before asking for confirmation
+    if drop_archives and archived_table_names:
+        _confirm_drop_archives(tables=sorted(archived_table_names))
     export_count = 0
     dropped_count = 0
-    for table_name in db_table_names:
-        if not any("__" + x + "__" in table_name for x in 
effective_table_names):
-            continue
+    for table_name in archived_table_names:
         logger.info("Exporting table %s", table_name)
         _dump_table_to_file(
             target_table=table_name,
@@ -433,3 +453,17 @@ def export_cleaned_records(
             session.execute(text(f"DROP TABLE {table_name}"))
             dropped_count += 1
     logger.info("Total exported tables: %s, Total dropped tables: %s", 
export_count, dropped_count)
+
+
+@provide_session
+def drop_archived_tables(table_names, needs_confirm, session):
+    """Drop archived tables."""
+    archived_table_names = _get_archived_table_names(table_names, session)
+    if needs_confirm and archived_table_names:
+        _confirm_drop_archives(tables=sorted(archived_table_names))
+    dropped_count = 0
+    for table_name in archived_table_names:
+        logger.info("Dropping archived table %s", table_name)
+        session.execute(text(f"DROP TABLE {table_name}"))
+        dropped_count += 1
+    logger.info("Total dropped tables: %s", dropped_count)
diff --git a/docs/apache-airflow/howto/usage-cli.rst 
b/docs/apache-airflow/howto/usage-cli.rst
index 976b0b938f..cd66d7e0cb 100644
--- a/docs/apache-airflow/howto/usage-cli.rst
+++ b/docs/apache-airflow/howto/usage-cli.rst
@@ -232,6 +232,16 @@ location must exist.
 Other options include: ``--tables`` to specify the tables to export, 
``--drop-archives`` to drop the archive tables after
 exporting.
 
+Dropping the archived tables
+----------------------------
+
+If you did not use the ``--skip-archive`` option (which drops the archived tables) during the ``db clean`` process, you can
+still drop the archive tables using the ``db drop-archived`` command. This operation is irreversible, so you are encouraged
+to use the ``db export-cleaned`` command to back up the tables to disk before dropping them.
+
+You can specify the tables to drop using the ``--tables`` option. If no tables 
are specified, all archive tables will be
+dropped.
+
 Beware cascading deletes
 ^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/tests/cli/commands/test_db_command.py 
b/tests/cli/commands/test_db_command.py
index 78f873b65c..f91a7767ad 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -507,3 +507,31 @@ class TestCLIDBClean:
         export_archived_mock.assert_called_once_with(
             export_format="csv", output_path="path", table_names=None, 
drop_archives=expected
         )
+
+    @pytest.mark.parametrize(
+        "extra_args, expected", [(["--tables", "hello, goodbye"], ["hello", 
"goodbye"]), ([], None)]
+    )
+    @patch("airflow.cli.commands.db_command.drop_archived_tables")
+    def test_tables_in_drop_archived_records_command(self, 
mock_drop_archived_records, extra_args, expected):
+        args = self.parser.parse_args(
+            [
+                "db",
+                "drop-archived",
+                *extra_args,
+            ]
+        )
+        db_command.drop_archived(args)
+        
mock_drop_archived_records.assert_called_once_with(table_names=expected, 
needs_confirm=True)
+
+    @pytest.mark.parametrize("extra_args, expected", [(["-y"], False), ([], 
True)])
+    @patch("airflow.cli.commands.db_command.drop_archived_tables")
+    def test_confirm_in_drop_archived_records_command(self, 
mock_drop_archived_records, extra_args, expected):
+        args = self.parser.parse_args(
+            [
+                "db",
+                "drop-archived",
+                *extra_args,
+            ]
+        )
+        db_command.drop_archived(args)
+        mock_drop_archived_records.assert_called_once_with(table_names=None, 
needs_confirm=expected)
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index f72630888a..7d36e37ced 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -42,6 +42,7 @@ from airflow.utils.db_cleanup import (
     _confirm_drop_archives,
     _dump_table_to_file,
     config_dict,
+    drop_archived_tables,
     export_cleaned_records,
     run_cleanup,
 )
@@ -336,27 +337,53 @@ class TestDBCleanup:
         "drop_archive",
         [True, False],
     )
+    @patch("airflow.utils.db_cleanup._dump_table_to_file")
     @patch("airflow.utils.db_cleanup._confirm_drop_archives")
-    def test_confirm_drop_called_when_drop_archives_is_true(self, 
confirm_drop_mock, drop_archive):
+    @patch("airflow.utils.db_cleanup.inspect")
+    def test_confirm_drop_called_when_drop_archives_is_true_and_archive_exists(
+        self, inspect_mock, confirm_drop_mock, _dump_table_to_file_mock, 
drop_archive
+    ):
         """test that drop confirmation input is called when appropriate"""
-        export_cleaned_records(export_format="csv", output_path="path", 
drop_archives=drop_archive)
+        inspector = inspect_mock.return_value
+        inspector.get_table_names.return_value = 
[f"{ARCHIVE_TABLE_PREFIX}dag_run__233"]
+        export_cleaned_records(
+            export_format="csv", output_path="path", 
drop_archives=drop_archive, session=MagicMock()
+        )
         if drop_archive:
             confirm_drop_mock.assert_called()
         else:
             confirm_drop_mock.assert_not_called()
 
-    def test_confirm_drop_archives(self):
-        tables = ["table1", "table2"]
+    @pytest.mark.parametrize(
+        "tables",
+        [
+            ["table1", "table2"],
+            ["table1", "table2", "table3"],
+            ["table1", "table2", "table3", "table4"],
+        ],
+    )
+    @patch("airflow.utils.db_cleanup.ask_yesno")
+    def test_confirm_drop_archives(self, mock_ask_yesno, tables):
+        expected = (
+            f"You have requested that we drop the following archived tables 
{tables}.\n"
+            "This is irreversible. Consider backing up the tables first"
+        )
+        if len(tables) > 3:
+            expected = (
+                f"You have requested that we drop {len(tables)} archived 
tables prefixed with "
+                f"_airflow_deleted__.\n"
+                "This is irreversible. Consider backing up the tables first \n"
+                "\n"
+                f"{tables}"
+            )
+
+        mock_ask_yesno.return_value = True
         with patch("sys.stdout", new=StringIO()) as fake_out, patch(
             "builtins.input", side_effect=["drop archived tables"]
         ):
             _confirm_drop_archives(tables=tables)
             output = fake_out.getvalue().strip()
-            expected = (
-                f"You have requested that we drop archived records for tables 
{tables}.\n"
-                "This is irreversible.  Consider backing up the tables first 
\n"
-                "Enter 'drop archived tables' (without quotes) to proceed."
-            )
+
         assert output == expected
 
     def test_user_did_not_confirm(self):
@@ -394,6 +421,26 @@ class TestDBCleanup:
         else:
             assert "Total exported tables: 1, Total dropped tables: 0" in 
caplog.text
 
+    @pytest.mark.parametrize("drop_archive", [True, False])
+    @patch("airflow.utils.db_cleanup._dump_table_to_file")
+    @patch("airflow.utils.db_cleanup.inspect")
+    @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+    @patch("builtins.input", side_effect=["drop archived tables"])
+    def test_export_cleaned_no_confirm_if_no_tables(
+        self, mock_input, mock_confirm, inspect_mock, dump_mock, caplog, 
drop_archive
+    ):
+        """Test no confirmation if no archived tables found"""
+        session_mock = MagicMock()
+        inspector = inspect_mock.return_value
+        # No tables with the archive prefix
+        inspector.get_table_names.return_value = ["dag_run", "task_instance"]
+        export_cleaned_records(
+            export_format="csv", output_path="path", 
drop_archives=drop_archive, session=session_mock
+        )
+        mock_confirm.assert_not_called()
+        dump_mock.assert_not_called()
+        assert "Total exported tables: 0, Total dropped tables: 0" in 
caplog.text
+
     @patch("airflow.utils.db_cleanup.csv")
     def test_dump_table_to_file_function_for_csv(self, mock_csv):
         mockopen = mock_open()
@@ -417,6 +464,41 @@ class TestDBCleanup:
             )
         assert "Export format json is not supported" in str(exc_info.value)
 
+    @pytest.mark.parametrize("tables", [["log", "dag"], ["dag_run", 
"task_instance"]])
+    @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+    @patch("airflow.utils.db_cleanup.inspect")
+    def test_drop_archived_tables_no_confirm_if_no_archived_tables(
+        self, inspect_mock, mock_confirm, tables, caplog
+    ):
+        """
+        Test no confirmation if no archived tables found.
+        Archived table names start with the prefix defined in ARCHIVE_TABLE_PREFIX.
+        """
+        inspector = inspect_mock.return_value
+        inspector.get_table_names.return_value = tables
+        drop_archived_tables(tables, needs_confirm=True, session=MagicMock())
+        mock_confirm.assert_not_called()
+        assert "Total dropped tables: 0" in caplog.text
+
+    @pytest.mark.parametrize("confirm", [True, False])
+    @patch("airflow.utils.db_cleanup.inspect")
+    @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+    @patch("builtins.input", side_effect=["drop archived tables"])
+    def test_drop_archived_tables(self, mock_input, confirm_mock, 
inspect_mock, caplog, confirm):
+        """Test drop_archived_tables"""
+        archived_table = f"{ARCHIVE_TABLE_PREFIX}dag_run__233"
+        normal_table = "dag_run"
+        inspector = inspect_mock.return_value
+        inspector.get_table_names.return_value = [archived_table, normal_table]
+        drop_archived_tables([normal_table], needs_confirm=confirm, 
session=MagicMock())
+        assert f"Dropping archived table {archived_table}" in caplog.text
+        assert f"Dropping archived table {normal_table}" not in caplog.text
+        assert "Total dropped tables: 1" in caplog.text
+        if confirm:
+            confirm_mock.assert_called()
+        else:
+            confirm_mock.assert_not_called()
+
 
 def create_tis(base_date, num_tis, external_trigger=False):
     with create_session() as session:

Reply via email to