emredjan opened a new issue, #26599: URL: https://github.com/apache/airflow/issues/26599
### Apache Airflow version 2.4.0 ### What happened On version 2.4.0, in DAGs with simple taskflow based tasks (nothing dynamic), I was getting the warnings about "Dependency already registered for DAG", that weren't giving warnings prior to 2.4.0, and I wanted to test with a simpler one. I copied the exact dag from the taskflow tutorial documentation, the simple extract-transform-load example located here: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#example-taskflow-api-pipeline ```python import json import pendulum from airflow.decorators import dag, task @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) def tutorial_taskflow_api(): """ ### TaskFlow API Tutorial Documentation This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def extract(): """ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict @task(multiple_outputs=True) def transform(order_data_dict: dict): """ #### Transform task A simple Transform task which takes in the collection of order data and computes the total order value. """ total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value": total_order_value} @task() def load(total_order_value: float): """ #### Load task A simple Load task which takes in the result of the Transform task and instead of saving it to end user review, just prints it out. """ print(f"Total order value is: {total_order_value:.2f}") order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"]) tutorial_taskflow_api() ``` Running these gives these warnings, even though no complex dependencies, no dynamic task generation exists: Log from the last step: ``` *** Reading local file: /data/apps/airflow//logs/dag_id=tutorial_taskflow_api/run_id=manual__2022-09-22T14:08:19.967374+00:00/task_id=load/attempt=1.log [2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [queued]> [2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [queued]> [2022-09-22, 16:08:23 CEST] {taskinstance.py:1362} INFO - -------------------------------------------------------------------------------- [2022-09-22, 16:08:23 CEST] {taskinstance.py:1363} INFO - Starting attempt 1 of 1 [2022-09-22, 16:08:23 CEST] {taskinstance.py:1364} INFO - -------------------------------------------------------------------------------- [2022-09-22, 16:08:23 CEST] {taskinstance.py:1383} INFO - Executing <Task(_PythonDecoratedOperator): load> on 2022-09-22 14:08:19.967374+00:00 [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:54} INFO - Started process 3995388 to run task [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'tutorial_taskflow_api', 'load', 'manual__2022-09-22T14:08:19.967374+00:00', '--job-id', '1185', '--raw', '--subdir', 'DAGS_FOLDER/dwh/tutorial_taskflow_api.py', '--cfg-path', '/tmp/tmps2oxurlk'] [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:83} INFO - Job 1185: Subtask load [2022-09-22, 16:08:23 CEST] {dagbag.py:525} INFO - Filling up the DagBag from /data/apps/airflow/dags/dwh/tutorial_taskflow_api.py [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api [2022-09-22, 16:08:23 CEST] {task_command.py:384} INFO - Running <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [running]> on host svgbiappp045.gbi.int [2022-09-22, 16:08:23 CEST] {warnings.py:109} WARNING - /data/apps/.pyenv/versions/3.10.5/envs/airflow-py310/lib/python3.10/site-packages/airflow/models/renderedtifields.py:258: SAWarning: Coercing Subquery object into a select() for use in IN(); please pass a select() construct explicitly tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2), [2022-09-22, 16:08:23 CEST] {taskinstance.py:1590} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=tutorial_taskflow_api AIRFLOW_CTX_TASK_ID=load AIRFLOW_CTX_EXECUTION_DATE=2022-09-22T14:08:19.967374+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-22T14:08:19.967374+00:00 [2022-09-22, 16:08:23 CEST] {logging_mixin.py:117} INFO - Total order value is: 1236.70 [2022-09-22, 16:08:23 CEST] {python.py:177} INFO - Done. Returned value was: None [2022-09-22, 16:08:23 CEST] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=tutorial_taskflow_api, task_id=load, execution_date=20220922T140819, start_date=20220922T140823, end_date=20220922T140823 [2022-09-22, 16:08:23 CEST] {local_task_job.py:164} INFO - Task exited with return code 0 [2022-09-22, 16:08:23 CEST] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check ``` ### What you think should happen instead These tasks should run without warnings about dependencies being already registered. ### How to reproduce Copy the tutorial taskflow DAG and run it on 2.4.0. ### Operating System Red Hat Enterprise Linux 8.6 (Ootpa) ### Versions of Apache Airflow Providers _No response_ ### Deployment Virtualenv installation ### Deployment details CeleryExecutor with rabbitmq, 1 main machine for webserver/scheduler and 1 additional worker node ### Anything else _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
