Dear All-
After upgrading to Airflow 2.4.2 and running `airflow db upgrade`, I started
receiving “[…] already registered for DAG” warnings. I posted my DAG file below
for reference.
Anyone know what I can do to resolve these warnings? It seems like the DAG is
written appropriately, but another set of eyes is always helpful. Thanks all!
Best,
Anthony
# Defaults inherited by every task in the DAG below; any operator may
# override an individual key.
default_args = dict(
    owner="airflow",
    # NOTE(review): naive datetime — Airflow treats it as UTC; confirm that
    # is the intended timezone.
    start_date=datetime(2019, 6, 21),
    email=[__email__],
    email_on_failure=True,
    on_failure_callback=send_slack_message_failed,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=1),
)
with DAG(
    dag_id="PAPro_ETL",
    catchup=False,
    default_args=default_args,
    # NOTE(review): `schedule_interval` is deprecated as of Airflow 2.4 in
    # favour of `schedule`; it still works, so left unchanged here.
    schedule_interval="@daily",
    max_active_runs=1,
) as dag:
    start = DummyOperator(task_id="start")

    # Success notification posted to Slack at the end of every table chain.
    # The timestamp is a Jinja template so it renders at *task execution*
    # time; the original f-string called datetime.now() at DAG *parse* time,
    # which stamped the message with the parse timestamp, not completion.
    send_slack_message_success = SlackAPIPostOperator(
        task_id="send_slack_message_success",
        token=Variable.get("slack_token"),
        channel=Variable.get("slack_status_channel"),
        username="BI-ETL-airflow",
        text=":white_check_mark: {{ dag }} for {{ ds }} was completed @ "
        "{{ macros.datetime.utcnow().strftime('%F %T') }} UTC.",
    )

    # One truncate -> extract -> load chain per table. The click/impression
    # staging tables load straight to the destination; everything else gets a
    # row-count comparison step before the destination load.
    for truncate_table in truncate_tables:
        if truncate_table in ("qu_pap_clicks", "qu_pap_impressions"):
            truncate_staging = PythonOperator(
                task_id=f"truncate_staging_{truncate_table}_table",
                python_callable=truncate_staging_table,
                op_args=[truncate_table],
            )
            load_destination_table = PostgresOperator(
                task_id=f"load_destination_{truncate_table}_table",
                postgres_conn_id=destination_connection,
                sql=f"papro_etl/insert_{truncate_table}.sql",
            )
            with TaskGroup(group_id=f"{truncate_table}_extract") as load_staging_group:
                for schema_name, schema in schemas.items():
                    extract_to_staging = PostgresOperator(
                        task_id=f"{schema_name}_{truncate_table}_extract",
                        postgres_conn_id=staging_connection,
                        sql=f"papro_etl/extract_{truncate_table}.sql",
                        params={"schema_name": schema_name, "schema": schema},
                    )
            # Set the dependency chain exactly ONCE per table, *outside* the
            # per-schema loop. Registering the identical chain on every schema
            # iteration re-adds the same edges and is what produces Airflow's
            # "... already registered for DAG" warnings.
            (start >> truncate_staging >> load_staging_group
             >> load_destination_table >> send_slack_message_success)
        else:
            truncate_staging = PythonOperator(
                task_id=f"truncate_staging_{truncate_table}_table",
                python_callable=truncate_staging_table,
                op_args=[truncate_table],
            )
            # `provide_context=True` was removed in Airflow 2.x — the context
            # is passed automatically — and keeping it only emits deprecation
            # warnings, so it is dropped here.
            compare_rows = PythonOperator(
                task_id=f"compare_{truncate_table}_rows",
                python_callable=compare_staging_to_destination,
                op_args=[truncate_table],
            )
            load_destination_table = PythonOperator(
                task_id=f"load_destination_{truncate_table}_table",
                python_callable=truncate_and_load_destination_table,
                op_args=[truncate_table],
            )
            with TaskGroup(group_id=f"{truncate_table}_extract") as load_staging_group:
                for schema_name, schema in schemas.items():
                    extract_to_staging = PostgresOperator(
                        task_id=f"{schema_name}_{truncate_table}_extract",
                        postgres_conn_id=staging_connection,
                        sql=f"papro_etl/extract_{truncate_table}.sql",
                        params={"schema_name": schema_name, "schema": schema},
                    )
            # Chain set once per table, outside the per-schema loop (see note
            # in the branch above).
            (start >> truncate_staging >> load_staging_group
             >> compare_rows >> load_destination_table
             >> send_slack_message_success)