This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4364908da2c Add --error-on-cleanup-failure flag to airflow db clean (#65239)
4364908da2c is described below

commit 4364908da2c0e798800509f2427fce90777af3ec
Author: Hemkumar Chheda <[email protected]>
AuthorDate: Fri May 8 23:49:24 2026 +0530

    Add --error-on-cleanup-failure flag to airflow db clean (#65239)
    
    * fix(db_cleanup): add --error-on-cleanup-failure flag to airflow db clean
    
    airflow db clean suppresses all per-table cleanup errors via
    _suppress_with_logging() and exits 0 even when tables could not be
    cleaned. This makes it impossible to detect silent failures in automated
    DAG-based maintenance workflows, which can lead to unchecked table
    growth and eventual migration failures on upgrade.
    
    This commit adds an opt-in --error-on-cleanup-failure flag that causes
    run_cleanup() to raise AirflowException (and the CLI to exit 1) if any
    table cleanup encountered an error. Default behaviour is unchanged.
    
    Additionally, a warning listing all tables that were not cleaned is now
    always emitted when failures occur, even without the flag, improving
    observability without requiring any opt-in.
    
    Changes:
    - airflow/utils/db_cleanup.py: update _suppress_with_logging to track
      whether an exception was suppressed via a SimpleNamespace context
      object; collect failed table names in run_cleanup(); emit a warning
      summary and optionally raise AirflowException.
    - airflow/cli/cli_config.py: add ARG_DB_ERROR_ON_CLEANUP_FAILURE and
      wire it into the db clean ActionCommand args list.
    - airflow/cli/commands/db_command.py: forward error_on_cleanup_failure
      from CLI args to run_cleanup().
    - tests/utils/test_db_cleanup.py: add unit tests covering the new flag
      and the warning summary behaviour.
    
    Made-with: Cursor
    
    * fix(db_cleanup): improve type annotations and add docs for --error-on-cleanup-failure
    
    - Use collections.abc.Generator for the _suppress_with_logging return
      type annotation (ruff UP035 compliant) instead of typing.Generator.
    - Expand the _suppress_with_logging docstring to describe the yielded
      SimpleNamespace context object and the failure-tracking behaviour.
    - Add a new "Detecting cleanup failures" section to
      docs/howto/usage-cli.rst documenting the --error-on-cleanup-failure
      flag and the --skip-archive recommendation for large tables.
    
    Made-with: Cursor
    
    * fix(db_cleanup): address CI failures on PR #65239
    
    - Fix mypy arg-type errors: OperationalError third argument must be a
      BaseException, not None. Replace OperationalError("", {}, None) with
      OperationalError("", {}, Exception("mock db error")) in three new
      tests in test_db_cleanup.py.
    - Fix ruff ISC violation: collapse implicit string concatenation in the
      run_cleanup() warning call into a single string literal.
    - Update existing CLI tests in test_db_command.py to include the new
      error_on_cleanup_failure=False kwarg in all ten
      assert_called_once_with assertions.
    - Add test_error_on_cleanup_failure to test_db_command.py to verify the
      --error-on-cleanup-failure flag is correctly forwarded to run_cleanup.
    
    Made-with: Cursor
    
    * fix(db_cleanup): add --error-on-cleanup-failure flag documentation and tests
    
    - Introduce a new news fragment detailing the addition of the ``--error-on-cleanup-failure`` flag to the ``airflow db clean`` command, allowing for better error handling during table cleanup.
    - Update unit tests in `test_db_cleanup.py` to ensure proper functionality of the new flag, including checks for raised exceptions and warning messages for failed tables.
    - Adjust the known exceptions list to reflect changes in `db_cleanup.py`.
    
    * fix(db_cleanup): update error handling for table cleanup failures
    
    - Change the behavior of the `--error-on-cleanup-failure` flag to raise a RuntimeError instead of an AirflowException when table cleanup encounters errors.
    - Update the documentation and help text for the flag to clarify its functionality.
    - Ensure that warning messages for failed tables are always emitted, regardless of the flag's state.
    - Modify unit tests in `test_db_cleanup.py` to reflect the new error handling and verify the correct logging behavior.
    
    This update improves error visibility during automated workflows by ensuring that cleanup failures are properly reported.
    
    * fix(db_cleanup): remove redundant cleanup commit
---
 airflow-core/docs/howto/usage-cli.rst              |  28 +++++
 airflow-core/src/airflow/cli/cli_config.py         |   6 ++
 .../src/airflow/cli/commands/db_command.py         |   1 +
 airflow-core/src/airflow/utils/db_cleanup.py       |  40 +++++++-
 .../tests/unit/cli/commands/test_db_command.py     |  40 ++++++++
 airflow-core/tests/unit/utils/test_db_cleanup.py   | 114 +++++++++++++++++++--
 6 files changed, 218 insertions(+), 11 deletions(-)
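
The failure-tracking pattern described in the commit message can be sketched outside Airflow roughly as follows. This is a minimal illustration under stated assumptions, not the code in db_cleanup.py: CleanupError, suppress_with_tracking, and clean_all are hypothetical names, and CleanupError stands in for the SQLAlchemy OperationalError and ProgrammingError that the real context manager catches.

```python
from __future__ import annotations

import logging
from collections.abc import Callable, Generator
from contextlib import contextmanager
from types import SimpleNamespace

logger = logging.getLogger(__name__)


class CleanupError(Exception):
    """Hypothetical stand-in for the database errors the real code catches."""


@contextmanager
def suppress_with_tracking(name: str) -> Generator[SimpleNamespace, None, None]:
    """Swallow CleanupError, log it, and record the failure on the yielded object."""
    ctx = SimpleNamespace(failed=False)
    try:
        yield ctx
    except CleanupError:
        ctx.failed = True
        logger.warning("Encountered error when attempting to clean table '%s'.", name)


def clean_all(jobs: dict[str, Callable[[], None]], *, error_on_failure: bool = False) -> list[str]:
    """Run every job, collect the names that failed, and optionally raise at the end."""
    failed: list[str] = []
    for name, job in jobs.items():
        with suppress_with_tracking(name) as ctx:
            job()
        # ctx stays bound after the with block, so the flag can be read here.
        if ctx.failed:
            failed.append(name)
    if failed:
        if error_on_failure:
            raise RuntimeError(f"cleanup failed for tables: {failed}")
        logger.warning("The following tables were not cleaned due to errors: %s", failed)
    return failed
```

Raising only after the loop means every table is still attempted before the error surfaces, which appears to be the intent of the change below.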

diff --git a/airflow-core/docs/howto/usage-cli.rst b/airflow-core/docs/howto/usage-cli.rst
index b80faf89b3e..eba8f02d390 100644
--- a/airflow-core/docs/howto/usage-cli.rst
+++ b/airflow-core/docs/howto/usage-cli.rst
@@ -221,6 +221,34 @@ By default, ``db clean`` will archive purged rows in tables of the form ``_airfl
 
 When you encounter an error without using ``--skip-archive``,  ``_airflow_deleted__<table>__<timestamp>`` would still exist in the DB. You can use  ``db drop-archived`` command to manually drop these tables.
 
+Detecting cleanup failures
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout``
+being exceeded on a very large table) and exits with code 0 even if one or more tables were not
+cleaned. A WARNING is emitted in the logs listing which tables were skipped due to errors.
+
+To make the command exit with a non-zero code whenever any table cleanup fails — useful when
+``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure —
+pass ``--error-on-cleanup-failure``:
+
+.. code-block:: bash
+
+    airflow db clean \
+        --clean-before-timestamp "$(date -u -d '21 days ago' '+%Y-%m-%dT%H:%M:%S+00:00')" \
+        --yes \
+        --error-on-cleanup-failure
+
+When ``--error-on-cleanup-failure`` is set, the raised ``RuntimeError`` includes the list of
+tables that failed cleanup, so the command still surfaces which tables were not cleaned.
+
+.. tip::
+
+    On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time
+    out, combining ``--error-on-cleanup-failure`` with ``--skip-archive`` is recommended.
+    ``--skip-archive`` deletes rows directly without the intermediate archive table, making the
+    operation both faster and less likely to hit ``statement_timeout``.
+
 Export the purged records from the archive tables
 -------------------------------------------------
 The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py
index a09851e5e24..990325de74b 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -541,6 +541,11 @@ ARG_DB_BATCH_SIZE = Arg(
         "Lower values reduce long-running locks but increase the number of batches."
     ),
 )
+ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg(
+    ("--error-on-cleanup-failure",),
+    help="Command will exit with a non-zero exit code if any table cleanup failed. By default errors are suppressed and the command exits 0.",
+    action="store_true",
+)
 ARG_DAG_IDS = Arg(
     ("--dag-ids",),
     default=None,
@@ -1603,6 +1608,7 @@ DB_COMMANDS = (
             ARG_DB_BATCH_SIZE,
             ARG_DAG_IDS,
             ARG_EXCLUDE_DAG_IDS,
+            ARG_DB_ERROR_ON_CLEANUP_FAILURE,
         ),
     ),
     ActionCommand(
diff --git a/airflow-core/src/airflow/cli/commands/db_command.py b/airflow-core/src/airflow/cli/commands/db_command.py
index 253e71d56f3..4638746f568 100644
--- a/airflow-core/src/airflow/cli/commands/db_command.py
+++ b/airflow-core/src/airflow/cli/commands/db_command.py
@@ -349,6 +349,7 @@ def cleanup_tables(args):
         batch_size=args.batch_size,
         dag_ids=args.dag_ids,
         exclude_dag_ids=args.exclude_dag_ids,
+        error_on_cleanup_failure=args.error_on_cleanup_failure,
     )
 
 
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index e6b5283669b..0c605b8d6bd 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -26,8 +26,10 @@ from __future__ import annotations
 import csv
 import logging
 import os
+from collections.abc import Generator
 from contextlib import contextmanager
 from dataclasses import dataclass
+from types import SimpleNamespace
 from typing import TYPE_CHECKING, Any
 
 from sqlalchemy import and_, column, func, inspect, select, table, text
@@ -475,11 +477,22 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None:
 
 
 @contextmanager
-def _suppress_with_logging(table: str, session: Session):
-    """Suppresses errors but logs them."""
+def _suppress_with_logging(table: str, session: Session) -> Generator[SimpleNamespace, None, None]:
+    """
+    Suppress per-table cleanup errors, log them, and expose failure state to the caller.
+
+    Yields a :class:`~types.SimpleNamespace` with a single attribute ``failed`` (bool).
+    When an :class:`~sqlalchemy.exc.OperationalError` or
+    :class:`~sqlalchemy.exc.ProgrammingError` is raised inside the ``with`` block the
+    exception is swallowed, ``ctx.failed`` is set to ``True``, a WARNING is emitted for
+    the table, and the session is rolled back.  The caller can inspect ``ctx.failed``
+    after the block to decide whether to surface the error upstream.
+    """
+    ctx = SimpleNamespace(failed=False)
     try:
-        yield
+        yield ctx
     except (OperationalError, ProgrammingError):
+        ctx.failed = True
         logger.warning("Encountered error when attempting to clean table '%s'. ", table)
         logger.debug("Traceback for table '%s'", table, exc_info=True)
         if session.is_active:
@@ -554,6 +567,7 @@ def run_cleanup(
     skip_archive: bool = False,
     session: Session = NEW_SESSION,
     batch_size: int | None = None,
+    error_on_cleanup_failure: bool = False,
 ) -> None:
     """
     Purges old records in airflow metadata database.
@@ -577,6 +591,9 @@ def run_cleanup(
     :param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
     :param session: Session representing connection to the metadata database.
     :param batch_size: Maximum number of rows to delete or archive in a single transaction.
+    :param error_on_cleanup_failure: If True, raise a RuntimeError after processing all tables
+        if any per-table cleanup encountered an error. By default errors are suppressed, a warning
+        summary is logged, and the command exits 0 even if some tables were not cleaned.
     """
     clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
 
@@ -597,10 +614,11 @@ def run_cleanup(
             exclude_dag_ids=exclude_dag_ids,
         )
     existing_tables = reflect_tables(tables=None, session=session).tables
+    failed_tables: list[str] = []
 
     for table_name, table_config in effective_config_dict.items():
         if table_name in existing_tables:
-            with _suppress_with_logging(table_name, session):
+            with _suppress_with_logging(table_name, session) as ctx:
                 _cleanup_table(
                     clean_before_timestamp=clean_before_timestamp,
                     dag_ids=dag_ids,
@@ -612,10 +630,22 @@ def run_cleanup(
                     session=session,
                     batch_size=batch_size,
                 )
-                session.commit()
+            if ctx.failed:
+                failed_tables.append(table_name)
         else:
             logger.warning("Table %s not found.  Skipping.", table_name)
 
+    if failed_tables:
+        if error_on_cleanup_failure:
+            raise RuntimeError(
+                f"airflow db clean encountered errors on the following tables and did not clean them: "
+                f"{failed_tables}. Check the logs above for details."
+            )
+        logger.warning(
+            "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+            failed_tables,
+        )
+
 
 @provide_session
 def export_archived_records(
diff --git a/airflow-core/tests/unit/cli/commands/test_db_command.py b/airflow-core/tests/unit/cli/commands/test_db_command.py
index a994bf9fb51..6c2cb44a628 100644
--- a/airflow-core/tests/unit/cli/commands/test_db_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_db_command.py
@@ -793,6 +793,7 @@ class TestCLIDBClean:
             confirm=False,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize("timezone", ["UTC", "Europe/Berlin", "America/Los_Angeles"])
@@ -816,6 +817,7 @@ class TestCLIDBClean:
             confirm=False,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(("confirm_arg", "expected"), [(["-y"], False), ([], True)])
@@ -845,6 +847,7 @@ class TestCLIDBClean:
             confirm=expected,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(("extra_arg", "expected"), [(["--skip-archive"], True), ([], False)])
@@ -874,6 +877,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=expected,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(("dry_run_arg", "expected"), [(["--dry-run"], True), ([], False)])
@@ -903,6 +907,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(
@@ -934,6 +939,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(("extra_args", "expected"), [(["--verbose"], True), ([], False)])
@@ -963,6 +969,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(("extra_args", "expected"), [(["--batch-size", "1234"], 1234), ([], None)])
@@ -992,6 +999,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=expected,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(
@@ -1023,6 +1031,7 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
         )
 
     @pytest.mark.parametrize(
@@ -1054,6 +1063,37 @@ class TestCLIDBClean:
             confirm=True,
             skip_archive=False,
             batch_size=None,
+            error_on_cleanup_failure=False,
+        )
+
+    @pytest.mark.parametrize(
+        ("extra_args", "expected"), [(["--error-on-cleanup-failure"], True), ([], False)]
+    )
+    @patch("airflow.cli.commands.db_command.run_cleanup")
+    def test_error_on_cleanup_failure(self, run_cleanup_mock, extra_args, expected):
+        """When --error-on-cleanup-failure is passed, error_on_cleanup_failure should be True."""
+        args = self.parser.parse_args(
+            [
+                "db",
+                "clean",
+                "--clean-before-timestamp",
+                "2021-01-01",
+                *extra_args,
+            ]
+        )
+        db_command.cleanup_tables(args)
+
+        run_cleanup_mock.assert_called_once_with(
+            table_names=None,
+            dry_run=False,
+            dag_ids=None,
+            exclude_dag_ids=None,
+            clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
+            verbose=False,
+            confirm=True,
+            skip_archive=False,
+            batch_size=None,
+            error_on_cleanup_failure=expected,
         )
 
     @patch("airflow.cli.commands.db_command.export_archived_records")
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py
index 2ddca75ec57..b0d7bb50dc0 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -145,6 +145,25 @@ class TestDBCleanup:
         cleanup_table_mock.assert_called_once()
         assert cleanup_table_mock.call_args.kwargs["batch_size"] == 1234
 
+    @patch("airflow.utils.db_cleanup.reflect_tables")
+    @patch("airflow.utils.db_cleanup._cleanup_table")
+    def test_run_cleanup_does_not_commit_after_cleanup_table(self, cleanup_table_mock, reflect_tables_mock):
+        """run_cleanup should not add an extra commit after _cleanup_table handles its own transaction."""
+        reflect_tables_mock.return_value.tables = {"log": object()}
+        session = MagicMock()
+
+        run_cleanup(
+            clean_before_timestamp=None,
+            table_names=["log"],
+            dry_run=False,
+            verbose=False,
+            confirm=False,
+            session=session,
+        )
+
+        cleanup_table_mock.assert_called_once()
+        session.commit.assert_not_called()
+
     @pytest.mark.parametrize(
         "table_names",
         [
@@ -549,19 +568,25 @@ class TestDBCleanup:
         assert set(all_models) - exclusion_list.union(config_dict) == set()
         assert exclusion_list.isdisjoint(config_dict)
 
-    def test_no_failure_warnings(self, caplog):
+    def test_no_failure_warnings(self):
         """
         Ensure every table we have configured (and that is present in the db) can be cleaned successfully.
         For example, this checks that the recency column is actually a column.
         """
-        run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
-        assert "Encountered error when attempting to clean table" not in caplog.text
+        with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+            run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
+            for call in mock_logger.warning.call_args_list:
+                assert "Encountered error when attempting to clean table" not in str(call)
 
         # Lets check we have the right error message just in case
-        caplog.clear()
-        with patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)):
+        with (
+            patch("airflow.utils.db_cleanup.logger") as mock_logger,
+            patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)),
+        ):
             run_cleanup(clean_before_timestamp=timezone.utcnow(), table_names=["task_instance"], dry_run=True)
-        assert "Encountered error when attempting to clean table" in caplog.text
+            mock_logger.warning.assert_any_call(
+                "Encountered error when attempting to clean table '%s'. ", "task_instance"
+            )
 
     @pytest.mark.parametrize(
         "drop_archive",
@@ -741,6 +766,83 @@ class TestDBCleanup:
         else:
             confirm_mock.assert_not_called()
 
+    @patch(
+        "airflow.utils.db_cleanup._cleanup_table",
+        side_effect=OperationalError("", {}, Exception("mock db error")),
+    )
+    def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock):
+        """When error_on_cleanup_failure=True and a table fails, RuntimeError should be raised."""
+        with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+            with pytest.raises(RuntimeError, match="airflow db clean encountered errors"):
+                run_cleanup(
+                    clean_before_timestamp=None,
+                    table_names=["log"],
+                    dry_run=False,
+                    verbose=False,
+                    confirm=False,
+                    error_on_cleanup_failure=True,
+                )
+
+            mock_logger.warning.assert_any_call(
+                "Encountered error when attempting to clean table '%s'. ", "log"
+            )
+            assert (
+                "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+                ["log"],
+            ) not in [call.args for call in mock_logger.warning.call_args_list]
+
+    @patch(
+        "airflow.utils.db_cleanup._cleanup_table",
+        side_effect=OperationalError("", {}, Exception("mock db error")),
+    )
+    def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock):
+        """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised."""
+        with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+            run_cleanup(
+                clean_before_timestamp=None,
+                table_names=["log"],
+                dry_run=False,
+                verbose=False,
+                confirm=False,
+                error_on_cleanup_failure=False,
+            )
+            mock_logger.warning.assert_any_call(
+                "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+                ["log"],
+            )
+
+    @patch(
+        "airflow.utils.db_cleanup._cleanup_table",
+        side_effect=OperationalError("", {}, Exception("mock db error")),
+    )
+    def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock):
+        """A warning naming the failed tables is emitted when error_on_cleanup_failure is not set."""
+        with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+            run_cleanup(
+                clean_before_timestamp=None,
+                table_names=["log"],
+                dry_run=False,
+                verbose=False,
+                confirm=False,
+            )
+            mock_logger.warning.assert_any_call(
+                "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+                ["log"],
+            )
+
+    @patch("airflow.utils.db_cleanup._cleanup_table")
+    def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock):
+        """Ensure error_on_cleanup_failure is accepted by run_cleanup without errors when no failures occur."""
+        run_cleanup(
+            clean_before_timestamp=None,
+            table_names=["log"],
+            dry_run=False,
+            verbose=False,
+            confirm=False,
+            error_on_cleanup_failure=True,
+        )
+        cleanup_table_mock.assert_called_once()
+
 
 def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED):
     from tests_common.test_utils.taskinstance import create_task_instance
