ashb commented on code in PR #46677:
URL: https://github.com/apache/airflow/pull/46677#discussion_r1958396696
##########
airflow/jobs/triggerer_job_runner.py:
##########
@@ -314,95 +122,222 @@ def on_kill(self):
Called when there is an external kill command (via the heartbeat
mechanism, for example).
"""
+ # TODO: signal instead.
self.trigger_runner.stop = True
- def _kill_listener(self):
- if self.listener:
- for h in self.listener.handlers:
- h.close()
- self.listener.stop()
-
def _exit_gracefully(self, signum, frame) -> None:
# The first time, try to exit nicely
- if not self.trigger_runner.stop:
+ if self.trigger_runner and not self.trigger_runner.stop:
self.log.info("Exiting gracefully upon receiving signal %s",
signum)
self.trigger_runner.stop = True
- self._kill_listener()
else:
self.log.warning("Forcing exit due to second exit signal %s",
signum)
+
+ self.trigger_runner.kill(signal.SIGKILL)
sys.exit(os.EX_SOFTWARE)
def _execute(self) -> int | None:
self.log.info("Starting the triggerer")
try:
- # set job_id so that it can be used in log file names
- self.trigger_runner.job_id = self.job.id
+ # Kick off runner sub-process without DB access
+ self.trigger_runner = TriggerRunnerSupervisor.start(
+ job=self.job, capacity=self.capacity, logger=log
+ )
- # Kick off runner thread
- self.trigger_runner.start()
- # Start our own DB loop in the main thread
- self._run_trigger_loop()
+ # Run the main DB comms loop in this process
+ self.trigger_runner.run_db_loop()
+ return self.trigger_runner._exit_code
except Exception:
- self.log.exception("Exception when executing
TriggererJobRunner._run_trigger_loop")
+ self.log.exception("Exception when executing
TriggerRunnerSupervisor.run_db_loop")
raise
finally:
self.log.info("Waiting for triggers to clean up")
- # Tell the subthread to stop and then wait for it.
+ # Tell the subtproc to stop and then wait for it.
# If the user interrupts/terms again, _graceful_exit will allow
them
# to force-kill here.
- self.trigger_runner.stop = True
- self.trigger_runner.join(30)
+ self.trigger_runner.kill(escalation_delay=10, force=True)
self.log.info("Exited trigger loop")
return None
- def _run_trigger_loop(self) -> None:
- """Run synchronously and handle all database reads/writes; the
main-thread trigger loop."""
- while not self.trigger_runner.stop:
- if not self.trigger_runner.is_alive():
- self.log.error("Trigger runner thread has died! Exiting.")
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
+
+
+# Using this as a simple namespace
+class messages:
+ class StartTriggerer(BaseModel):
+ """Tell the async trigger runner process to start, and where to send
status update messages."""
+
+ requests_fd: int
+ kind: Literal["StartTriggerer"] = "StartTriggerer"
+
+ class CancelTriggers(BaseModel):
+ """Request to cancel running triggers."""
+
+ ids: Iterable[int]
+ kind: Literal["CancelTriggersMessage"] = "CancelTriggersMessage"
+
+ class TriggerStateChanges(BaseModel):
+ """Report state change about triggers back to the
TriggerRunnerSupervisor."""
+
+ kind: Literal["TriggerStateChanges"] = "TriggerStateChanges"
+ events: Annotated[
+ list[tuple[int, events.DiscrimatedTriggerEvent]] | None,
+ # We have to specify a default here, as otherwise Pydantic
struggles to deal with the discriminated
+ # union :shrug:
+ Field(default=None),
+ ]
+ # Format of list[str] is the exc traceback format
+ failures: list[tuple[int, list[str] | None]] | None = None
+ finished: list[int] | None = None
+
+
+ToAsyncProcess = Annotated[
+ Union[workloads.RunTrigger, messages.CancelTriggers,
messages.StartTriggerer],
+ Field(discriminator="kind"),
+]
+
+
+ToSyncProcess = Annotated[
+ Union[messages.TriggerStateChanges],
+ Field(discriminator="kind"),
+]
+
+
[email protected](kw_only=True)
+class TriggerLoggingFactory:
+ log_path: str
+
+ bound_logger: WrappedLogger = attrs.field(init=False)
+
+ def __call__(self, processors: Iterable[structlog.typing.Processor]) ->
WrappedLogger:
+ if hasattr(self, "bound_logger"):
+ return self.bound_logger
+
+ from airflow.sdk.log import init_log_file
+
+ log_file = init_log_file(self.log_path)
+
+ pretty_logs = False
+ if pretty_logs:
+ underlying_logger: WrappedLogger =
structlog.WriteLogger(log_file.open("w", buffering=1))
+ else:
+ underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+ logger = structlog.wrap_logger(underlying_logger,
processors=processors).bind()
+ self.bound_logger = logger
+ return logger
+
+
[email protected](kw_only=True)
+class TriggerRunnerSupervisor(WatchedSubprocess):
+ """
+ TriggerRunnerSupervisor is responsible for monitoring the subprocess and
marshalling DB access.
+
+ This class (which runs in the main process) is responsible for querying
the DB, sending RunTrigger
+ workload messages to the subprocess, and collecting results and updating
them in the DB.
+ """
+
+ job: Job
+ capacity: int
+
+ health_check_threshold = conf.getint("triggerer",
"triggerer_health_check_threshold")
+
+ runner: TriggerRunner | None = None
+ stop: bool = False
+
+ decoder: ClassVar[TypeAdapter[ToSyncProcess]] = TypeAdapter(ToSyncProcess)
+
+ # Maps trigger IDs that we think are running in the sub process
+ running_triggers: set[int] = attrs.field(factory=set, init=False)
+
+ logger_cache: dict[int, TriggerLoggingFactory] = attrs.field(factory=dict,
init=False)
+
+ # A list of triggers that we have told the async process to cancel. We
keep them here until we receive the
+ # FinishedTriggers message
+ cancelling_triggers: set[int] = attrs.field(factory=set, init=False)
+
+ # Outbound queue of events
+ events: deque[tuple[int, events.TriggerEvent]] =
attrs.field(factory=deque, init=False)
+
+ # Outbound queue of failed triggers
+ failed_triggers: deque[tuple[int, list[str] | None]] =
attrs.field(factory=deque, init=False)
+
+ def is_alive(self) -> bool:
+ # Set by `_service_subprocess` in the loop
+ return self._exit_code is None
+
+ @classmethod
+ def start( # type: ignore[override]
+ cls,
+ *,
+ job: Job,
+ logger=None,
+ **kwargs,
+ ):
+ proc = super().start(id=job.id, job=job, target=cls.run_in_process,
logger=logger, **kwargs)
+
+ msg = messages.StartTriggerer(requests_fd=proc._requests_fd)
+ proc._send(msg)
+ return proc
+
+ def _handle_request(self, msg: ToSyncProcess, log: FilteringBoundLogger)
-> None: # type: ignore[override]
+ if isinstance(msg, messages.TriggerStateChanges):
+ log.debug("State change from async process", state=msg)
+ if msg.events:
+ self.events.extend(msg.events)
+ if msg.failures:
+ self.failed_triggers.extend(msg.failures)
+ for id in msg.finished or ():
+ self.running_triggers.discard(id)
+ self.cancelling_triggers.discard(id)
+ # TODO: Close logger? Or is deleting it enough
+ self.logger_cache.pop(id, None)
Review Comment:
Ah, there is no close, so deleting the last reference is the right thing to
do. I'll delete/change the comment.
##########
airflow/jobs/triggerer_job_runner.py:
##########
@@ -314,95 +122,222 @@ def on_kill(self):
Called when there is an external kill command (via the heartbeat
mechanism, for example).
"""
+ # TODO: signal instead.
self.trigger_runner.stop = True
- def _kill_listener(self):
- if self.listener:
- for h in self.listener.handlers:
- h.close()
- self.listener.stop()
-
def _exit_gracefully(self, signum, frame) -> None:
# The first time, try to exit nicely
- if not self.trigger_runner.stop:
+ if self.trigger_runner and not self.trigger_runner.stop:
self.log.info("Exiting gracefully upon receiving signal %s",
signum)
self.trigger_runner.stop = True
- self._kill_listener()
else:
self.log.warning("Forcing exit due to second exit signal %s",
signum)
+
+ self.trigger_runner.kill(signal.SIGKILL)
sys.exit(os.EX_SOFTWARE)
def _execute(self) -> int | None:
self.log.info("Starting the triggerer")
try:
- # set job_id so that it can be used in log file names
- self.trigger_runner.job_id = self.job.id
+ # Kick off runner sub-process without DB access
+ self.trigger_runner = TriggerRunnerSupervisor.start(
+ job=self.job, capacity=self.capacity, logger=log
+ )
- # Kick off runner thread
- self.trigger_runner.start()
- # Start our own DB loop in the main thread
- self._run_trigger_loop()
+ # Run the main DB comms loop in this process
+ self.trigger_runner.run_db_loop()
+ return self.trigger_runner._exit_code
except Exception:
- self.log.exception("Exception when executing
TriggererJobRunner._run_trigger_loop")
+ self.log.exception("Exception when executing
TriggerRunnerSupervisor.run_db_loop")
raise
finally:
self.log.info("Waiting for triggers to clean up")
- # Tell the subthread to stop and then wait for it.
+ # Tell the subtproc to stop and then wait for it.
# If the user interrupts/terms again, _graceful_exit will allow
them
# to force-kill here.
- self.trigger_runner.stop = True
- self.trigger_runner.join(30)
+ self.trigger_runner.kill(escalation_delay=10, force=True)
self.log.info("Exited trigger loop")
return None
- def _run_trigger_loop(self) -> None:
- """Run synchronously and handle all database reads/writes; the
main-thread trigger loop."""
- while not self.trigger_runner.stop:
- if not self.trigger_runner.is_alive():
- self.log.error("Trigger runner thread has died! Exiting.")
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
+
+
+# Using this as a simple namespace
+class messages:
+ class StartTriggerer(BaseModel):
+ """Tell the async trigger runner process to start, and where to send
status update messages."""
+
+ requests_fd: int
+ kind: Literal["StartTriggerer"] = "StartTriggerer"
+
+ class CancelTriggers(BaseModel):
+ """Request to cancel running triggers."""
+
+ ids: Iterable[int]
+ kind: Literal["CancelTriggersMessage"] = "CancelTriggersMessage"
+
+ class TriggerStateChanges(BaseModel):
+ """Report state change about triggers back to the
TriggerRunnerSupervisor."""
+
+ kind: Literal["TriggerStateChanges"] = "TriggerStateChanges"
+ events: Annotated[
+ list[tuple[int, events.DiscrimatedTriggerEvent]] | None,
+ # We have to specify a default here, as otherwise Pydantic
struggles to deal with the discriminated
+ # union :shrug:
+ Field(default=None),
+ ]
+ # Format of list[str] is the exc traceback format
+ failures: list[tuple[int, list[str] | None]] | None = None
+ finished: list[int] | None = None
+
+
+ToAsyncProcess = Annotated[
+ Union[workloads.RunTrigger, messages.CancelTriggers,
messages.StartTriggerer],
+ Field(discriminator="kind"),
+]
+
+
+ToSyncProcess = Annotated[
+ Union[messages.TriggerStateChanges],
+ Field(discriminator="kind"),
+]
Review Comment:
Cool, will do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]