Some time ago I had done a (crude) patch to mailman v2 to use FreeBSD
"kqueue" to allow runners to wake up immediately when a file was
queued, enabling most runners to sleep indefinitely, keeping overhead
on my small Virtual Private Server to a minimum.  I offered the patch
to the FreeBSD mailman "port" maintainer, but they only expressed
interest in a mailman3 patch.

Well, I've recently/finally switched over to mailman v3, and I just
did what I hope is a cleaner version of the change.  I didn't see a
PyPI library with cross platform support that didn't want to usurp the
main loop, and since I already had the kqueue code, I wrote the
simplest possible "DirWatcher" class family.  The change aims to
"first, do no harm": it will always fall back to using sleep.

It will also use the Linux "inotify" facility using the PyPI
inotify_simple package (if installed), which I debugged
"in the lab".

NOTE!! I'm almost certainly seeing signal handling problems when I try
to shut down mailman (under FreeBSD), so the changes should not be
considered ready for production in any way!

I'm not a zope interfaces user, nor a student of mailman project
conventions, so again, I'm not asserting the code, as is, is ready for
integration.

I AM interested in knowing if there's any interest in this, before I
bother setting myself up on gitlab, and bringing the change forward
from 3.3.1.

Something I consider an open issue is that right now I've changed the
default sleep_time in schema.cfg from 1 second to 15 minutes, which is
FAR less than ideal if the code falls back to (blind) sleep between
directory scans.  I considered looking to see if the current
instance's class had overridden _do_periodic from the base Runner
class, and if not, sleeping indefinitely (or at least a "long time"),
but that seemed like a hack, and for testing, it was easier to not
write any ugly code, and just whack schema.cfg.

Since the changes are small, I'm including them in-line below.

==== core/dirwatch.py

"""
Simple O/S independent Class to sleep until a single directory changes
(written for Mailman)

Phil Budne <p...@regressive.org>
August 5, 2021
"""

import errno                    # kqueue
import os                       # kqueue
import select                   # kqueue
import time                     # kqueue, base

from mailman.interfaces.dirwatch import IDirWatcher
from public import public
from zope.interface import implementer

# Watcher classes that factory() tries, in the order they are appended here.
_alternatives = []              # list of classes to try

# When true, each watcher opens /tmp/dirwatch.log and appends trace lines.
DEBUG = False                   # TEMP!!!

@public
@implementer(IDirWatcher)
class BaseDirWatcher:
    """Fallback directory watcher that simply sleeps.

    See `IDirWatcher`.  Subclasses override `watch` to block on an O/S
    notification facility and call back into this class when that fails.
    """

    # Cleared by factory() on a class whose constructor raised.
    USE = True

    def __init__(self, directory):
        """Create a dirwatch object.

        :param directory: The directory to watch.
        """
        self.directory = directory
        # TEMP!!!  The trace file only exists when DEBUG was set at the
        # time the instance was created.
        if DEBUG:
            self.log = open("/tmp/dirwatch.log", "a")

    def watch(self, float_sec):
        """See `IDirWatcher`.  Blind sleep for the full interval."""
        self.debug("BaseDirWatcher sleeping %s", float_sec)  # TEMP
        time.sleep(float_sec)

    def debug(self, format, *args):  # TEMP!!!
        """Append a timestamped, pid-tagged trace line when DEBUG is set.

        :param format: %-style format string for the message.
        :param args: Values interpolated into `format`.
        """
        if not DEBUG:
            return
        template = "%s [%d] " + format + "\n"
        values = (time.time(), os.getpid()) + args
        self.log.write(template % values)
        self.log.flush()

