emredjan opened a new issue, #26599:
URL: https://github.com/apache/airflow/issues/26599

   ### Apache Airflow version
   
   2.4.0
   
   ### What happened
   
   On version 2.4.0, DAGs with simple TaskFlow-based tasks (nothing dynamic)
   started logging "Dependency already registered for DAG" warnings. The same
   DAGs produced no such warnings prior to 2.4.0. To test with a simpler case,
   I copied the exact DAG from the TaskFlow tutorial documentation, the simple
   extract-transform-load example located here:
   https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#example-taskflow-api-pipeline
   
   ```python
   import json
   import pendulum
   
   from airflow.decorators import dag, task
   
   @dag(
       schedule=None,
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=['example'],
   )
   def tutorial_taskflow_api():
       """
       ### TaskFlow API Tutorial Documentation
       This is a simple data pipeline example which demonstrates the use of
       the TaskFlow API using three simple tasks for Extract, Transform, and Load.
       Documentation that goes along with the Airflow TaskFlow API tutorial is
       located
       [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
       """
   
       @task()
       def extract():
           """
           #### Extract task
           A simple Extract task to get data ready for the rest of the data
           pipeline. In this case, getting data is simulated by reading from a
           hardcoded JSON string.
           """
           data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
   
           order_data_dict = json.loads(data_string)
           return order_data_dict
   
       @task(multiple_outputs=True)
       def transform(order_data_dict: dict):
           """
           #### Transform task
           A simple Transform task which takes in the collection of order data and
           computes the total order value.
           """
           total_order_value = 0
   
           for value in order_data_dict.values():
               total_order_value += value
   
           return {"total_order_value": total_order_value}
   
       @task()
       def load(total_order_value: float):
           """
           #### Load task
           A simple Load task which takes in the result of the Transform task and
           instead of saving it to end user review, just prints it out.
           """
   
           print(f"Total order value is: {total_order_value:.2f}")
   
       order_data = extract()
       order_summary = transform(order_data)
       load(order_summary["total_order_value"])
   
   tutorial_taskflow_api()
   ```
   
   Running this DAG produces the warnings below, even though it has no complex
   dependencies and no dynamic task generation:
   
   Log from the last step:
   
   ```
   *** Reading local file: 
/data/apps/airflow//logs/dag_id=tutorial_taskflow_api/run_id=manual__2022-09-22T14:08:19.967374+00:00/task_id=load/attempt=1.log
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: tutorial_taskflow_api.load 
manual__2022-09-22T14:08:19.967374+00:00 [queued]>
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: tutorial_taskflow_api.load 
manual__2022-09-22T14:08:19.967374+00:00 [queued]>
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1363} INFO - Starting attempt 1 
of 1
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1383} INFO - Executing 
<Task(_PythonDecoratedOperator): load> on 2022-09-22 14:08:19.967374+00:00
   [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:54} INFO - Started 
process 3995388 to run task
   [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:82} INFO - Running: 
['airflow', 'tasks', 'run', 'tutorial_taskflow_api', 'load', 
'manual__2022-09-22T14:08:19.967374+00:00', '--job-id', '1185', '--raw', 
'--subdir', 'DAGS_FOLDER/dwh/tutorial_taskflow_api.py', '--cfg-path', 
'/tmp/tmps2oxurlk']
   [2022-09-22, 16:08:23 CEST] {standard_task_runner.py:83} INFO - Job 1185: 
Subtask load
   [2022-09-22, 16:08:23 CEST] {dagbag.py:525} INFO - Filling up the DagBag 
from /data/apps/airflow/dags/dwh/tutorial_taskflow_api.py
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): extract>, transform already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, extract already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): extract>, transform already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, extract already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): extract>, transform already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, extract already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): extract>, transform already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, extract already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): extract>, transform already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, extract already registered for 
DAG: tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, load already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): load>, transform already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, load already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): load>, transform already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, load already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): load>, transform already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, load already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): load>, transform already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): transform>, load already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency 
<Task(_PythonDecoratedOperator): load>, transform already registered for DAG: 
tutorial_taskflow_api
   [2022-09-22, 16:08:23 CEST] {task_command.py:384} INFO - Running 
<TaskInstance: tutorial_taskflow_api.load 
manual__2022-09-22T14:08:19.967374+00:00 [running]> on host svgbiappp045.gbi.int
   [2022-09-22, 16:08:23 CEST] {warnings.py:109} WARNING - 
/data/apps/.pyenv/versions/3.10.5/envs/airflow-py310/lib/python3.10/site-packages/airflow/models/renderedtifields.py:258:
 SAWarning: Coercing Subquery object into a select() for use in IN(); please 
pass a select() construct explicitly
     tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
   
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=tutorial_taskflow_api
   AIRFLOW_CTX_TASK_ID=load
   AIRFLOW_CTX_EXECUTION_DATE=2022-09-22T14:08:19.967374+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-22T14:08:19.967374+00:00
   [2022-09-22, 16:08:23 CEST] {logging_mixin.py:117} INFO - Total order value 
is: 1236.70
   [2022-09-22, 16:08:23 CEST] {python.py:177} INFO - Done. Returned value was: 
None
   [2022-09-22, 16:08:23 CEST] {taskinstance.py:1401} INFO - Marking task as 
SUCCESS. dag_id=tutorial_taskflow_api, task_id=load, 
execution_date=20220922T140819, start_date=20220922T140823, 
end_date=20220922T140823
   [2022-09-22, 16:08:23 CEST] {local_task_job.py:164} INFO - Task exited with 
return code 0
   [2022-09-22, 16:08:23 CEST] {local_task_job.py:273} INFO - 0 downstream 
tasks scheduled from follow-on schedule check
   ```
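
The repetition in the log above is easier to see when the warnings are tallied. The following is a minimal sketch for doing that, not part of Airflow: the `count_dependency_warnings` helper and the regular expression are assumptions written for this report, and the sample text is a short excerpt of the log above. Whitespace is normalized first because the log lines are hard-wrapped.

```python
import re
from collections import Counter

# Pattern for the warning text seen in the log above (emitted from taskmixin.py).
# This regex is an assumption based only on the message format shown in this report.
WARNING_RE = re.compile(
    r"WARNING - Dependency <Task\(\w+\): (\w+)>, (\w+) "
    r"already registered for DAG: (\w+)"
)


def count_dependency_warnings(log_text: str) -> Counter:
    """Count warnings per (task, other_task, dag_id) triple.

    Hypothetical helper: collapses all whitespace so that hard-wrapped
    log lines are matched as a single line.
    """
    normalized = " ".join(log_text.split())
    return Counter(WARNING_RE.findall(normalized))


# Short excerpt of the task log above, including its line wrapping.
sample_log = """
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency
<Task(_PythonDecoratedOperator): extract>, transform already registered for
DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency
<Task(_PythonDecoratedOperator): transform>, extract already registered for
DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency
<Task(_PythonDecoratedOperator): extract>, transform already registered for
DAG: tutorial_taskflow_api
"""

counts = count_dependency_warnings(sample_log)
for (task, other, dag_id), n in counts.items():
    print(f"{dag_id}: {task} / {other} reported {n} time(s)")
```

Running the same helper on the full task log should show each task pair reported several times in both directions within a single DagBag load.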
   
   ### What you think should happen instead
   
   These tasks should run without warnings about dependencies being already 
registered.
   
   ### How to reproduce
   
   Copy the tutorial taskflow DAG and run it on 2.4.0.
   
   ### Operating System
   
   Red Hat Enterprise Linux 8.6 (Ootpa)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   CeleryExecutor with RabbitMQ: one main machine for the webserver and
   scheduler, plus one additional worker node
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

