ephraimbuddy commented on code in PR #29058:
URL: https://github.com/apache/airflow/pull/29058#discussion_r1087901858


##########
airflow/utils/db_cleanup.py:
##########
@@ -159,6 +170,14 @@ def _do_delete(*, query, orm_model, skip_archive, session):
     logger.debug("delete statement:\n%s", delete.compile())
     session.execute(delete)
     session.commit()
+    if export_to_csv:
+        if not output_path.startswith(AIRFLOW_HOME):
+            output_path = os.path.join(AIRFLOW_HOME, output_path)
+        os.makedirs(output_path, exist_ok=True)
+        _to_csv(
+            target_table=target_table, file_path=f"{output_path}/{target_table_name}.csv", session=session
+        )
+        skip_archive = True

Review Comment:
   I thought about this more and I think we should change the approach. Instead of running the export at the same time we clean the data (which to me amounts to archiving to a file), what do you think about a separate command, `airflow db export-cleaned`, that exports from the archive tables to a given output folder? I experimented with this, see the diff below:
   
   ```diff
   diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
   index 6890368f9b..4e260da7c5 100644
   --- a/airflow/cli/cli_parser.py
   +++ b/airflow/cli/cli_parser.py
   @@ -468,6 +468,17 @@ ARG_DB_SKIP_ARCHIVE = Arg(
        help="Don't preserve purged records in an archive table.",
        action="store_true",
    )
   +ARG_DB_EXPORT_FORMAT = Arg(
   +    ("--export-format",),
   +    help="The file format to export the cleaned data",
   +    choices=("csv",)
   +)
   +ARG_DB_OUTPUT_PATH = Arg(
   +    ("--output-path",),
   +    help="The path to export the cleaned data",
   +    metavar='FILEPATH',
   +    required=True,
   +)
    
    
    # pool
   @@ -1589,6 +1600,15 @@ DB_COMMANDS = (
                ARG_DB_SKIP_ARCHIVE,
            ),
        ),
   +    ActionCommand(
   +        name="export-cleaned",
   +        help="Export purged records from the archive tables",
   +        func=lazy_load_command("airflow.cli.commands.db_command.export_archived"),
   +        args=(
   +            ARG_DB_EXPORT_FORMAT,
   +            ARG_DB_OUTPUT_PATH,
   +        ),
   +    ),
    )
    CONNECTIONS_COMMANDS = (
        ActionCommand(
   diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
   index 4064285db8..d27a569e35 100644
   --- a/airflow/cli/commands/db_command.py
   +++ b/airflow/cli/commands/db_command.py
   @@ -27,7 +27,7 @@ from airflow import settings
    from airflow.exceptions import AirflowException
    from airflow.utils import cli as cli_utils, db
    from airflow.utils.db import REVISION_HEADS_MAP
   -from airflow.utils.db_cleanup import config_dict, run_cleanup
   +from airflow.utils.db_cleanup import config_dict, run_cleanup, export_cleaned
    from airflow.utils.process_utils import execute_interactive
    
    
   @@ -207,3 +207,13 @@ def cleanup_tables(args):
            confirm=not args.yes,
            skip_archive=args.skip_archive,
        )
   +
   +
   +@cli_utils.action_cli(check_db=False)
   +def export_archived(args):
   +    if not os.path.exists(args.output_path):
   +        raise AirflowException(f"The specified --output-path 
{args.output_path} does not exist")
   +    export_cleaned(
   +        export_format=args.export_format,
   +        output_path=args.output_path,
   +    )
   diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
   index ac96a6abdb..cbb5963db1 100644
   --- a/airflow/utils/db_cleanup.py
   +++ b/airflow/utils/db_cleanup.py
   @@ -20,13 +20,14 @@ This module took inspiration from the community maintenance dag
    """
    from __future__ import annotations
    
   +import csv
    import logging
    from contextlib import contextmanager
    from dataclasses import dataclass
    from typing import Any
    
    from pendulum import DateTime
   -from sqlalchemy import and_, column, false, func, table, text
   +from sqlalchemy import and_, column, false, func, inspect, table, text
    from sqlalchemy.exc import OperationalError, ProgrammingError
    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.orm import Query, Session, aliased
   @@ -40,6 +41,8 @@ from airflow.utils.session import NEW_SESSION, provide_session
    
    logger = logging.getLogger(__file__)
    
   +ARCHIVE_TABLE_PREFIX = "_airflow_deleted__"
   +
    
    @dataclass
    class _TableConfig:
   @@ -131,7 +134,7 @@ def _do_delete(*, query, orm_model, skip_archive, session):
        # using bulk delete
        # create a new table and copy the rows there
        timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
   -    target_table_name = f"_airflow_deleted__{orm_model.name}__{timestamp_str}"
   +    target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
        print(f"Moving data to table {target_table_name}")
        stmt = CreateTableAs(target_table_name, query.selectable)
        logger.debug("ctas query:\n%s", stmt.compile())
   @@ -370,3 +373,27 @@ def run_cleanup(
                    session=session,
                )
                session.commit()
   +
   +
   +def _dump_db(*, target_table, file_path, export_format, session):
   +    if export_format == "csv":
   +        with open(file_path + ".csv", "w") as f:
   +            csv_writer = csv.writer(f)
   +            cursor = session.execute(text(f"SELECT * FROM {target_table}"))
   +            csv_writer.writerow(cursor.keys())
   +            csv_writer.writerows(cursor.fetchall())
   +
   +
   +@provide_session
   +def export_cleaned(export_format, output_path, session: Session = NEW_SESSION):
   +    inspector = inspect(session.bind)
   +    table_names = inspector.get_table_names()
   +    for table_name in table_names:
   +        if table_name.startswith(ARCHIVE_TABLE_PREFIX):
   +            _dump_db(
   +                target_table=table_name,
   +                file_path=f"{output_path}/{table_name}",
   +                export_format=export_format,
   +                session=session,
   +            )
   +    logger.info("Done")
   ```
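
   With this applied, usage would look something like the following (a sketch of the intended flow, assuming the diff above is applied; `/tmp/airflow-archive` is just an example directory and must already exist, per the check in `export_archived`):

   ```bash
   # Purge old rows first; archived rows are kept in tables named
   # _airflow_deleted__<source table>__<timestamp>
   airflow db clean --clean-before-timestamp '2023-01-01T00:00:00' --yes

   # Then export the archive tables to CSV files in the given folder
   airflow db export-cleaned --export-format csv --output-path /tmp/airflow-archive
   ```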