@public
@implementer(IDirWatcher)
class KQueueDirWatcher(BaseDirWatcher):
    """Directory watcher that blocks on a BSD ``kqueue``.

    The constructor registers a vnode filter (``KQ_NOTE_WRITE``) on the
    directory; `watch` then waits on the kqueue instead of sleeping.
    Construction raises on platforms where `select.kqueue` is missing or
    the directory cannot be opened, which lets `factory` move on to the
    next alternative.
    """

    def __init__(self, directory):
        """Open the directory and register it with a new kqueue.

        :param directory: The directory to watch.
        :raises AttributeError: if `select` has no kqueue support.
        :raises OSError: if the directory cannot be opened or registered.
        """
        super().__init__(directory)
        self._kq = self._kq_dirfd = None
        self._kq = select.kqueue()
        try:
            self._kq_dirfd = os.open(self.directory, os.O_RDONLY)
            ev = select.kevent(self._kq_dirfd,
                               filter=select.KQ_FILTER_VNODE,
                               flags=select.KQ_EV_ADD | select.KQ_EV_CLEAR,
                               fflags=select.KQ_NOTE_WRITE)
            # install ev, return no events, don't sleep
            self._kq.control([ev], 0, 0)
        except Exception:
            # Don't leak descriptors when set-up fails part way through;
            # the caller is expected to fall back to another watcher.
            if self._kq_dirfd is not None:
                os.close(self._kq_dirfd)
                self._kq_dirfd = None
            self._kq.close()
            self._kq = None
            raise

    def watch(self, float_sec):
        """See `IDirWatcher`.

        Wait on the kqueue for at most `float_sec` seconds.  An interrupted
        wait (EINTR) returns early, like `time.sleep`; any other OSError
        falls back to the base class's plain sleep.
        """
        self.debug("kq.watch %s %s", self.USE, self._kq)  # TEMP
        if self.USE and self._kq:
            try:
                self.debug('calling kq.control wait=%s', float_sec)  # TEMP
                # no changes to events, return at most 10 events
                # (should only ever return 1?)
                events = self._kq.control([], 10, float_sec)
                self.debug('kq.control returned %d event(s)',
                           len(events))  # TEMP
                # ignore returned events
                return
            except OSError as exc:
                # suppress EINTR, like time.sleep
                if exc.errno == errno.EINTR:
                    return
                self.debug('kq.control exception: %s', exc)  # TEMP
        # The base class exposes the blind sleep as watch(); it has no
        # sleep() method.
        super().watch(float_sec)
_alternatives.append(KQueueDirWatcher)

# Linux inotify is not an included battery (not in the standard library).
# Ubuntu (and Debian?) have pyinotify (old/unmaintained?), which wants
#       to take over the main loop and do callbacks.  If one felt
#       strongly about it, one could create a pyinotify.Notifier
#       object with a timeout value set, create an event handler that
#       sets a bool in the DirWatcher object, and have watch call
#       {check,read,process}_events (or something like that).  But
#       inotify_simple is so much .... simpler.  Since determining
#       what's available happens at run time, you could have BOTH!
try:
    import inotify_simple

    @public
    @implementer(IDirWatcher)
    class INotifySimpleDirWatcher(BaseDirWatcher):
        """Directory watcher that blocks on Linux inotify.

        Uses the optional PyPI ``inotify_simple`` package and watches the
        directory for MOVE (rename) events only.
        """

        def __init__(self, directory):
            """Create the inotify descriptor and add a watch.

            :param directory: The directory to watch.
            """
            super().__init__(directory)
            self.inotify = inotify_simple.INotify()
            try:
                # just look for renames
                self.inotify.add_watch(directory, inotify_simple.masks.MOVE)
            except Exception:
                # Don't leak the inotify descriptor if the watch can't be
                # installed; the caller falls back to another watcher.
                self.inotify.close()
                self.inotify = None
                raise
            self.debug('inotify_simple init succeeded')  # TEMP

        def watch(self, float_seconds):
            """See `IDirWatcher`.

            Wait for an inotify event for at most `float_seconds` seconds
            (rounded to the nearest millisecond).  An interrupted read
            (EINTR) returns early, like `time.sleep`; any other failure
            falls back to the base class's plain sleep.
            """
            self.debug("inotify.watch %s %s", self.USE, self.inotify)  # TEMP
            if self.USE and self.inotify:
                try:
                    # INotify.read() takes its timeout in milliseconds.
                    timeout_ms = int(float_seconds * 1000 + 0.5)
                    events = self.inotify.read(timeout_ms)
                    self.debug("inotify_simple read returned %d",
                               len(events))  # TEMP
                    return
                except OSError as exc:
                    # suppress EINTR, like time.sleep
                    if exc.errno == errno.EINTR:
                        return
                    self.debug('inotify_simple read exception: %s',
                               exc)  # TEMP
                except Exception as exc:
                    # Best effort: record the failure, then fall back to
                    # sleeping rather than killing the runner.
                    self.debug('inotify_simple read failed: %s', exc)  # TEMP
            super().watch(float_seconds)
    _alternatives.append(INotifySimpleDirWatcher)
