ephraimbuddy commented on code in PR #29058: URL: https://github.com/apache/airflow/pull/29058#discussion_r1087901858
########## airflow/utils/db_cleanup.py: ########## @@ -159,6 +170,14 @@ def _do_delete(*, query, orm_model, skip_archive, session): logger.debug("delete statement:\n%s", delete.compile()) session.execute(delete) session.commit() + if export_to_csv: + if not output_path.startswith(AIRFLOW_HOME): + output_path = os.path.join(AIRFLOW_HOME, output_path) + os.makedirs(output_path, exist_ok=True) + _to_csv( + target_table=target_table, file_path=f"{output_path}/{target_table_name}.csv", session=session + ) + skip_archive = True Review Comment: I thought about this more and I think we should change the approach....instead of running the export right when we are cleaning data which to me means archiving to file, what do you think about having a separate command `airflow db export-cleaned` that would export from the archive tables to the given output folder? I experimented with this, see below: ```diff diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 6890368f9b..4e260da7c5 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -468,6 +468,17 @@ ARG_DB_SKIP_ARCHIVE = Arg( help="Don't preserve purged records in an archive table.", action="store_true", ) +ARG_DB_EXPORT_FORMAT = Arg( + ("--export-format",), + help="The file format to export the cleaned data", + choices=("csv",) +) +ARG_DB_OUTPUT_PATH = Arg( + ("--output-path",), + help="The path to export the cleaned data", + metavar='FILEPATH', + required=True, +) # pool @@ -1589,6 +1600,15 @@ DB_COMMANDS = ( ARG_DB_SKIP_ARCHIVE, ), ), + ActionCommand( + name="export-cleaned", + help="Export purged records from the archive tables", + func=lazy_load_command("airflow.cli.commands.db_command.export_archived"), + args=( + ARG_DB_EXPORT_FORMAT, + ARG_DB_OUTPUT_PATH, + ) + ) ) CONNECTIONS_COMMANDS = ( ActionCommand( diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py index 4064285db8..d27a569e35 100644 --- a/airflow/cli/commands/db_command.py +++ 
b/airflow/cli/commands/db_command.py @@ -27,7 +27,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.utils import cli as cli_utils, db from airflow.utils.db import REVISION_HEADS_MAP -from airflow.utils.db_cleanup import config_dict, run_cleanup +from airflow.utils.db_cleanup import config_dict, run_cleanup, export_cleaned from airflow.utils.process_utils import execute_interactive @@ -207,3 +207,13 @@ def cleanup_tables(args): confirm=not args.yes, skip_archive=args.skip_archive, ) + + +@cli_utils.action_cli(check_db=False) +def export_archived(args): + if not os.path.exists(args.output_path): + raise AirflowException(f"The specified --output-path {args.output_path} does not exist") + export_cleaned( + export_format=args.export_format, + output_path=args.output_path, + ) diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index ac96a6abdb..cbb5963db1 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -20,13 +20,14 @@ This module took inspiration from the community maintenance dag """ from __future__ import annotations +import csv import logging from contextlib import contextmanager from dataclasses import dataclass from typing import Any from pendulum import DateTime -from sqlalchemy import and_, column, false, func, table, text +from sqlalchemy import and_, column, false, func, table, text, inspect from sqlalchemy.exc import OperationalError, ProgrammingError from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import Query, Session, aliased @@ -40,6 +41,8 @@ from airflow.utils.session import NEW_SESSION, provide_session logger = logging.getLogger(__file__) +ARCHIVE_TABLE_PREFIX = "_airflow_deleted__" + @dataclass class _TableConfig: @@ -131,7 +134,7 @@ def _do_delete(*, query, orm_model, skip_archive, session): # using bulk delete # create a new table and copy the rows there timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14] - 
target_table_name = f"_airflow_deleted__{orm_model.name}__{timestamp_str}" +        target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}" print(f"Moving data to table {target_table_name}") stmt = CreateTableAs(target_table_name, query.selectable) logger.debug("ctas query:\n%s", stmt.compile()) @@ -370,3 +373,27 @@ def run_cleanup( session=session, ) session.commit() + + +def _dump_db(*, target_table, file_path, export_format, session): + if export_format == "csv": + with open(file_path + ".csv", "w") as f: + csv_writer = csv.writer(f) + cursor = session.execute(text(f"SELECT * FROM {target_table}")) + csv_writer.writerow(cursor.keys()) + csv_writer.writerows(cursor.fetchall()) + + +@provide_session +def export_cleaned(export_format, output_path, session: Session = NEW_SESSION): + inspector = inspect(session.bind) + table_names = inspector.get_table_names() + for table_name in table_names: + if table_name.startswith(ARCHIVE_TABLE_PREFIX): + _dump_db( + target_table=table_name, + file_path=f"{output_path}/{table_name}", + export_format=export_format, + session=session, + ) + logger.info("Done") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org