kubmichael commented on issue #51575:
URL: https://github.com/apache/airflow/issues/51575#issuecomment-2973644986
> Would you be able to share a snippet of the code that you're running to do
> this? Please feel free to strip out any proprietary information, I'm mostly
> interested in the DAG that's generating the value that's eventually passed to
> `.expand()`.
Hey, here is the DAG code:
```python
@dag(
    "snapshot_scraper",
    schedule="0 7 * * *",
    default_args={"depends_on_past": False},
    start_date=START_DATE,
    catchup=False,
)
def snapshot_scraper_dag():
    t_start = EmptyOperator(task_id="start")

    @task_group
    def snapshot_scraper_tg(scraper_config: SnapshotScraperConfig):
        @task
        def scrape(config: SnapshotScraperConfig, **context):
            prev_start_date_success = context["prev_start_date_success"]
            logger.info(f"scraping snapshot {config.name}, {prev_start_date_success}")
            from utils import load_secret_env
            load_secret_env()
            run_helper.scrape_snapshot(config, _parse_date(prev_start_date_success))

        @task
        def parse(config: SnapshotScraperConfig, **context):
            prev_start_date_success = context["prev_start_date_success"]
            logger.info(f"parsing snapshot {config.name}, {prev_start_date_success}")
            from utils import load_secret_env
            load_secret_env()
            run_helper.parse_snapshot(config, _parse_date(prev_start_date_success))

        scrape(scraper_config) >> parse(scraper_config)  # type: ignore

    snapshot_scraper_tg_obj = snapshot_scraper_tg.expand(
        scraper_config=list(load_snapshot_configs())
    )
    t_end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ALL_DONE)
    t_start >> snapshot_scraper_tg_obj >> t_end


snapshot_scraper_dag()
```
and the function that generates the value passed to `.expand()`:
```python
def load_snapshot_configs():
    for domain in _load_yaml("./snapshots.yaml"):
        for space in domain.get("spaces", []):
            yield SnapshotScraperConfig(
                domain=domain["domain"],
                name=space["name"],
                space_id=space["space_id"],
            )
```
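For anyone trying to reproduce this without the YAML file, here is a minimal sketch of the list that ends up in `.expand()`. The dataclass below is a hypothetical stand-in for `SnapshotScraperConfig` (its real definition is not shown here), `SAMPLE_DOMAINS` is made-up data in the shape the loader appears to expect, and `load_snapshot_configs_from` is an illustrative variant that reads from memory instead of calling `_load_yaml`.

```python
from dataclasses import dataclass
from typing import Iterator


# Hypothetical stand-in for the real SnapshotScraperConfig class.
@dataclass(frozen=True)
class SnapshotScraperConfig:
    domain: str
    name: str
    space_id: str


# Made-up data shaped like the parsed contents of snapshots.yaml (an assumption).
SAMPLE_DOMAINS = [
    {
        "domain": "example.org",
        "spaces": [
            {"name": "alpha", "space_id": "1"},
            {"name": "beta", "space_id": "2"},
        ],
    },
    {"domain": "example.net"},  # no "spaces" key, so it yields nothing
    {"domain": "example.com", "spaces": [{"name": "gamma", "space_id": "3"}]},
]


def load_snapshot_configs_from(domains: list[dict]) -> Iterator[SnapshotScraperConfig]:
    """Flatten domains and their spaces into one config per space."""
    for domain in domains:
        for space in domain.get("spaces", []):
            yield SnapshotScraperConfig(
                domain=domain["domain"],
                name=space["name"],
                space_id=space["space_id"],
            )


configs = list(load_snapshot_configs_from(SAMPLE_DOMAINS))
print(len(configs))               # 3
print([c.name for c in configs])  # ['alpha', 'beta', 'gamma']
```

The task group is mapped once per element of this list, so every config gets its own `scrape` and `parse` task instance.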
So we have two DAGs that create tasks dynamically, each with more than 16
tasks, and when they are scheduled at the same time, airflow-dag-processor
can't enqueue them.