vatsrahul1001 opened a new pull request, #64659:
URL: https://github.com/apache/airflow/pull/64659

   ## Summary
   Guard against `watcher.trigger` being `None` in `add_asset_trigger_references` when filtering watchers during the removal phase.
   This can happen when `Trigger.clean_unused()` (in the triggerer) deletes a trigger row between parsing loops, leaving an `AssetWatcherModel` with a dangling reference. The SQLAlchemy relationship then resolves to `None`, which raises `AttributeError: 'NoneType' object has no attribute 'classpath'` and crashes the dag processor.
   With the guard in place, watchers whose trigger has been deleted are dropped by the filter, which is the intended outcome because they are orphaned.
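
As a rough illustration of the guard described above, here is a minimal, self-contained sketch. It does not reproduce the actual Airflow code: `FakeTrigger`, `FakeWatcher`, `trigger_hash`, and `filter_watchers` are hypothetical stand-ins for the ORM models, `BaseEventTrigger.hash`, and the list comprehension in `add_asset_trigger_references`. The traceback only shows the start of the filter condition, so the `not in hashes_to_remove` membership test is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeTrigger:
    """Hypothetical stand-in for the ORM trigger row."""

    classpath: str
    kwargs: dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeWatcher:
    """Hypothetical stand-in for AssetWatcherModel.

    `trigger` is None when the referenced trigger row has been deleted.
    """

    name: str
    trigger: FakeTrigger | None


def trigger_hash(classpath: str, kwargs: dict[str, Any]) -> int:
    # Stand-in for BaseEventTrigger.hash; the real hashing scheme is not reproduced.
    return hash((classpath, repr(sorted(kwargs.items()))))


def filter_watchers(watchers: list[FakeWatcher], hashes_to_remove: set[int]) -> list[FakeWatcher]:
    """Keep watchers whose trigger still exists and is not scheduled for removal."""
    return [
        watcher
        for watcher in watchers
        # The None check runs first, so the attribute access below is never
        # reached for a watcher with a dangling trigger reference.
        if watcher.trigger is not None
        and trigger_hash(watcher.trigger.classpath, watcher.trigger.kwargs) not in hashes_to_remove
    ]


watchers = [
    FakeWatcher("kept", FakeTrigger("pkg.TriggerA", {"x": 1})),
    FakeWatcher("orphaned", None),
    FakeWatcher("removed", FakeTrigger("pkg.TriggerC")),
]
to_remove = {trigger_hash("pkg.TriggerC", {})}
kept = filter_watchers(watchers, to_remove)
print([w.name for w in kept])
```

Because the `is not None` check short-circuits the `and`, the orphaned watcher is filtered out instead of raising `AttributeError`.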
   
   ```
   
   Traceback (most recent call last):
     File "/usr/python/bin/airflow", line 10, in <module>
       sys.exit(main())
     File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/utils/memray_utils.py", line 59, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 64, in dag_processor
       run_command_with_daemon_option(
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 67, in <lambda>
       callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
       ret = execute_callable()
     File "/opt/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 334, in run
       return self._run_parsing_loop()
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 441, in _run_parsing_loop
       self._collect_results()
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 948, in _collect_results
       self._file_stats[file] = process_parse_results(
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 1347, in process_parse_results
       update_dag_parsing_results_in_db(
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 463, in update_dag_parsing_results_in_db
       for attempt in run_with_db_retries(logger=log):
     File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 438, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
       result = action(retry_state)
     File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
     File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
       return self.__get_result()
     File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
       raise self._exception
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 473, in update_dag_parsing_results_in_db
       SerializedDAG.bulk_write_to_db(
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow-core/src/airflow/serialization/definitions/dag.py", line 221, in bulk_write_to_db
       asset_op.add_asset_trigger_references(orm_assets, session=session)
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1101, in add_asset_trigger_references
       asset_model.watchers = [
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1104, in <listcomp>
       if BaseEventTrigger.hash(watcher.trigger.classpath, watcher.trigger.kwargs)
   AttributeError: 'NoneType' object has no attribute 'classpath'
   ```
   
   - [X] Yes (please specify the tool below)
   

