Which task specifically is it complaining about?

What version did you upgrade from?
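
If the log lines themselves don't make it obvious, an untested sketch like the one below (the file path is a guess; point it at wherever the DAG file actually lives) should reproduce the full warning text, task_id included, by parsing just that file with a local DagBag:

import logging

from airflow.models import DagBag

logging.basicConfig(level=logging.WARNING)

# Parsing the single file should trigger the same "already registered" log
# lines during DAG construction, so the offending task_id shows up here too.
bag = DagBag(dag_folder="dags/papro_etl.py", include_examples=False)
print(bag.import_errors)   # any hard parse failures
print(list(bag.dags))      # should list PAPro_ETL if the file parsed cleanly
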
A better-late-than-never Ash

On Oct 24 2022, at 2:00 pm, Anthony Joyce <[email protected]> wrote:
> Dear All-
>
> After upgrading to Airflow 2.4.2 and running `airflow db upgrade`, I started
> receiving “[…] already registered for DAG” warnings. I posted my DAG file
> below for reference.
>
> Anyone know what I can do to relieve these warnings? It seems like the DAG is
> written appropriately, but another set of eyes is always helpful. Thanks all!
>
> Best,
>
> Anthony
>
>
>
> default_args = {
>     "owner": "airflow",
>     "start_date": datetime(2019, 6, 21),
>     "email": [__email__],
>     "email_on_failure": True,
>     "on_failure_callback": send_slack_message_failed,
>     "email_on_retry": False,
>     "retries": 1,
>     "retry_delay": timedelta(minutes=1),
> }
>
> with DAG(
>     dag_id="PAPro_ETL",
>     catchup=False,
>     default_args=default_args,
>     schedule_interval="@daily",
>     max_active_runs=1,
> ) as dag:
>     start = DummyOperator(task_id="start")
>
>     send_slack_message_success = SlackAPIPostOperator(
>         task_id="send_slack_message_success",
>         token=Variable.get("slack_token"),
>         channel=Variable.get("slack_status_channel"),
>         username="BI-ETL-airflow",
>         text=":white_check_mark: {{ dag }} for {{ ds }} was completed @ "
>         + f"{datetime.now().strftime('%F %T')} UTC.",
>     )
>
>     for truncate_table in truncate_tables:
>         if truncate_table == "qu_pap_clicks" or truncate_table == "qu_pap_impressions":
>             truncate_staging = PythonOperator(
>                 task_id=f"truncate_staging_{truncate_table}_table",
>                 python_callable=truncate_staging_table,
>                 op_args=[truncate_table],
>             )
>
>             load_destination_table = PostgresOperator(
>                 task_id=f"load_destination_{truncate_table}_table",
>                 postgres_conn_id=destination_connection,
>                 sql=f"papro_etl/insert_{truncate_table}.sql",
>             )
>
>             with TaskGroup(group_id=f'{truncate_table}_extract') as LoadStagingTable:
>                 for schema_name, schema in schemas.items():
>                     extract_to_staging = PostgresOperator(
>                         task_id=f"{schema_name}_{truncate_table}_extract",
>                         postgres_conn_id=staging_connection,
>                         sql=f"papro_etl/extract_{truncate_table}.sql",
>                         params={"schema_name": schema_name, "schema": schema},
>                     )
>
>             (start >> truncate_staging >> LoadStagingTable
>              >> load_destination_table >> send_slack_message_success)
>
>         else:
>             truncate_staging = PythonOperator(
>                 task_id=f"truncate_staging_{truncate_table}_table",
>                 python_callable=truncate_staging_table,
>                 op_args=[truncate_table],
>             )
>
>             compare_rows = PythonOperator(
>                 task_id=f"compare_{truncate_table}_rows",
>                 python_callable=compare_staging_to_destination,
>                 op_args=[truncate_table],
>                 provide_context=True,
>             )
>
>             load_destination_table = PythonOperator(
>                 task_id=f"load_destination_{truncate_table}_table",
>                 python_callable=truncate_and_load_destination_table,
>                 op_args=[truncate_table],
>             )
>
>             with TaskGroup(group_id=f'{truncate_table}_extract') as LoadStagingTable:
>                 for schema_name, schema in schemas.items():
>                     extract_to_staging = PostgresOperator(
>                         task_id=f"{schema_name}_{truncate_table}_extract",
>                         postgres_conn_id=staging_connection,
>                         sql=f"papro_etl/extract_{truncate_table}.sql",
>                         params={"schema_name": schema_name, "schema": schema},
>                     )
>
>             (start >> truncate_staging >> LoadStagingTable
>              >> compare_rows >> load_destination_table
>              >> send_slack_message_success)
>
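
If the warnings turn out to name start or send_slack_message_success, one unverified guess is that chaining them inside the for loop re-registers the same edges on every iteration. Purely as an illustration of an alternative shape (not a confirmed fix, and all task names below are placeholders), here is a toy DAG that collects each table's first and last task and wires the shared edges once, after the loop:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="papro_etl_shape_demo",
    start_date=datetime(2019, 6, 21),
    schedule_interval="@daily",
    catchup=False,
) as demo_dag:
    start = EmptyOperator(task_id="start")
    notify = EmptyOperator(task_id="send_slack_message_success")

    firsts, lasts = [], []
    for table in ["qu_pap_clicks", "qu_pap_impressions", "some_other_table"]:
        truncate = EmptyOperator(task_id=f"truncate_staging_{table}_table")
        load = EmptyOperator(task_id=f"load_destination_{table}_table")
        truncate >> load
        firsts.append(truncate)
        lasts.append(load)

    # Shared fan-out/fan-in edges are set exactly once, not once per iteration.
    start >> firsts
    lasts >> notify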
