TakawaAkirayo commented on issue #41231: URL: https://github.com/apache/airflow/issues/41231#issuecomment-2270708754
@tirkarthi

```python
from __future__ import annotations

import asyncio
import datetime

import pytest

from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models import DagModel, DagRun, TaskInstance, Trigger
from airflow.operators.python import PythonOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState


class RemoteJob:
    RUNNING_STATUS = "running"
    FAILED_STATUS = "failed"
    SUCCESS_STATUS = "success"

    def __init__(self, status):
        self.status = status

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    def kill(self):
        self.set_status(RemoteJob.FAILED_STATUS)


class RemoteService:
    """Singleton standing in for a remote service that runs jobs."""

    _instance = None
    event_queue: list[tuple[RemoteJobTrigger, int]]
    remote_jobs: dict[int, RemoteJob]

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.event_queue = []
            cls._instance.remote_jobs = {
                1: RemoteJob(RemoteJob.RUNNING_STATUS),
            }
        return cls._instance

    def complete_job(self, job_id: int):
        self.remote_jobs[job_id].set_status(RemoteJob.SUCCESS_STATUS)

    def kill_job(self, caller: RemoteJobTrigger, job_id: int):
        self.event_queue.append((caller, job_id))
        self.remote_jobs[job_id].kill()

    def get_job(self, job_id: int):
        return self.remote_jobs[job_id]

    def reset(self):
        self.event_queue = []
        self.remote_jobs = {
            1: RemoteJob(RemoteJob.RUNNING_STATUS),
        }


class TriggerInstances:
    """Singleton tracking every RemoteJobTrigger instance that gets created."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.trigger_instances = []
        return cls._instance

    def reset(self):
        self.trigger_instances.clear()


class RemoteJobTrigger(BaseTrigger):
    def __init__(self, remote_job_id):
        super().__init__()
        self.remote_job_id = remote_job_id
        self.finished = False
        self.cleanup_done = False
        self.last_status = None
        TriggerInstances().trigger_instances.append(self)

    def get_status(self):
        return RemoteService().get_job(self.remote_job_id).get_status()

    async def run(self):
        print(f"Trigger object {id(self)}: start to run")
        while self.get_status() == RemoteJob.RUNNING_STATUS:
            await asyncio.sleep(0.1)
        self.finished = True
        print(f"Trigger object {id(self)}: finished with status: {self.get_status()}")
        yield TriggerEvent(self.remote_job_id)

    async def cleanup(self) -> None:
        RemoteService().kill_job(self, self.remote_job_id)
        self.cleanup_done = True
        print(f"Trigger object {id(self)}: cleanup done")

    def has_been_cleaned(self):
        return self.cleanup_done

    def failed(self):
        return self.get_status() == RemoteJob.FAILED_STATUS

    def serialize(self):
        return (
            "tests.jobs.test_triggerer_job.RemoteJobTrigger",
            {"remote_job_id": self.remote_job_id},
        )


@pytest.mark.asyncio
async def test_disable_trigger_cleanup_on_reassigned_to_other_triggerers(session, tmp_path):
    """
    Verify that the cleanup of triggers can be disabled when they are reassigned
    to other triggerers.

    Triggers are cancelled and cleaned up by the current triggerer process when it
    finds that its running trigger instances have been reassigned to other
    triggerers. However, if cleanup is called prematurely while the trigger is
    still in an active state, it can affect the same trigger after it has been
    reassigned to another triggerer process. Therefore, we introduce the
    capability to disable this behavior in such scenarios.

    The scenario is as follows:
      1. TaskInstance TI1 calls the remote service to run a job and saves the
         remote job ID.
      2. TaskInstance TI1 defers itself and creates Trigger T1, persisting the
         remote job ID with T1.
      3. T1 gets picked up by TriggererJobRunner TJR1, which starts running it,
         using the remote job ID to poll the remote job status until it succeeds
         or fails.
      4. TJR1 misses a heartbeat, most likely due to high host load causing delays
         in each TriggererJobRunner._run_trigger_loop iteration.
      5. A second TriggererJobRunner TJR2 notices that TJR1 has missed its
         heartbeat, so it starts picking up any triggers TJR1 may have had,
         including T1.
      6. TJR2 updates T1's triggerer ID to itself. When TJR1 next loads its
         triggers, it finds that the running T1 has been reassigned to TJR2, so
         TJR1 puts T1 into its cancel queue because another triggerer is already
         running it.
      7. TJR1 cancels T1, and T1's cleanup is called, killing the remote job. The
         T1 instance in TJR2 then finds that the remote job has been killed and
         exits.
      8. As a result, the remote job fails purely because of this reassignment.

    This test verifies that the cleanup behavior is configurable in the
    reassignment case.
    """
    from typing import Awaitable, Callable

    from airflow.configuration import conf
    from airflow.jobs.triggerer_job_runner import TriggerDetails

    remote_service = RemoteService()
    # The triggers are created sequentially, so we can use a singleton
    # to track the trigger instances.
    trigger_instances_holder = TriggerInstances()

    # Build the scenario in which the trigger is reassigned to another triggerer
    # because the first triggerer missed its heartbeat.
    async def make_triggerer2_occupy_the_trigger_of_triggerer1(
        check: Callable[[TriggerDetails, TriggerDetails], Awaitable[None]]
    ):
        # Create a trigger to check the remote job status
        trigger = RemoteJobTrigger(1)
        # Remove this one from the trigger_instances_holder; we only need to track
        # the instances created by the triggerer job runners.
        del trigger_instances_holder.trigger_instances[0]
        trigger_orm = Trigger.from_object(trigger)
        trigger_orm.id = 1
        session.add(trigger_orm)

        dag = DagModel(dag_id="test-dag")
        dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
        ti = TaskInstance(
            PythonOperator(task_id="dummy-task", python_callable=print),
            run_id=dag_run.run_id,
            state=TaskInstanceState.DEFERRED,
        )
        ti.dag_id = dag.dag_id
        ti.trigger_id = 1
        session.add(dag)
        session.add(dag_run)
        session.add(ti)

        job1 = Job()
        health_check_threshold_value = conf.getfloat(
            "triggerer", "triggerer_health_check_threshold"
        )
        # triggerer1 misses its heartbeat (e.g. due to high host load),
        # so triggerer2 can pick up the trigger.
        job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(
            seconds=health_check_threshold_value + 1
        )
        job2 = Job()
        session.add(job1)
        session.add(job2)
        session.commit()

        job_runner1 = TriggererJobRunner(job1)
        job_runner2 = TriggererJobRunner(job2)

        # job_runner1 loads triggers; it will pick up the trigger
        job_runner1.load_triggers()
        assert len(job_runner1.trigger_runner.to_create) == 1
        # Create the trigger and start running it
        await job_runner1.trigger_runner.create_triggers()
        assert len(job_runner1.trigger_runner.triggers) == 1

        job2.heartbeat(lambda x: {}, session)
        job_runner2.load_triggers()
        assert len(job_runner2.trigger_runner.to_create) == 1
        await job_runner2.trigger_runner.create_triggers()
        # Now job_runner2 has taken over the trigger for execution
        assert len(job_runner2.trigger_runner.triggers) == 1
        # Globally, there are two trigger instances
        assert len(trigger_instances_holder.trigger_instances) == 2

        # job_runner2 heartbeats normally
        job2.heartbeat(lambda x: {}, session)

        # Let job_runner2 start running
        job_runner2.trigger_runner.daemon = True
        job_runner2.trigger_runner.start()

        # Now job_runner1 will find that the trigger has been reassigned to
        # someone else and will cancel the trigger
        job_runner1.load_triggers()
        job_runner1.trigger_runner.daemon = True
        job_runner1.trigger_runner.start()

        try:
            trigger_details_of_job1 = job_runner1.trigger_runner.triggers[1]
            trigger_details_of_job2 = job_runner2.trigger_runner.triggers[1]
            await check(trigger_details_of_job1, trigger_details_of_job2)
        finally:
            job_runner1.trigger_runner.stop = True
            job_runner1.trigger_runner.join(30)
            job_runner2.trigger_runner.stop = True
            job_runner2.trigger_runner.join(30)
            # Clean up state
            remote_service.reset()
            trigger_instances_holder.reset()
            session.delete(trigger_orm)
            session.delete(ti)
            session.delete(dag_run)
            session.delete(dag)
            session.commit()

    """
    We will test the following cases:
    1. The trigger of job_runner1 is cancelled due to the reassignment and then
       cleaned up, which kills the same remote job, so the remote job ultimately
       fails because of the reassignment.
    2. The trigger of job_runner1 is cancelled due to the reassignment but does
       not clean up the same remote job, so the trigger of job_runner2 keeps
       checking the status of the remote job until it succeeds.
    """

    async def expect_the_trigger_will_be_cleanup_on_reassigned(
        trigger_details1: TriggerDetails, trigger_details2: TriggerDetails
    ):
        # The trigger of job_runner2 should finish because the trigger of
        # job_runner1 was cancelled and terminated the remote job when its
        # cleanup was called
        trigger_of_job_2 = trigger_instances_holder.trigger_instances[1]
        while trigger_of_job_2.has_been_cleaned() is False:
            await asyncio.sleep(0.1)

        # If we reach here, the trigger of job_runner2 has finished and been cleaned up
        assert trigger_of_job_2.finished is True
        # The remote job should fail because the trigger of job_runner1 was
        # cancelled and its cleanup killed the same remote job
        assert trigger_of_job_2.failed() is True

        # The cancelled trigger should already be cleaned up
        cancelled_trigger = trigger_instances_holder.trigger_instances[0]
        assert cancelled_trigger.has_been_cleaned() is True

        # The remote job should have failed because the trigger of job_runner1 was cancelled
        assert remote_service.get_job(1).get_status() == RemoteJob.FAILED_STATUS
        # There should be two kill events, each from a different trigger instance
        # managed by separate triggerers
        assert len(remote_service.event_queue) == 2
        # The first kill-job request comes from the cancelled trigger of job_runner1
        (cancel_caller, remote_job_id) = remote_service.event_queue[0]
        assert cancel_caller == cancelled_trigger and remote_job_id == 1

    await make_triggerer2_occupy_the_trigger_of_triggerer1(
        expect_the_trigger_will_be_cleanup_on_reassigned
    )
```
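For anyone following along, the reason the cancelled trigger kills the remote job at all is that, if I read `TriggerRunner.run_trigger` correctly, `trigger.cleanup()` is awaited in a `finally` block, so the `CancelledError` injected on reassignment still runs cleanup even though the trigger never finished. A minimal standalone sketch of that ordering, plain asyncio with illustrative names only (no Airflow):

```python
import asyncio

# Standalone sketch: cancelling the task still runs the cleanup step,
# which is what kills the remote job in the test above.
async def run_trigger_like(cleanup):
    try:
        while True:              # stand-in for polling the remote job
            await asyncio.sleep(0.1)
    finally:
        await cleanup()          # runs even on CancelledError

async def main():
    async def cleanup():
        print("cleanup() called -> remote job would be killed here")

    task = asyncio.create_task(run_trigger_like(cleanup))
    await asyncio.sleep(0.3)
    task.cancel()                # what TJR1 does on reassignment
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```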
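And a rough sketch of what the opt-out could look like. To be clear, `cleanup_on_reassignment` and the runner-side check below are names I made up for illustration, not an existing Airflow API:

```python
# Purely hypothetical sketch of the proposed opt-out; every name here that
# does not appear in the test above is invented for illustration.

class RemoteJobTrigger(BaseTrigger):
    # Proposed flag: skip cleanup() when this trigger is cancelled only
    # because another triggerer has taken it over.
    cleanup_on_reassignment = False

# And, roughly, in the runner where a cancelled trigger is torn down:
#
#     if not reassigned or trigger.cleanup_on_reassignment:
#         await trigger.cleanup()
#
# where `reassigned` would record whether the cancellation came from the
# "another triggerer owns this trigger" path.
```

With that flag off, the T1 instance in TJR2 would keep polling the still-running remote job until it succeeds, which is exactly the second case the test enumerates.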