This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 26318774ec Completed D400 for airflow/listener/* directory (#27731)
26318774ec is described below
commit 26318774ecb90d4bff352c5c8960424aaae7c7ea
Author: Dov Benyomin Sohacheski <[email protected]>
AuthorDate: Thu Nov 17 13:54:07 2022 +0200
Completed D400 for airflow/listener/* directory (#27731)
---
airflow/listeners/events.py | 4 ++++
airflow/listeners/listener.py | 7 ++++---
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/airflow/listeners/events.py b/airflow/listeners/events.py
index ed7b7408c5..ca598d0004 100644
--- a/airflow/listeners/events.py
+++ b/airflow/listeners/events.py
@@ -30,6 +30,8 @@ _is_listening = False
def on_task_instance_state_session_flush(session, flush_context):
"""
+ Flush task instance's state.
+
Listens for session.flush() events that modify TaskInstance's state, and
notify listeners that listen
for that event. Doing it this way enable us to be stateless in the
SQLAlchemy event listener.
"""
@@ -69,6 +71,7 @@ def on_task_instance_state_session_flush(session,
flush_context):
def register_task_instance_state_events():
+ """Register a task instance state event"""
global _is_listening
if not _is_listening:
event.listen(Session, "after_flush",
on_task_instance_state_session_flush)
@@ -76,6 +79,7 @@ def register_task_instance_state_events():
def unregister_task_instance_state_events():
+ """Unregister a task instance state event"""
global _is_listening
event.remove(Session, "after_flush", on_task_instance_state_session_flush)
_is_listening = False
diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py
index 754bc1af0b..a53f149807 100644
--- a/airflow/listeners/listener.py
+++ b/airflow/listeners/listener.py
@@ -35,7 +35,7 @@ _listener_manager = None
class ListenerManager:
- """Class that manages registration of listeners and provides hook property
for calling them"""
+ """Manage listener registration and provides hook property for calling
them."""
def __init__(self):
from airflow.listeners import spec
@@ -49,7 +49,7 @@ class ListenerManager:
@property
def hook(self) -> _HookRelay:
- """Returns hook, on which plugin methods specified in spec can be
called."""
+ """Return hook, on which plugin methods specified in spec can be
called."""
return self.pm.hook
def add_listener(self, listener):
@@ -60,12 +60,13 @@ class ListenerManager:
self.pm.register(listener)
def clear(self):
- """Remove registered plugins"""
+ """Remove registered plugins."""
for plugin in self.pm.get_plugins():
self.pm.unregister(plugin)
def get_listener_manager() -> ListenerManager:
+ """Get singleton listener manager"""
global _listener_manager
if not _listener_manager:
_listener_manager = ListenerManager()