GitHub user Yuvraj-Dhepe closed a discussion: DAG Tasks Creation
```python
from airflow.utils.task_group import TaskGroup
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import sys


def foo():
    func_name = sys._getframe().f_code.co_name
    python_task = PythonOperator(
        task_id=func_name, python_callable=run_task, op_args=[func_name]
    )
    return python_task


def fun():
    func_name = sys._getframe().f_code.co_name
    python_task = PythonOperator(
        task_id=func_name, python_callable=run_task, op_args=[func_name]
    )
    return python_task


class Foo:
    def class_foo(self, dag):
        func_name = sys._getframe().f_code.co_name
        with TaskGroup(f"{func_name}", dag=dag) as task:
            python_task = PythonOperator(
                task_id=func_name, python_callable=run_task, op_args=[func_name]
            )
            # python_task
        return task


class Fun:
    def class_fun(self, dag):
        func_name = sys._getframe().f_code.co_name
        python_task = PythonOperator(
            task_id=func_name, python_callable=run_task, op_args=[func_name]
        )
        # python_task
        return python_task


def run_task(x):
    print(f"Hello {x}")


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.now(),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

# NOTE: The DAG runs, but t2 still shows up in the dashboard even though it is
# not part of the dependency chain. I couldn't understand the reason for it.
# with DAG(
#     dag_id="dag_sample",
#     default_args=default_args,
#     description="A simple Airflow DAG",
#     schedule=timedelta(days=1),  # Explicit schedule
#     catchup=False,
#     tags=["D1"],
# ) as dag:
#     start = EmptyOperator(task_id="start")
#     t1 = foo()
#     t2 = fun()
#     python_task = PythonOperator(
#         task_id="in_dag_python_task",
#         python_callable=run_task,
#     )
#     end = EmptyOperator(task_id="end")
#     start >> t1 >> python_task >> end

# NOTE: The DAG runs, but t2 still shows up in the dashboard even though it is
# not part of the dependency chain. I couldn't understand the reason for it.
with DAG(
    dag_id="dag_sample",
    default_args=default_args,
    description="A simple Airflow DAG",
    schedule=timedelta(days=1),  # Explicit schedule
    catchup=False,
    tags=["D1"],
) as dag:
    start = EmptyOperator(task_id="start")
    t1 = Foo().class_foo(dag)
    t2 = Fun().class_fun(dag)
    python_task = PythonOperator(
        task_id="python_task", python_callable=run_task,
        op_args=["in_dag_python_task"]
    )
    end = EmptyOperator(task_id="end")
    start >> t1 >> python_task >> end
```
Dear Team,
This is the most minimal way I can showcase my problem: even though t2 is never
part of the DAG's dependency chain, it still shows up in the dashboard.
By minimal, I mean that my actual use case, which is more complex, has a
similar scenario:
a) I have a DAG blueprint that adds some core task groups, which run for every
DAG created by users.
b) My blueprint consists of a DAG with multiple SUPER task groups linked to
one another. Alternating SUPER task groups are similar in the functionality
they perform. Each SUPER task group is a collection of:
- user_task groups (user functionalities)
- system_task groups (blueprint system functionalities, which change
depending on context)
- some nominal tasks (these stay as is, no matter the context)
c) I am facing problems with how the tasks inside the system_task groups are
spawned inside the DAG. Currently, system_task groups are created by `methods`
of `instances` of the SystemTasks class, depending on context.
d) These methods are called by the main DAG to get the system tasks that run
alongside the user tasks. In the current approach, the DAG is created by the
user, and user_task_group is an input to my blueprint, where I simply chain
user_task_group with the blueprint's system_task_groups.
e) The only problem I have is in the last SUPER task group, where the
system_task group is spawned at the very beginning as a branch parallel to the
user_task group.
I have gone through the code logic several times, but I don't see a reason for
this outlier branching, because it doesn't happen in the previous SUPER task
group. So why only in the last one?
Hence I created the dummy Python script above, where a task that I never use
in the dependency chain still shows up inside the DAG. The simple script has no
weird branching or linkage like the one I got in my complex scenario; however,
I would still like to know whether this behavior is intended.
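One possible explanation, sketched as a toy model rather than Airflow's actual code: as far as I understand, an operator instantiated while a `with DAG(...)` block is active is attached to that DAG at construction time, and `>>` only adds edges between tasks that are already attached. The names `ToyDag`, `ToyTask`, and `make_unused_task` below are hypothetical stand-ins I made up for illustration; they are not part of Airflow.

```python
class ToyDag:
    """Toy stand-in for a DAG: collects every task created while it is active."""

    _active = None  # the DAG whose `with` block is currently open, if any

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}   # task_id -> task, filled at task construction time
        self.edges = []   # (upstream_id, downstream_id), filled by `>>`

    def __enter__(self):
        ToyDag._active = self
        return self

    def __exit__(self, *exc):
        ToyDag._active = None
        return False


class ToyTask:
    """Toy stand-in for an operator: registers itself with the active DAG."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.dag = ToyDag._active
        if self.dag is not None:
            # Registration happens here, independent of any dependency wiring.
            self.dag.tasks[task_id] = self

    def __rshift__(self, other):
        # `>>` only records an edge; it does not decide DAG membership.
        self.dag.edges.append((self.task_id, other.task_id))
        return other


def make_unused_task():
    # Mirrors Fun().class_fun(dag): builds a task but never wires it up.
    return ToyTask("class_fun")


with ToyDag("dag_sample") as dag:
    start = ToyTask("start")
    t2 = make_unused_task()  # created inside the block, never used with >>
    end = ToyTask("end")
    start >> end

registered = sorted(dag.tasks)
wired = sorted({task_id for edge in dag.edges for task_id in edge})
print(registered)  # ['class_fun', 'end', 'start']
print(wired)       # ['end', 'start']
```

If this model matches what Airflow does, then t2 showing up as an unconnected task would be expected, and the stray branch in my blueprint might come from a system task being constructed earlier than the point where it is chained.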
GitHub link: https://github.com/apache/airflow/discussions/49502