bryan-alvarado-r opened a new issue, #32864: URL: https://github.com/apache/airflow/issues/32864
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Airflow 2.5.3 doesn't show DAGs created in a loop. Add the code below to reproduce:

```python
import os
from datetime import datetime
import sys
from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from rappiflow.operators import MultiStatementSnowflakeOperator
from settings import get_default_args_opsgenie
from airflow.decorators import dag
from airflow.utils.dag_parsing_context import get_parsing_context

queries_base_path = os.path.join(os.path.dirname(__file__), "queries")

vertical = "FIN"  # Vertical name
process = "DNP_DRT"
schema = "GLOBAL_FINANCES"
area = "dataquality"
process_name = "UE_GENERALS"  # Dag Name
owner_process = "[email protected]"  # Owner Dag
priority = "P1"
url_notifications = "T2QSQ3L48/A019S72HQNA/317980517498631409/ZWvTeUVNmVxr8KA8b25F0utZ"
url_validations = "T2QSQ3L48/A01A1KRA5U2/318123689142926435/kdQ3XZabfzENrFA4cYgRbFmL"
countries = ["CO", "MX", "BR", "AR", "CL", "UY", "PE", "CR", "EC"]
days_dnp_drt = 600
split_steps_int = 100
total_task_in_line_int = split_steps_int - 1
dag_id_lst = []
dag_lst_dct = {}
dag_values_dct = {}
settings = {
    "snowflake_conn_id": "fin_conn_snowflake_repro",
    "google_sheets_id": "conn_google_sheets",
}
default_args = get_default_args_opsgenie(
    vertical, url_notifications, priority, owner_process
)
ENV = Variable.get("ENV", "")

current_dag_id = get_parsing_context().dag_id
print(get_parsing_context())
current_dag = None
if len(sys.argv) > 3:
    print(sys.argv)
    current_dag = sys.argv[3]

for ope in ["DNP", "DRT"]:
    dag_lst_dct[ope] = {}
    for country in countries:
        temp_lst = []
        for i in range(days_dnp_drt, 1, -split_steps_int):
            dag_id_str = "FIN_UE_{}_{}_{}_{}".format(
                ope, country, str(i), str(i - total_task_in_line_int)
            )
            dag_values_dct[dag_id_str] = {
                "start": i,
                "stop": i - total_task_in_line_int,
            }
            temp_lst.append(dag_id_str)
        dag_lst_dct[ope].update({country: temp_lst})

for ope in ["DNP", "DRT"]:
    for country in countries:
        for dag_id in dag_lst_dct[ope][country]:
            if current_dag is not None and current_dag != dag_id:
                continue

            @dag(
                dag_id=dag_id,
                schedule_interval=None,
                start_date=datetime(2020, 1, 14),
                max_active_runs=1,
                default_args=default_args,
                catchup=False,
                template_searchpath=queries_base_path,
                tags=["REPROCESO"],
            )
            def create_dag_by_country():
                start_task = EmptyOperator(
                    task_id="START_TASK", trigger_rule="all_done"
                )
                end_task = EmptyOperator(task_id="END_TASK", trigger_rule="all_done")
                ope_lst = [
                    MultiStatementSnowflakeOperator(
                        task_id="UE_" + ope + "_" + country + "_" + str(day),
                        sql="FIN_UE/FIN_UE_DNP_DRT/TBL_UE_{}.sql".format(ope),
                        snowflake_conn_id=settings["snowflake_conn_id"],
                        params={
                            "country": country,
                            "db": ENV,
                            "schema": "GLOBAL_FINANCES",
                            "months_dnp_drt": day,
                        },
                        trigger_rule="all_done",
                    )
                    for day in range(
                        dag_values_dct[dag_id]["start"],
                        dag_values_dct[dag_id]["stop"] - 1,
                        -1,
                    )
                ]
                chain(*ope_lst)
                start_task >> ope_lst[0]
                ope_lst[-1] >> end_task

            create_dag_by_country()
```

### What you think should happen instead

_No response_

### How to reproduce

Add the code and generate tasks

### Operating System

Linux

### Versions of Apache Airflow Providers

snowflake providers

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
