bryan-alvarado-r opened a new issue, #32864:
URL: https://github.com/apache/airflow/issues/32864

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow 2.5.3 doesn't show the DAGs that are created in a loop by the code below.
   ` import os
   from datetime import datetime
   import sys
   
   from airflow import DAG
   from airflow.models import Variable
   from airflow.models.baseoperator import chain
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from rappiflow.operators import MultiStatementSnowflakeOperator
   from settings import get_default_args_opsgenie
   from airflow.decorators import dag
   from airflow.utils.dag_parsing_context import get_parsing_context
   
   queries_base_path = os.path.join(os.path.dirname(__file__), "queries")
   
   vertical = "FIN"  # Vertical name
   process = "DNP_DRT"
   schema = "GLOBAL_FINANCES"
   area = "dataquality"
   process_name = "UE_GENERALS"  # Dag Name
   owner_process = "[email protected]"  # Owner Dag
   priority = "P1"
   url_notifications = 
"T2QSQ3L48/A019S72HQNA/317980517498631409/ZWvTeUVNmVxr8KA8b25F0utZ"
   url_validations = 
"T2QSQ3L48/A01A1KRA5U2/318123689142926435/kdQ3XZabfzENrFA4cYgRbFmL"
   
   countries = ["CO", "MX", "BR", "AR", "CL", "UY", "PE", "CR", "EC"]
   
   days_dnp_drt = 600
   split_steps_int = 100
   total_task_in_line_int = split_steps_int - 1
   dag_id_lst = []
   dag_lst_dct = {}
   dag_values_dct = {}
   
   settings = {
       "snowflake_conn_id": "fin_conn_snowflake_repro",
       "google_sheets_id": "conn_google_sheets",
   }
   
   default_args = get_default_args_opsgenie(
       vertical, url_notifications, priority, owner_process
   )
   
   ENV = Variable.get("ENV", "")
   
   current_dag_id = get_parsing_context().dag_id
   print(get_parsing_context())
   current_dag = None
   if len(sys.argv) > 3:
       print(sys.argv)
       current_dag = sys.argv[3]
   
   for ope in ["DNP", "DRT"]:
       dag_lst_dct[ope] = {}
       for country in countries:
           temp_lst = []
           for i in range(days_dnp_drt, 1, -split_steps_int):
               dag_id_str = "FIN_UE_{}_{}_{}_{}".format(
                   ope, country, str(i), str(i - total_task_in_line_int)
               )
               dag_values_dct[dag_id_str] = {
                   "start": i,
                   "stop": i - total_task_in_line_int,
               }
               temp_lst.append(dag_id_str)
           dag_lst_dct[ope].update({country: temp_lst})
   
   
   for ope in ["DNP", "DRT"]:
       for country in countries:
           for dag_id in dag_lst_dct[ope][country]:
               if current_dag is not None and current_dag != dag_id:
                   continue
               @dag(
                   dag_id=dag_id,
                   schedule_interval=None,
                   start_date=datetime(2020, 1, 14),
                   max_active_runs=1,
                   default_args=default_args,
                   catchup=False,
                   template_searchpath=queries_base_path,
                   tags=["REPROCESO"],
               )
               def create_dag_by_country():
                   start_task = EmptyOperator(
                       task_id="START_TASK", trigger_rule="all_done"
                   )
                   end_task = EmptyOperator(task_id="END_TASK", 
trigger_rule="all_done")
                   ope_lst = [
                       MultiStatementSnowflakeOperator(
                           task_id="UE_" + ope + "_" + country + "_" + str(day),
                           
sql="FIN_UE/FIN_UE_DNP_DRT/TBL_UE_{}.sql".format(ope),
                           snowflake_conn_id=settings["snowflake_conn_id"],
                           params={
                               "country": country,
                               "db": ENV,
                               "schema": "GLOBAL_FINANCES",
                               "months_dnp_drt": day,
                           },
                           trigger_rule="all_done",
                       )
                       for day in range(
                           dag_values_dct[dag_id]["start"],
                           dag_values_dct[dag_id]["stop"] - 1,
                           -1,
                       )
                   ]
                   chain(*ope_lst)
                   start_task >> ope_lst[0]
                   ope_lst[-1] >> end_task
   
               create_dag_by_country()`
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Add the code above as a DAG file and let it generate the DAGs and their tasks.
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   snowflake providers
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to