Dear All-

After upgrading to Airflow 2.4.2 and running `airflow db upgrade`, I started 
receiving “[…] already registered for DAG warnings. I posted by DAG file below 
for reference.

Anyone know what I can do to relieve these warnings? It seems like DAG is 
written appropriately, but another set of eyes is always helpful.  Thanks all!

Best,

Anthony



default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 6, 21),
    "email": [__email__],
    "email_on_failure": True,
    "on_failure_callback": send_slack_message_failed,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id="PAPro_ETL",
    catchup=False,
    default_args=default_args,
    schedule_interval="@daily",
    max_active_runs=1,
) as dag:
    start = DummyOperator(task_id="start")

    send_slack_message_success = SlackAPIPostOperator(
        task_id="send_slack_message_success",
        token=Variable.get("slack_token"),
        channel=Variable.get("slack_status_channel"),
        username="BI-ETL-airflow",
        text=":white_check_mark: {{ dag }} for {{ ds }} was completed @ "
        + f"{datetime.now().strftime('%F %T')} UTC.",
    )

    for truncate_table in truncate_tables:
        if truncate_table == "qu_pap_clicks" or truncate_table == 
"qu_pap_impressions":
            truncate_staging = PythonOperator(
                task_id=f"truncate_staging_{truncate_table}_table",
                python_callable=truncate_staging_table,
                op_args=[truncate_table],
            )

            load_destination_table = PostgresOperator(
                task_id=f"load_destination_{truncate_table}_table",
                postgres_conn_id=destination_connection,
                sql=f"papro_etl/insert_{truncate_table}.sql"
                )

            with TaskGroup(group_id=f'{truncate_table}_extract') as 
LoadStagingTable:
                for schema_name, schema in schemas.items():
                    extract_to_staging = PostgresOperator(
                        task_id=f"{schema_name}_{truncate_table}_extract",
                        postgres_conn_id=staging_connection,
                        sql=f"papro_etl/extract_{truncate_table}.sql",
                        params={"schema_name": schema_name, "schema": schema},
                    )

                    (start >> truncate_staging >> LoadStagingTable
                    >> load_destination_table >> send_slack_message_success)

        else:
            truncate_staging = PythonOperator(
                task_id=f"truncate_staging_{truncate_table}_table",
                python_callable=truncate_staging_table,
                op_args=[truncate_table],
            )

            compare_rows = PythonOperator(
                task_id=f"compare_{truncate_table}_rows",
                python_callable=compare_staging_to_destination,
                op_args=[truncate_table],
                provide_context=True,
            )

            load_destination_table = PythonOperator(
                task_id=f"load_destination_{truncate_table}_table",
                python_callable=truncate_and_load_destination_table,
                op_args=[truncate_table]
            )

            with TaskGroup(group_id=f'{truncate_table}_extract') as 
LoadStagingTable:
                for schema_name, schema in schemas.items():
                    extract_to_staging = PostgresOperator(
                        task_id=f"{schema_name}_{truncate_table}_extract",
                        postgres_conn_id=staging_connection,
                        sql=f"papro_etl/extract_{truncate_table}.sql",
                        params={"schema_name": schema_name, "schema": schema},
                    )
                    (start >> truncate_staging >> LoadStagingTable
                    >> compare_rows >> load_destination_table
                    >> send_slack_message_success)

Reply via email to