vatsrahul1001 opened a new pull request, #64659:
URL: https://github.com/apache/airflow/pull/64659
## Summary
Guard against `watcher.trigger` being `None` in
`add_asset_trigger_references` when filtering
watchers during the removal phase.
This can happen when `Trigger.clean_unused()` (run by the triggerer) deletes a trigger row between parsing loops, leaving an `AssetWatcherModel` with a dangling trigger reference. The SQLAlchemy relationship then resolves to `None`, which raises `AttributeError: 'NoneType' object has no attribute 'classpath'` and crashes the dag processor.
With this guard, watchers whose trigger has been deleted are dropped by the filter. That is the correct outcome, since such watchers are orphaned.
```
Traceback (most recent call last):
  File "/usr/python/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/memray_utils.py", line 59, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 64, in dag_processor
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 67, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs) # type: ignore[arg-type]
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 334, in run
    return self._run_parsing_loop()
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 441, in _run_parsing_loop
    self._collect_results()
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs) # type: ignore[arg-type]
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 948, in _collect_results
    self._file_stats[file] = process_parse_results(
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 1347, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 463, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 438, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
    result = action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 473, in update_dag_parsing_results_in_db
    SerializedDAG.bulk_write_to_db(
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow-core/src/airflow/serialization/definitions/dag.py", line 221, in bulk_write_to_db
    asset_op.add_asset_trigger_references(orm_assets, session=session)
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1101, in add_asset_trigger_references
    asset_model.watchers = [
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/collection.py", line 1104, in <listcomp>
    if BaseEventTrigger.hash(watcher.trigger.classpath, watcher.trigger.kwargs)
AttributeError: 'NoneType' object has no attribute 'classpath'
```
##### Was generative AI tooling used to co-author this PR?

- [X] Yes (please specify the tool below)
<!--
Generated-by: [Tool Name] following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
-->
---
* Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information. Note: commit author/co-author name and email in commits
become permanently public when merged.
* For fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
* When adding dependency, check compliance with the [ASF 3rd Party License
Policy](https://www.apache.org/legal/resolved.html#category-x).
* For significant user-facing changes, create a newsfragment named `{pr_number}.significant.rst` in [airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments). You can add this file in a follow-up commit after the PR is created, once you know the PR number.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]