TakawaAkirayo commented on issue #41231:
URL: https://github.com/apache/airflow/issues/41231#issuecomment-2270708754

   @tirkarthi 
   class RemoteJob:
       RUNNING_STATUS = "running"
       FAILED_STATUS = "failed"
       SUCCESS_STATUS = "success"
   
       def __init__(self, status):
           self.status = status
   
       def get_status(self):
           return self.status
   
       def set_status(self, status):
           self.status = status
   
       def kill(self):
           self.set_status(RemoteJob.FAILED_STATUS)
   
   
   class RemoteService:
       _instance = None
   
       event_queue: list[tuple[RemoteJobTrigger, int]]
       remote_jobs: dict[int, RemoteJob]
   
       def __new__(cls):
           if cls._instance is None:
               cls._instance = super().__new__(cls)
               cls._instance.event_queue = []
               cls._instance.remote_jobs = {
                   1: RemoteJob(RemoteJob.RUNNING_STATUS),
               }
           return cls._instance
   
       def complete_job(self, job_id: int):
           self.remote_jobs[job_id].set_status(RemoteJob.SUCCESS_STATUS)
   
       def kill_job(self, caller: RemoteJobTrigger, job_id: int):
           self.event_queue.append((caller, job_id))
           self.remote_jobs[job_id].kill()
   
       def get_job(self, job_id: int):
           return self.remote_jobs[job_id]
   
       def reset(self):
           self.event_queue = []
           self.remote_jobs = {
               1: RemoteJob(RemoteJob.RUNNING_STATUS),
           }
   
   
   class TriggerInstances:
       _instance = None
   
       def __new__(cls):
           if cls._instance is None:
               cls._instance = super().__new__(cls)
               cls._instance.trigger_instances = []
           return cls._instance
   
       def reset(self):
           self.trigger_instances.clear()
   
   
   class RemoteJobTrigger(BaseTrigger):
       def __init__(self, remote_job_id):
           super().__init__()
           self.remote_job_id = remote_job_id
           self.finished = False
           self.cleanup_done = False
           self.last_status = None
           TriggerInstances().trigger_instances.append(self)
   
       def get_status(self):
           return RemoteService().get_job(self.remote_job_id).get_status()
   
       async def run(self):
           print(f"Trigger object {id(self)}: start to run")
   
           while self.get_status() == RemoteJob.RUNNING_STATUS:
               await asyncio.sleep(0.1)
           self.finished = True
           print(f"Trigger object {id(self)}: finished with status: 
{self.get_status()}")
           yield TriggerEvent(self.remote_job_id)
   
       async def cleanup(self) -> None:
           RemoteService().kill_job(self, self.remote_job_id)
           self.cleanup_done = True
           print(f"Trigger object {id(self)}: cleanup done")
   
       def has_been_cleaned(self):
           return self.cleanup_done
   
       def failed(self):
           return self.get_status() == RemoteJob.FAILED_STATUS
   
       def serialize(self):
           return (
               "tests.jobs.test_triggerer_job.RemoteJobTrigger",
               {"remote_job_id": self.remote_job_id},
           )
   
   
   @pytest.mark.asyncio
   async def 
test_disable_trigger_cleanup_on_reassigned_to_other_triggerers(session, 
tmp_path):
       """
        This verifies the resolution of disable the cleanup of triggers if they 
reassigned to other triggers.
        Triggers will be canceled and cleaned up by the current triggerer 
process
        if the current triggerer finds that those running trigger instances
        have been reassigned to other triggerers.
        However, if the cleanup is called prematurely while the trigger is 
still in an active state,
        it could impact the same trigger when reassigned to another triggerer 
process.
        Therefore, introducing the capability to disable this behavior in such 
scenarios.
        The scenario is as follows:
           1. TaskInstance TI1 calls the remote service to run a job and saves 
the remote job ID.
           2. TaskInstance TI1 defers itself and creates Trigger T1, persisting 
the remote job ID with T1.
           3. T1 gets picked up by TriggererJobRunner TJR1 and starts running 
T1, it will use this remote job
              id to check the remote job status until success or failed.
           4. TJR1 misses a heartbeat, most likely due to high host load 
causing delays in
              each TriggererJobRunner._run_trigger_loop loop.
           5. A second TriggererJobRunner TJR2 notices that T1 has missed its 
heartbeat,
              so it starts the process of picking up any Triggers that TJR1 may 
have had,
              including T1.
           6. TJR1 loads the triggers. Because the running T1 in TJR1 has been 
reassigned to TJR2,
              TJR2 updates the triggerer ID of T1 to TJR2.
              As a result, TJR1 puts T1 in the cancel queue because another 
triggerer is already running it.
           7. TJR1 cancels T1, and the cleanup of T1 is called, causing the 
remote job to be canceled.
              The T1 instance in TJR2 will find that the remote job has been 
canceled and exit normally.
           8. Finally, the remote job fails due to this reassignment.
        This test verifies that the cleanup behavior is configurable under the 
reassignment case.
        """
       from typing import Callable, Awaitable
       from airflow.configuration import conf
       from airflow.jobs.triggerer_job_runner import TriggerDetails
   
       remote_service = RemoteService()
       # The trigger will be created sequentially,
       # so we can use a global variable to track the trigger instances
       trigger_instances_holder = TriggerInstances()
   
       # Build the scenario that the trigger reassigned to another triggerer
       # due to the first triggerer missed heartbeat
       async def make_triggerer2_occupy_the_trigger_of_triggerer1(
           check: Callable[[TriggerDetails, TriggerDetails], Awaitable[None]]
       ):
   
           # Create trigger to check the remote job status
           trigger = RemoteJobTrigger(1)
           # Remove this one from the trigger_instances_holder, we only need to 
track the instances
           # that created by the triggerer job runner
           del trigger_instances_holder.trigger_instances[0]
   
           trigger_orm = Trigger.from_object(trigger)
           trigger_orm.id = 1
           session.add(trigger_orm)
   
           dag = DagModel(dag_id="test-dag")
           dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
           ti = TaskInstance(
               PythonOperator(task_id="dummy-task", python_callable=print),
               run_id=dag_run.run_id,
               state=TaskInstanceState.DEFERRED,
           )
           ti.dag_id = dag.dag_id
           ti.trigger_id = 1
           session.add(dag)
           session.add(dag_run)
           session.add(ti)
   
           job1 = Job()
           health_check_threshold_value = conf.getfloat("triggerer", 
"triggerer_health_check_threshold")
           # triggerer1 miss heartbeat due to high host load, so triggerer2 can 
pick up the trigger
           job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(
               seconds=health_check_threshold_value + 1)
   
           job2 = Job()
           session.add(job1)
           session.add(job2)
   
           session.commit()
   
           job_runner1 = TriggererJobRunner(job1)
           job_runner2 = TriggererJobRunner(job2)
   
           # job_runner1 load triggers, it will pick up the trigger
           job_runner1.load_triggers()
           assert len(job_runner1.trigger_runner.to_create) == 1
           # Create the trigger and start running
           await job_runner1.trigger_runner.create_triggers()
           assert len(job_runner1.trigger_runner.triggers) == 1
   
           job2.heartbeat(lambda x: {}, session)
           job_runner2.load_triggers()
           assert len(job_runner2.trigger_runner.to_create) == 1
           await job_runner2.trigger_runner.create_triggers()
           # Now job_runner2 will take over the trigger for execution
           assert len(job_runner2.trigger_runner.triggers) == 1
   
           # Globally, there are two trigger instances
           assert len(trigger_instances_holder.trigger_instances) == 2
   
           # job_runner2 performs heartbeat normally
           job2.heartbeat(lambda x: {}, session)
   
           # Let the job_runner2 start to run
           job_runner2.trigger_runner.daemon = True
           job_runner2.trigger_runner.start()
   
           # Now the job_runner1 will find that the trigger has been reassigned 
to someone else
           # and will cancel the trigger
           job_runner1.load_triggers()
           job_runner1.trigger_runner.daemon = True
           job_runner1.trigger_runner.start()
   
           try:
               trigger_details_of_job1 = job_runner1.trigger_runner.triggers[1]
               trigger_details_of_job2 = job_runner2.trigger_runner.triggers[1]
               await check(trigger_details_of_job1, trigger_details_of_job2)
           finally:
               job_runner1.trigger_runner.stop = True
               job_runner1.trigger_runner.join(30)
   
               job_runner2.trigger_runner.stop = True
               job_runner2.trigger_runner.join(30)
   
               # clean up state
               remote_service.reset()
               trigger_instances_holder.reset()
               session.delete(trigger_orm)
               session.delete(ti)
               session.delete(dag_run)
               session.delete(dag)
               session.commit()
   
       """
       We will test the following cases:
           1. The trigger of job_runner1 is cancelled due to reassignment,
              and then cleanup which fail the same remote job, so finally the 
remote job will fail due to
              this reassignment.
           2. The trigger of job_runner1 is cancelled due to reassignment but 
will not cleanup the same
              remote job, so the trigger of job_runner2 will keep check the 
status of remote job until success.
       """
   
       async def expect_the_trigger_will_be_cleanup_on_reassigned(
           trigger_details1: TriggerDetails, trigger_details2: TriggerDetails
       ):
           # The trigger of job_runner2 should finish because the trigger of 
job_runner1 has been cancelled,
           # and it will terminate the remote job when cleanup is called
           trigger_of_job_2 = trigger_instances_holder.trigger_instances[1]
           while trigger_of_job_2.has_been_cleaned() is False:
               await asyncio.sleep(0.1)
           # If reach here, the trigger of job_runner2 should be finished and 
cleaned up
           assert trigger_of_job_2.finished is True
           # The remote job should fail due to the trigger of job_runner1 has 
been cancelled,
           # and it has cleaned up the same remote job
           assert trigger_of_job_2.failed() is True
   
           # The cancelled trigger should already be cleaned up
           cancelled_trigger = trigger_instances_holder.trigger_instances[0]
           assert cancelled_trigger.has_been_cleaned() is True
   
           # The remote job should have failed because the trigger of 
job_runner1 was cancelled
           assert remote_service.get_job(1).get_status() == 
RemoteJob.FAILED_STATUS
   
           # There should be two kill events, each from a different trigger 
instance
           # managed by separate triggerers
           assert len(remote_service.event_queue) == 2
           # The first kill job request comes from the canceled trigger of 
job_runner1
           (cancel_caller, remote_job_id) = remote_service.event_queue[0]
           assert cancel_caller == cancelled_trigger and remote_job_id == 1
   
       await 
make_triggerer2_occupy_the_trigger_of_triggerer1(expect_the_trigger_will_be_cleanup_on_reassigned)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to