diogosilva30 commented on PR #65943:
URL: https://github.com/apache/airflow/pull/65943#issuecomment-4386327199

   @jscheffl yes, this is intentional. We ship DAG factories and reusable 
operators as modules inside the `plugins/` folder so multiple DAGs can be 
instantiated from the same logic without duplication.
   
   **Pattern overview:**
   
   ```
   plugins/
   └── common/
       └── operators/
           └── example_dag_factory.py   ← shared factory + tasks
   dags/
   └── prod/
       └── my_dag.py                    ← thin wrapper that calls the factory
   ```
   
   **`plugins/common/operators/example_dag_factory.py`** (shared logic):
   
   ```python
   """Reusable DAG factory for fetching and exporting metrics."""
   
   from datetime import timedelta
   from airflow.sdk import task
   
   
   @task
   def fetch_data(source: str) -> list[dict]:
       """Fetch records from a data source."""
       # ... implementation ...
       return []
   
   
   @task
   def export_metrics(data: list[dict], conn_id: str) -> None:
       """Export metrics via an external connection."""
       # ... implementation ...
   
   
   def metrics_dag_definition(source: str, conn_id: str) -> None:
       """Wire up the DAG tasks."""
       data = fetch_data(source=source)
       export_metrics(data=data, conn_id=conn_id)
   
   
   metrics_dag_kwargs = {
       "schedule": timedelta(minutes=5),
       "tags": ["metrics"],
   }
   ```
   
   **`dags/prod/my_dag.py`** (thin DAG file):
   
   ```python
   """DAG for exporting prod metrics."""
   
   import functools
   from common import create_dag
   from common.operators.example_dag_factory import (
       metrics_dag_definition,
       metrics_dag_kwargs,
   )
   
   create_dag(
       dag_name="prod_metrics",
       dag_definition=functools.partial(
           metrics_dag_definition,
           source="prod",
           conn_id="metrics_conn_prod",
       ),
       dag_file=__file__,
       **metrics_dag_kwargs,
   )
   ```
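
The `create_dag` helper is imported from `common` but not shown in this comment. As a rough illustration of how the `functools.partial` and the shared kwargs reach it, here is a plain-Python sketch with no Airflow dependency. The body of `create_dag`, the simplified `metrics_dag_definition`, and the `staging` values are all hypothetical stand-ins, not the real implementation.

```python
import functools
from datetime import timedelta


def create_dag(dag_name, dag_definition, dag_file, **dag_kwargs):
    """Hypothetical stand-in for the helper in `common` (not shown in the comment).

    It only calls the definition and records what it was given; a real helper
    would presumably build an Airflow DAG here.
    """
    wired = dag_definition()  # the partial already carries source and conn_id
    return {"dag_id": dag_name, "file": dag_file, "kwargs": dag_kwargs, "wired": wired}


def metrics_dag_definition(source, conn_id):
    """Simplified stand-in for the factory: returns its arguments instead of wiring tasks."""
    return (source, conn_id)


metrics_dag_kwargs = {"schedule": timedelta(minutes=5), "tags": ["metrics"]}

# Two thin "DAG files" reusing the same factory with different bound arguments.
prod = create_dag(
    dag_name="prod_metrics",
    dag_definition=functools.partial(
        metrics_dag_definition, source="prod", conn_id="metrics_conn_prod"
    ),
    dag_file="dags/prod/my_dag.py",
    **metrics_dag_kwargs,
)
staging = create_dag(  # hypothetical second environment, for illustration only
    dag_name="staging_metrics",
    dag_definition=functools.partial(
        metrics_dag_definition, source="staging", conn_id="metrics_conn_staging"
    ),
    dag_file="dags/staging/my_dag.py",
    **metrics_dag_kwargs,
)
```

Each wrapper differs only in the arguments bound by `functools.partial`; the schedule and tags come from the shared `metrics_dag_kwargs`.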
   
   The DAG file itself is essentially a single `create_dag` call; all the task logic lives in 
the shared plugin module. Because the worker executes the tasks defined in that 
module, it must be able to import from `plugins/`, which is why this pattern 
requires loading plugins on the worker.
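
The import dependency can be illustrated without Airflow. The sketch below builds a throwaway `plugins/` tree mirroring the layout above and shows that `common.operators.example_dag_factory` only becomes importable once that folder is on `sys.path`. The helper names (`build_plugins_tree`, `can_import`) are made up for this example, and this is not how Airflow's plugin loader is implemented; it only demonstrates the underlying Python import behaviour.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MODULE = "common.operators.example_dag_factory"


def build_plugins_tree(root: Path) -> Path:
    """Create plugins/common/operators/example_dag_factory.py under root."""
    plugins = root / "plugins"
    operators = plugins / "common" / "operators"
    operators.mkdir(parents=True)
    (plugins / "common" / "__init__.py").write_text("")
    (operators / "__init__.py").write_text("")
    (operators / "example_dag_factory.py").write_text(
        "def metrics_dag_definition(source, conn_id):\n"
        "    return (source, conn_id)\n"
    )
    return plugins


def can_import(module_name: str) -> bool:
    """Return True if the module can be imported with the current sys.path."""
    importlib.invalidate_caches()
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    plugins_dir = build_plugins_tree(Path(tmp))

    # Without plugins/ on sys.path, the shared module cannot be found.
    importable_before = can_import(MODULE)

    # With plugins/ on sys.path, the same import succeeds.
    sys.path.insert(0, str(plugins_dir))
    try:
        importable_after = can_import(MODULE)
    finally:
        # Undo the path change and drop the temporary modules again.
        sys.path.remove(str(plugins_dir))
        for name in [n for n in sys.modules if n == "common" or n.startswith("common.")]:
            del sys.modules[name]
```

A worker process that never puts `plugins/` on its import path is in the "before" state: the DAG file's `from common.operators...` import fails before any task can run.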
   
   Regarding the `core.execute_tasks_new_python_interpreter` proposal: that 
sounds like a clean approach and aligns well with the existing executor 
pattern. I'll update the PR to honour that flag rather than hard-switching.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