except Exception:
    # inotify_simple is optional: it may be missing, or fail to load on a
    # non-Linux platform.  Exception (not a bare except) lets
    # KeyboardInterrupt and SystemExit propagate.
    pass

def factory(directory):
    """Return the best available directory watcher for `directory`.

    Each class in `_alternatives` is tried in order.  A class whose
    constructor raises has its `USE` flag cleared so it is skipped on
    later calls.  If none can be constructed, a `BaseDirWatcher` (plain
    sleep) is returned, so this never fails just because no notification
    facility is available.

    :param directory: The directory to watch.
    :return: An object providing `IDirWatcher`.
    """
    for dw_class in _alternatives:
        if not dw_class.USE:
            continue
        try:
            return dw_class(directory)
        except Exception:
            # There is no instance (and no `self`) to log through here;
            # referencing one would raise NameError and defeat the
            # fallback.  Just mark the class unusable and keep looking.
            dw_class.USE = False
    return BaseDirWatcher(directory)

if __name__ == '__main__':
    # Manual smoke test: watch the current directory with whichever
    # watcher factory() selects.
    demo_watcher = factory('.')
    demo_watcher.debug("got %s", demo_watcher)  # TEMP
    # 20 seconds is enough time to type "touch foo"
    # (and "mv foo bar" on Linux)
    demo_watcher.watch(20.0)

==== interfaces/dirwatch.py

"""Interface for directory watchers."""

from public import public
from zope.interface import Interface

@public
class IDirWatcher(Interface):
    """The directory watcher."""

    # Interface methods are declared as callers invoke them, i.e. without
    # `self`, so the declared signature matches the bound implementation.
    def watch(float_sec):
        """Watch for new files in directory, for at most float_sec seconds.

        May return before float_sec seconds have elapsed.

        No value returned.
        """

==== diffs

--- config/schema.cfg-mm3.3.1   2021-07-07 17:24:59.000000000 -0400
+++ config/schema.cfg   2021-08-05 20:53:06.552912000 -0400
@@ -256,7 +256,9 @@
 # The sleep interval for the runner.  It wakes up once every interval to
 # process the files in its slice of the queue directory.  Some runners may
 # ignore this.
-sleep_time: 1s
+# PLB: Using dirwatch, don't need to poll.  unless runner has
+# _do_periodic work.
+sleep_time: 15m
 
 
 [database]
--- core/runner.py-mm3.3.1      2020-04-10 01:53:06.000000000 -0400
+++ core/runner.py      2021-08-05 17:36:41.769775000 -0400
@@ -293,7 +293,10 @@
         """See `IRunner`."""
         if filecnt or self.sleep_float <= 0:
             return
-        time.sleep(self.sleep_float)
+        if self.switchboard:
+            self.switchboard.snooze(self.sleep_float)
+        else:
+            time.sleep(self.sleep_float)
 
     def _short_circuit(self):
         """See `IRunner`."""
--- interfaces/switchboard.py-mm3.3.1   2020-04-10 01:53:06.000000000 -0400
+++ interfaces/switchboard.py   2021-08-05 17:47:46.712414000 -0400
@@ -84,3 +84,6 @@
         time, so moving them is enough to ensure that a normal dequeing
         operation will handle them.
         """
+
+    def snooze(self, sleep_float):
+        """Wait at most sleep_float seconds for a change to the queue."""
_______________________________________________
Mailman-Developers mailing list -- mailman-developers@python.org
To unsubscribe send an email to mailman-developers-le...@python.org
https://mail.python.org/mailman3/lists/mailman-developers.python.org/
Mailman FAQ: https://wiki.list.org/x/AgA3

Security Policy: https://wiki.list.org/x/QIA9

Reply via email to