This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7c8ce40598792f2aa764ae348498fed91a4efc47 Author: jkramer-ginkgo <68444586+jkramer-gin...@users.noreply.github.com> AuthorDate: Sat Feb 26 14:25:15 2022 -0500 Fix assignment of unassigned triggers (#21770) Previously, the query returned no alive triggerers, which resulted in all triggers being assigned to the current triggerer. This works fine, despite the logic bug, in the case where there's a single triggerer. But with multiple triggerers, concurrent iterations of the TriggererJob loop would bounce trigger ownership to whichever loop ran last. Addresses https://github.com/apache/airflow/issues/21616 (cherry picked from commit b26d4d8a290ce0104992ba28850113490c1ca445) --- airflow/models/trigger.py | 10 ++++++--- tests/models/test_trigger.py | 52 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index 5749589..aa0d2b1 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -17,7 +17,7 @@ import datetime from typing import Any, Dict, List, Optional -from sqlalchemy import Column, Integer, String, func +from sqlalchemy import Column, Integer, String, func, or_ from airflow.models.base import Base from airflow.models.taskinstance import TaskInstance @@ -175,7 +175,7 @@ class Trigger(Base): alive_triggerer_ids = [ row[0] for row in session.query(BaseJob.id).filter( - BaseJob.end_date is None, + BaseJob.end_date.is_(None), BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30), BaseJob.job_type == "TriggererJob", ) @@ -184,7 +184,11 @@ class Trigger(Base): # Find triggers who do NOT have an alive triggerer_id, and then assign # up to `capacity` of those to us. 
trigger_ids_query = ( - session.query(cls.id).filter(cls.triggerer_id.notin_(alive_triggerer_ids)).limit(capacity).all() + session.query(cls.id) + # notin_ doesn't find NULL rows + .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids))) + .limit(capacity) + .all() ) session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update( {cls.triggerer_id: triggerer_id}, diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py index aacfa88..99cd71f 100644 --- a/tests/models/test_trigger.py +++ b/tests/models/test_trigger.py @@ -15,8 +15,11 @@ # specific language governing permissions and limitations # under the License. +import datetime + import pytest +from airflow.jobs.triggerer_job import TriggererJob from airflow.models import TaskInstance, Trigger from airflow.operators.dummy import DummyOperator from airflow.triggers.base import TriggerEvent @@ -36,9 +39,11 @@ def session(): def clear_db(session): session.query(TaskInstance).delete() session.query(Trigger).delete() + session.query(TriggererJob).delete() yield session session.query(TaskInstance).delete() session.query(Trigger).delete() + session.query(TriggererJob).delete() session.commit() @@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance): updated_task_instance = session.query(TaskInstance).one() assert updated_task_instance.state == State.SCHEDULED assert updated_task_instance.next_method == "__fail__" + + +def test_assign_unassigned(session, create_task_instance): + """ + Tests that unassigned triggers of all appropriate states are assigned. 
+ """ + finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS) + finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1) + session.add(finished_triggerer) + assert not finished_triggerer.is_alive() + healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING) + session.add(healthy_triggerer) + assert healthy_triggerer.is_alive() + new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING) + session.add(new_triggerer) + assert new_triggerer.is_alive() + session.commit() + trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_on_healthy_triggerer.id = 1 + trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id + trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_on_killed_triggerer.id = 2 + trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id + trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + trigger_unassigned_to_triggerer.id = 3 + assert trigger_unassigned_to_triggerer.triggerer_id is None + session.add(trigger_on_healthy_triggerer) + session.add(trigger_on_killed_triggerer) + session.add(trigger_unassigned_to_triggerer) + session.commit() + assert session.query(Trigger).count() == 3 + Trigger.assign_unassigned(new_triggerer.id, 100, session=session) + session.expire_all() + # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer + assert ( + session.query(Trigger).filter(Trigger.id == trigger_on_killed_triggerer.id).one().triggerer_id + == new_triggerer.id + ) + assert ( + session.query(Trigger).filter(Trigger.id == trigger_unassigned_to_triggerer.id).one().triggerer_id + == new_triggerer.id + ) + # Check that trigger on healthy triggerer still assigned to existing triggerer + assert ( + session.query(Trigger).filter(Trigger.id == 
trigger_on_healthy_triggerer.id).one().triggerer_id + == healthy_triggerer.id + )