tatiana opened a new issue, #51644:
URL: https://github.com/apache/airflow/issues/51644

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The following DAG used to work in Airflow 2:
   ```
   from datetime import datetime
   
   from airflow import DAG
   from airflow.datasets import Dataset, DatasetAlias
   from airflow.operators.empty import EmptyOperator
   
   
   dataset = Dataset("something")
   
   
   
   with DAG("example_inactive_asset", start_date=datetime(2023, 4, 20)) as dag:
       do_nothing = EmptyOperator(
           task_id="start_task",
           outlets=[dataset]
       )
   ```
   
   However, in Airflow 3.0.2 it raises an exception if I try to run it using the `airflow dags test example_inactive_asset` command:
   
   ```
   │ /Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/ekzAVDNG/tests.py3.11-3.0-1.9/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py:792 in _validate_task_inlets_and_outlets
   │
   │    789 │   if TYPE_CHECKING:
   │    790 │   │   assert isinstance(inactive_assets_resp, InactiveAssetsResult)
   │    791 │   if inactive_assets := inactive_assets_resp.inactive_assets:
   │ ❱  792 │   │   raise AirflowInactiveAssetInInletOrOutletException(
   │    793 │   │   │   inactive_asset_keys=[
   │    794 │   │   │   │   AssetUniqueKey.from_profile(asset_profile) for asset_profile in inactive
   │    795 │   │   │   ]
   │
   │ ╭─ locals ─
   │ │      inactive_assets = [AssetProfile(name='something', uri='something', type='Asset')]
   │ │ inactive_assets_resp = InactiveAssetsResult(
   │ │                        │   inactive_assets=[
   │ │                        │   │   AssetProfile(
   │ │                        │   │   │   name='something',
   │ │                        │   │   │   uri='something',
   │ │                        │   │   │   type='Asset'
   │ │                        │   │   )
   │ │                        │   ],
   │ │                        │   type='InactiveAssetsResult'
   │ │                        )
   │ │                  log = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={'logger_name': 'task'}, logger_factory_args=())>
   │ │                   ti = <RuntimeTaskInstance
   │ │                        │   id=UUID('0197637d-cb2d-7d46-b079-6585b7ee853e')
   │ │                        │   task_id='start_task'
   │ │                        │   dag_id='example_inactive_asset'
   │ │                        │   run_id='manual__2025-06-12T09:34:37.857251+00:00'
   │ │                        │   max_tries=0
   │ │                        │   task=<class 'airflow.providers.standard.operators.empty.EmptyOperator'>
   │ │                        │   start_date=datetime.datetime(2025, 6, 12, 9, 34, 38, 74867, tzinfo=datetime.timezone.utc)
   │ │                        >
   │ ╰─
   ╰─
   AirflowInactiveAssetInInletOrOutletException: Task has the following inactive assets in its inlets or outlets: Asset(name='something', uri='something')
   
   2025-06-12 10:34:38 [debug    ] Sending request                [task] msg=TaskState(state=<TaskInstanceState.FAILED: 'failed'>, end_date=datetime.datetime(2025, 6, 12, 9, 34, 38, 630590, tzinfo=datetime.timezone.utc), type='TaskState', rendered_map_index=None)
   2025-06-12 10:34:38 [debug    ] Received message from task runner [task] msg=TaskState(state=<TaskInstanceState.FAILED: 'failed'>, end_date=datetime.datetime(2025, 6, 12, 9, 34, 38, 630590, tzinfo=datetime.timezone.utc), type='TaskState', rendered_map_index=None)
   2025-06-12 10:34:38 [debug    ] Running finalizers             [task] ti=RuntimeTaskInstance(id=UUID('0197637d-cb2d-7d46-b079-6585b7ee853e'), task_id='start_task', dag_id='example_inactive_asset', run_id='manual__2025-06-12T09:34:37.857251+00:00', try_number=1, map_index=-1, hostname='alchueyr', context_carrier=None, task=<Task(EmptyOperator): start_task>, max_tries=0, start_date=datetime.datetime(2025, 6, 12, 9, 34, 38, 74867, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.FAILED: 'failed'>, is_mapped=False, rendered_map_index=None)
   2025-06-12 10:34:38 [debug    ] Updating task instance state   new_state=failed ti_id=0197637d-cb2d-7d46-b079-6585b7ee853e
   2025-06-12 10:34:38 [debug    ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=0197637d-cb2d-7d46-b079-6585b7ee853e try_number=1
   2025-06-12 10:34:38 [info     ] Task instance state updated    new_state=failed rows_affected=1 ti_id=0197637d-cb2d-7d46-b079-6585b7ee853e
   [2025-06-12T10:34:38.641+0100] {_client.py:1026} INFO - HTTP Request: PATCH http://in-process.invalid/./task-instances/0197637d-cb2d-7d46-b079-6585b7ee853e/state "HTTP/1.1 204 No Content"
   [2025-06-12T10:34:38.646+0100] {dagrun.py:1147} INFO - Marking run <DagRun example_inactive_asset @ 2025-06-12 09:34:37.691328+00:00: manual__2025-06-12T09:34:37.857251+00:00, state:running, queued_at: None. run_type: manual> failed
   [2025-06-12T10:34:38.646+0100] {dagrun.py:1226} INFO - DagRun Finished: dag_id=example_inactive_asset, logical_date=2025-06-12 09:34:37.691328+00:00, run_id=manual__2025-06-12T09:34:37.857251+00:00, run_start_date=2025-06-12 09:34:37.691328+00:00, run_end_date=2025-06-12 09:34:38.646873+00:00, run_duration=0.955545, state=failed, run_type=manual, data_interval_start=2025-06-12 09:34:37.691328+00:00, data_interval_end=2025-06-12 09:34:37.691328+00:00,
   DagRun failed
   ```
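
   For readers unfamiliar with the check that fails, the following is a minimal standard-library sketch of the logic visible in the traceback above (lines 791-792 of `task_runner.py`). It is not Airflow's implementation: `AssetProfile` and `InactiveAssetsResult` are simplified stand-ins for the objects shown in the locals, and `InactiveAssetError` and `validate_inlets_and_outlets` are hypothetical names used only for illustration.

   ```python
   from dataclasses import dataclass, field


   @dataclass(frozen=True)
   class AssetProfile:
       """Simplified stand-in for the profile objects shown in the traceback locals."""

       name: str
       uri: str
       type: str = "Asset"


   @dataclass
   class InactiveAssetsResult:
       """Simplified stand-in for the response listing assets that are not active."""

       inactive_assets: list = field(default_factory=list)


   class InactiveAssetError(Exception):
       """Hypothetical simplification of AirflowInactiveAssetInInletOrOutletException."""

       def __init__(self, inactive_assets):
           self.inactive_assets = list(inactive_assets)
           listed = ", ".join(
               f"Asset(name={a.name!r}, uri={a.uri!r})" for a in self.inactive_assets
           )
           super().__init__(
               "Task has the following inactive assets in its inlets or outlets: " + listed
           )


   def validate_inlets_and_outlets(resp: InactiveAssetsResult) -> None:
       """Raise if the response reports any inactive asset; otherwise do nothing."""
       # Same shape as the check in the traceback: a non-empty list fails the task.
       if inactive_assets := resp.inactive_assets:
           raise InactiveAssetError(inactive_assets)


   if __name__ == "__main__":
       resp = InactiveAssetsResult([AssetProfile(name="something", uri="something")])
       try:
           validate_inlets_and_outlets(resp)
       except InactiveAssetError as exc:
           print(exc)
   ```

   In this model the task fails as soon as any asset in its inlets or outlets is reported as inactive, which matches the exception message in the log. Why the asset is reported as inactive under `airflow dags test` but not under `airflow standalone` is not covered by the sketch.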
   
   If I try to run the same DAG using `airflow standalone`, it works:
   <img width="812" alt="Image" src="https://github.com/user-attachments/assets/1bf5a722-9f5f-49df-a5ed-4566bef3b91c" />
   
   
   
   ### What you think should happen instead?
   
   Airflow 3 `dag.test()` should behave consistently with `standalone` and succeed when testing this DAG.
   
   ### How to reproduce
   
   Described previously
   
   ### Operating System
   
   N/A
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   N/A
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

