bbovenzi commented on PR #64271:
URL: https://github.com/apache/airflow/pull/64271#issuecomment-4201934777

   Finally, since I am talking about traversing the nested NodeResponse[] many 
times, I would like to make sure we test this against a DAG with 1000+ tasks 
and a bunch of task groups too.
   
   
   Here's an example:
   
   
   ```
   """Large DAG with 1000 tasks for testing grid and graph views."""
   from datetime import datetime
   
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.utils.task_group import TaskGroup
   
   
   with DAG(
       dag_id="large_dag_1000_tasks",
       start_date=datetime(2025, 1, 1),
       catchup=False,
       schedule="@daily",
       tags=["testing", "large", "performance"],
   ) as dag:
       start = EmptyOperator(task_id="start")
       end = EmptyOperator(task_id="end")
   
       # Group 1: Data Ingestion (200 tasks)
       with TaskGroup("data_ingestion", tooltip="Data ingestion tasks") as 
ingestion_group:
           ingestion_tasks = []
           for i in range(200):
               task = EmptyOperator(task_id=f"ingest_{i:03d}")
               ingestion_tasks.append(task)
           
           # Chain tasks in batches of 10
           for i in range(0, len(ingestion_tasks) - 10, 10):
               ingestion_tasks[i] >> ingestion_tasks[i + 10]
   
       # Group 2: Data Processing (300 tasks with nested groups)
       with TaskGroup("data_processing", tooltip="Data processing tasks") as 
processing_group:
           
           # Subgroup: Validation (100 tasks)
           with TaskGroup("validation", tooltip="Validation tasks") as 
validation_group:
               validation_tasks = []
               for i in range(100):
                   task = EmptyOperator(task_id=f"validate_{i:03d}")
                   validation_tasks.append(task)
               # Fan out pattern
               validation_tasks[0] >> validation_tasks[1:50]
               for t in validation_tasks[1:50]:
                   t >> validation_tasks[50]
   
           # Subgroup: Transformation (100 tasks)
           with TaskGroup("transformation", tooltip="Transformation tasks") as 
transform_group:
               transform_tasks = []
               for i in range(100):
                   task = BashOperator(
                       task_id=f"transform_{i:03d}",
                       bash_command=f"echo 'Transforming batch {i}'"
                   )
                   transform_tasks.append(task)
               # Linear chain
               for i in range(len(transform_tasks) - 1):
                   transform_tasks[i] >> transform_tasks[i + 1]
   
           # Subgroup: Enrichment (100 tasks)
           with TaskGroup("enrichment", tooltip="Enrichment tasks") as 
enrichment_group:
               enrichment_tasks = []
               for i in range(100):
                   task = EmptyOperator(task_id=f"enrich_{i:03d}")
                   enrichment_tasks.append(task)
               # Parallel execution
               pass  # All run in parallel
   
           validation_group >> transform_group >> enrichment_group
   
       # Group 3: Analytics (250 tasks with deeply nested groups)
       with TaskGroup("analytics", tooltip="Analytics tasks") as 
analytics_group:
           
           with TaskGroup("metrics", tooltip="Metrics calculation") as 
metrics_group:
               with TaskGroup("daily_metrics", tooltip="Daily metrics") as 
daily_metrics:
                   daily_tasks = 
[EmptyOperator(task_id=f"daily_metric_{i:03d}") for i in range(50)]
                   daily_tasks[0] >> daily_tasks[1:]
               
               with TaskGroup("weekly_metrics", tooltip="Weekly metrics") as 
weekly_metrics:
                   weekly_tasks = 
[EmptyOperator(task_id=f"weekly_metric_{i:03d}") for i in range(50)]
                   weekly_tasks[0] >> weekly_tasks[1:]
               
               daily_metrics >> weekly_metrics
   
           with TaskGroup("reports", tooltip="Report generation") as 
reports_group:
               report_tasks = []
               for i in range(150):
                   task = EmptyOperator(task_id=f"report_{i:03d}")
                   report_tasks.append(task)
               # Diamond pattern
               report_tasks[0] >> report_tasks[1:75]
               for t in report_tasks[1:75]:
                   t >> report_tasks[75:150]
   
           metrics_group >> reports_group
   
       # Group 4: Export (200 tasks)
       with TaskGroup("export", tooltip="Data export tasks") as export_group:
           export_tasks = []
           for i in range(200):
               task = EmptyOperator(task_id=f"export_{i:03d}")
               export_tasks.append(task)
           # Fan-in pattern
           export_tasks[0:100] >> export_tasks[100]
           export_tasks[100] >> export_tasks[101:200]
   
       # Group 5: Cleanup (50 tasks)
       with TaskGroup("cleanup", tooltip="Cleanup tasks") as cleanup_group:
           cleanup_tasks = [EmptyOperator(task_id=f"cleanup_{i:03d}") for i in 
range(50)]
           # All parallel
   
       # Wire up the main flow
       start >> ingestion_group >> processing_group >> analytics_group >> 
export_group >> cleanup_group >> end
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to