This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4f1c45de63650f2abf4b5ac4ce47a1ae6558ee64 Author: Daniel Standish <15932138+dstand...@users.noreply.github.com> AuthorDate: Wed Jan 5 12:12:27 2022 -0800 Rename `to_delete` to `to_cancel` in TriggerRunner (#20658) The queue's purpose is to track triggers that need to be canceled. The language `to_delete` was a bit confusing because for one it does not actually delete them but cancel them. The deletion work is actually in `cleanup_finished_triggers`. It seems that this method will usually not do anything and it's only for cancelling triggers that are currently running but for whatever reason no longer should be. E.g. when a task is killed and therefore the trigger is no longer needed, or some [...] (cherry picked from commit c20ad79b40ea2b213f6dca221221c6dbd55bd08f) --- airflow/jobs/triggerer_job.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py index a47bc3b..25a4c79 100644 --- a/airflow/jobs/triggerer_job.py +++ b/airflow/jobs/triggerer_job.py @@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin): to_create: Deque[Tuple[int, BaseTrigger]] # Inbound queue of deleted triggers - to_delete: Deque[int] + to_cancel: Deque[int] # Outbound queue of events events: Deque[Tuple[int, TriggerEvent]] @@ -221,7 +221,7 @@ class TriggerRunner(threading.Thread, LoggingMixin): self.triggers = {} self.trigger_cache = {} self.to_create = deque() - self.to_delete = deque() + self.to_cancel = deque() self.events = deque() self.failed_triggers = deque() @@ -242,7 +242,7 @@ class TriggerRunner(threading.Thread, LoggingMixin): while not self.stop: # Run core logic await self.create_triggers() - await self.delete_triggers() + await self.cancel_triggers() await self.cleanup_finished_triggers() # Sleep for a bit await asyncio.sleep(1) @@ -270,13 +270,13 @@ class TriggerRunner(threading.Thread, LoggingMixin): self.log.warning("Trigger %s had insertion attempted twice", trigger_id) await asyncio.sleep(0) - async def delete_triggers(self): + async def cancel_triggers(self): """ - Drain the to_delete queue and ensure all triggers that are not in the + Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled, so the cleanup job deletes them. """ - while self.to_delete: - trigger_id = self.to_delete.popleft() + while self.to_cancel: + trigger_id = self.to_cancel.popleft() if trigger_id in self.triggers: # We only delete if it did not exit already self.triggers[trigger_id]["task"].cancel() @@ -384,7 +384,7 @@ class TriggerRunner(threading.Thread, LoggingMixin): current_trigger_ids = set(self.triggers.keys()) # Work out the two difference sets new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids) - old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids) + cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids) # Bulk-fetch new trigger records new_triggers = Trigger.bulk_fetch(new_trigger_ids) # Add in new triggers @@ -401,9 +401,9 @@ class TriggerRunner(threading.Thread, LoggingMixin): self.failed_triggers.append(new_id) continue self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs))) - # Remove old triggers - for old_id in old_trigger_ids: - self.to_delete.append(old_id) + # Enqueue orphaned triggers for cancellation + for old_id in cancel_trigger_ids: + self.to_cancel.append(old_id) def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]: """