Re: [I] Failure during enqueuing DAGs with dynamic tasks [airflow]

2025-06-15 Thread via GitHub


kubmichael commented on issue #51575:
URL: https://github.com/apache/airflow/issues/51575#issuecomment-2973644986

   > Would you be able to share a snippet of the code that you're running to do 
this? Please feel free to strip out any proprietary information, I'm mostly 
interested in the DAG that's generating the value that's eventually passed to 
`.expand()`.
   
   Hey, this is the code of the DAG:
   `@dag(
       "snapshot_scraper",
       schedule="0 7 * * *",
       default_args={"depends_on_past": False},
       start_date=START_DATE,
       catchup=False,
   )
   def snapshot_scraper_dag():
       t_start = EmptyOperator(task_id="start")

       @task_group
       def snapshot_scraper_tg(scraper_config: SnapshotScraperConfig):
           @task
           def scrape(config: SnapshotScraperConfig, **context):
               prev_start_date_success = context["prev_start_date_success"]
               logger.info(f"scraping snapshot {config.name}, {prev_start_date_success}")
               from utils import load_secret_env

               load_secret_env()

               run_helper.scrape_snapshot(config, _parse_date(prev_start_date_success))

           @task
           def parse(config: SnapshotScraperConfig, **context):
               prev_start_date_success = context["prev_start_date_success"]
               logger.info(f"parsing snapshot {config.name}, {prev_start_date_success}")
               from utils import load_secret_env

               load_secret_env()

               run_helper.parse_snapshot(config, _parse_date(prev_start_date_success))

           scrape(scraper_config) >> parse(scraper_config)  # type: ignore

       snapshot_scraper_tg_obj = snapshot_scraper_tg.expand(
           scraper_config=list(load_snapshot_configs())
       )

       t_end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ALL_DONE)

       t_start >> snapshot_scraper_tg_obj >> t_end


   snapshot_scraper_dag()
   `
   and
   `
   def load_snapshot_configs():
       for domain in _load_yaml("./snapshots.yaml"):
           for space in domain.get("spaces", []):
               yield SnapshotScraperConfig(
                   domain=domain["domain"],
                   name=space["name"],
                   space_id=space["space_id"],
               )
   `
   So, we have two DAGs that create tasks dynamically. Both have more than 16 
tasks, and when they are scheduled at the same time, airflow-dag-processor 
can't enqueue them.
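
For anyone trying to reproduce this without the real `snapshots.yaml`, here is a rough standalone sketch of how the list passed to `.expand()` gets its length. It is only an approximation: the `SnapshotScraperConfig` dataclass, the `SAMPLE_DOMAINS` data, and the `domains` parameter are stand-ins I made up for illustration (the original function reads the YAML file through `_load_yaml`, which is not shown here). Since the task group contains two tasks, `scrape` and `parse`, each config should account for two mapped task instances.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass(frozen=True)
class SnapshotScraperConfig:
    # Hypothetical stand-in: the real class definition is not shown in the thread.
    domain: str
    name: str
    space_id: str


def load_snapshot_configs(domains: list[dict[str, Any]]) -> Iterator[SnapshotScraperConfig]:
    # Same flattening as in the comment above, but reading from an in-memory
    # list (an assumption) instead of _load_yaml("./snapshots.yaml").
    for domain in domains:
        # A domain without a "spaces" key contributes no configs.
        for space in domain.get("spaces", []):
            yield SnapshotScraperConfig(
                domain=domain["domain"],
                name=space["name"],
                space_id=space["space_id"],
            )


# Made-up sample data with the same shape the loader expects.
SAMPLE_DOMAINS = [
    {
        "domain": "example.org",
        "spaces": [
            {"name": "alpha", "space_id": "1"},
            {"name": "beta", "space_id": "2"},
        ],
    },
    {"domain": "example.net", "spaces": [{"name": "gamma", "space_id": "3"}]},
    {"domain": "empty.example"},  # no "spaces" key
]

configs = list(load_snapshot_configs(SAMPLE_DOMAINS))

# One expansion of the task group per config, two tasks (scrape, parse) per expansion.
mapped_task_instances = 2 * len(configs)
```

Running the same count against the real YAML file would show how many mapped task instances each of the two DAGs asks for in a single run.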


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Failure during enqueuing DAGs with dynamic tasks [airflow]

2025-06-10 Thread via GitHub


jroachgolf84 commented on issue #51575:
URL: https://github.com/apache/airflow/issues/51575#issuecomment-2959698831

   Would you be able to share a snippet of the code that you're running to do 
this? Please feel free to strip out any proprietary information, I'm mostly 
interested in the DAG that's generating the value that's eventually passed to 
`.expand()`.

