Note that this program uses stuff from MetaPy, which I posted here
previously, and which is also available from
http://pobox.com/~kragen/sw/MetaPy-6.tar.gz.

#!/usr/bin/python
# I have some lists that have the following characteristics:
# - not time-sensitive: it's generally OK if stuff gets sent out some
#   time after I write it
# - limited output bandwidth: people read them partly because
#   they only get a few emails a day, and so I don't want to send
#   lots of output in a day
# - bursty input: while the average input bandwidth is far less
#   than the output bandwidth, the input bandwidth often varies
#   widely.

# So this is a rate-limiting queue system with earliest-deadline-first
# scheduling.  You feed it messages, each with the number of days
# within which you'd like the message to be posted; it computes the
# deadline by adding that number of days to the time it receives it.

# This program runs as a cron job to fetch the N messages from the
# queue with the earliest deadlines; usually N is 1.

# Each queue is a directory; each entry in it is a file in RFC-822
# format with one header: Deadline, which is expressed in time_t
# format.  The body of that file (normally itself an RFC-822 message)
# is fed to sendmail to send it.

# where is MetaPy?
import sys
sys.path.insert(0, '/home/kragen/devel/metapy')

import os, string, sys, time, errno, re, MetaPy.Sugar

def add_message_to_queue(queuedir, message):
    """Write 'message' into a new file in 'queuedir' and return its path.

    Candidate names file000, file001, ... are tried in order.  Each is
    created with O_EXCL, so a name that is already in use fails with
    EEXIST and the next number is tried; an existing queue entry is
    never overwritten.  Any other OSError from os.open is passed on to
    the caller.
    """
    ii = 0
    while 1:
        filename = os.path.join(queuedir, "file%03d" % ii)
        try:
            fd = os.open(filename, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
            break
        except OSError:
            # Only "name already taken" means "try the next number".
            if sys.exc_info()[1].errno != errno.EEXIST:
                raise
        ii = ii + 1

    outfile = os.fdopen(fd, 'w')
    try:
        outfile.write(message)
    finally:
        # Close even if the write fails, so the descriptor is not leaked.
        outfile.close()
    return filename

def read_file(filename):
    """Return a parse_message holding the contents of file 'filename'.

    The returned object also gets a 'filename' attribute recording the
    path it was read from.
    """
    infile = open(filename)
    try:
        contents = infile.read()
    finally:
        # Close explicitly rather than relying on garbage collection.
        infile.close()
    rv = parse_message(contents)
    rv.filename = filename
    return rv

def queue_contents(queuedir):
    """Return every message in 'queuedir', earliest deadline first.

    Each directory entry is loaded with read_file; the resulting list is
    ordered by the integer value of each message's Deadline header.
    """
    def by_deadline(first, second):
        return cmp(int(first.headers()['deadline']),
                   int(second.headers()['deadline']))

    messages = []
    for entry in os.listdir(queuedir):
        messages.append(read_file(os.path.join(queuedir, entry)))
    messages.sort(by_deadline)
    return messages

# Shell command that delivers one message: '%s' is replaced by the
# destination address and the message is written to the command's
# standard input.  (For a dry run that only prints the message, point
# this at "echo '%s'; cat" instead; note that the message is still
# removed from the queue afterwards.)
sendmailcmd = "/usr/lib/sendmail '%s'"
def send_mail(message, dest):
    """Send a message and remove it from the queue.

    The body of 'message' is piped to the shell command built from
    'sendmailcmd' with the address 'dest' substituted in.  The queue
    file (message.filename) is unlinked only when that command exits
    with status zero; otherwise a note is written to stderr and the
    file is left in place.
    """
    # sendmailcmd wraps the address in single quotes, so a single quote
    # inside the address (o'brien@example.com) has to be written as '\''
    # -- close the quote, an escaped quote, reopen -- to stay one word.
    quoted_dest = dest.replace("'", "'\\''")
    sendmail = os.popen(sendmailcmd % quoted_dest, 'w')
    try:
        sendmail.write(message.body())
    finally:
        # Always reap the child; close() returns None for exit status 0.
        rv = sendmail.close()
    if rv is None:
        os.unlink(message.filename)
    else:
        sys.stderr.write("Sendmail produced nonzero exit code %s\n"
                         % rv)

def earlyhours(message):
    """Report how far away a message's deadline is.

    Returns a pair (seconds, hours): the seconds remaining until the
    time in the message's Deadline header, and that same figure in whole
    hours, truncated toward zero.  Negative values mean the deadline has
    already passed.
    """
    deadline = int(message.headers()['deadline'])
    seconds_left = deadline - time.time()
    whole_hours = int(seconds_left / 3600)
    return seconds_left, whole_hours

def run_queue(queuedir, n, dest):
    """Select the first N mails in the queue and send them.

    The queue is taken in earliest-deadline-first order.  For every
    message that was sent, one line is printed saying how many hours
    before or after its deadline it went out.  A message whose send
    failed is reported on stderr instead of being announced as sent.
    """
    for message in queue_contents(queuedir)[:n]:
        send_mail(message, dest)
        # send_mail unlinks the queue file only after a successful send,
        # so a file that is still there means the message did not go out.
        if os.path.exists(message.filename):
            sys.stderr.write("message %s NOT SENT; left in queue\n"
                             % message.filename)
            continue
        earliness, hours = earlyhours(message)
        if earliness > 0:
            sys.stdout.write("message sent %d hours early\n" % hours)
        else:
            sys.stdout.write("MESSAGE SENT %d HOURS LATE\n" % -hours)
        
def list_queue(queuedir):
    """List the contents of the queue, earliest deadline first.

    One line is written to stdout per message: the queue file name, the
    whole hours until its deadline (negative when overdue), and at most
    50 characters of the Subject header of the enclosed message.  A
    message without a Subject header is shown as "(no subject)".
    """
    for message in queue_contents(queuedir):
        try:
            subject = parse_message(message.body()).headers()['subject']
        except KeyError:
            subject = '(no subject)'
        # Cut to the column width, then drop trailing newlines.  The
        # slice comparison is safe on an empty string, unlike subject[-1].
        subject = subject[:50]
        while subject[-1:] == '\n':
            subject = subject[:-1]
        sys.stdout.write("%10s %4d %50s\n"
                         % (message.filename, earlyhours(message)[1],
                            subject))


def encapsulate_message(message, ndays):
    """Wrap 'message' in an outer RFC-822 message carrying its deadline.

    The deadline is the current time plus 'ndays' days, in seconds since
    the epoch.  It becomes the single Deadline header of the result;
    'message', formatted with %s, becomes the body after a blank line.
    """
    received = int(time.time())
    lifetime = (24 * 60 * 60) * ndays
    return "Deadline: %d\n\n%s" % (received + lifetime, message)

class parse_message:
    """An RFC-822 message: a header block, a blank line, then the body.

    The text is kept as one string in 'contents'.  The offsets of the
    blank-line separator ('hdrend' is where it starts, 'bodystart' is
    where it ends) are located on first use and forgotten whenever the
    contents are replaced.  Text with no blank line at all is treated as
    all headers with an empty body.
    """
    twonewlines = re.compile(r"\r?\n\r?\n")

    def __init__(self, contents):
        self.contents = contents
        # None means "not located yet".
        self.hdrend = None
        self.bodystart = None

    def find_twonewlines(self):
        "Locate the separator and record hdrend and bodystart."
        separator = self.twonewlines.search(self.contents)
        if separator is None:
            # No blank line: everything is header and the body is empty.
            self.hdrend = self.bodystart = len(self.contents)
        else:
            self.hdrend, self.bodystart = separator.span()

    def gethdrend(self):
        "Return the offset at which the header block ends."
        if self.hdrend is None: self.find_twonewlines()
        return self.hdrend

    def headers(self):
        "This is a method and not an attribute to avoid circular refs."
        return Headers(self)

    def set_contents(self, contents):
        "Replace the whole message text and discard the cached offsets."
        self.contents = contents
        self.hdrend, self.bodystart = None, None

    def body(self):
        "Return the text that follows the blank-line separator."
        if self.bodystart is None: self.find_twonewlines()
        return str(self)[self.bodystart:]

    def __str__(self):
        return self.contents

class Headers(MetaPy.Sugar.Mapping):
    """RFC-822 message headers, viewed as a mapping over a parse_message.

    Header names are compared case-insensitively.  Nothing is cached:
    every lookup re-scans the message text, and assignment and deletion
    rewrite the message through its set_contents method.  A value is the
    raw text after "Name:" and any whitespace following it, up to the
    start of the next header, so every value except the last header's
    still carries its line terminator.

    The other mapping methods used on this class (get, has_key, ...) are
    presumably supplied by MetaPy.Sugar.Mapping on top of items and
    __getitem__ -- confirm against MetaPy.
    """
    # A header starts a line with a name holding no whitespace and no
    # colon, so the name stops at the first colon ("X-Url:http://h" is
    # the header "X-Url", not "X-Url:http").
    headerstart = re.compile(r"^([^\s:]+):\s*", re.MULTILINE)

    def __init__(self, message):
        self.message = message

    def nextheader(self, start=0):
        """Find the first header at or after offset 'start'.

        Returns (start, end, namestart, nameend): the span of the
        "Name: " prefix followed by the span of the name alone.  When no
        further header begins before the end of the header block, all
        four values are the offset of that end, so start == end.
        """
        hdr = self.headerstart.search(str(self.message), start)
        hdrend = self.message.gethdrend()
        if hdr is not None and hdr.start() < hdrend:
            return hdr.span() + hdr.span(1)
        return (hdrend,) * 4

    def items(self):
        "Return a list of (lowercased name, raw value) pairs, in order."
        text = str(self.message)
        prevend = 0
        rv = []
        name = None
        while 1:
            start, end, namestart, nameend = self.nextheader(prevend)
            if name is not None:
                # The previous header's value runs up to this header.
                rv.append((name, text[prevend:start]))
            if start == end:
                # empty means we've reached the end of the headers
                return rv
            prevend = end
            name = text[namestart:nameend].lower()

    def __getitem__(self, itemname):
        "Return the value of the first header named 'itemname'."
        wanted = itemname.lower()
        for name, val in self.items():
            if name == wanted:
                return val
        raise KeyError(itemname)

    def __delitem__(self, itemname):
        "Remove every header named 'itemname'; KeyError if there is none."
        if not self.has_key(itemname): raise KeyError(itemname)
        self.modifyself(itemname)

    def __setitem__(self, itemname, newval):
        "Replace any headers named 'itemname' by one new last header."
        self.modifyself(itemname, itemname, newval)

    def modifyself(self, delname=None, addname=None, addval=None):
        """Rewrite the message's header block.

        Every header named 'delname' (if given) is dropped, and a header
        "addname: addval" (if 'addname' is given) is appended after the
        remaining ones.  Text before the first header is not carried
        over; the blank line and body are kept unchanged.
        """
        text = str(self.message)
        if delname is not None:
            delname = delname.lower()
        kept = []
        prevstart, prevend, name = 0, 0, None
        while 1:
            start, end, namestart, nameend = self.nextheader(prevend)
            if name is not None and name != delname:
                kept.append(text[prevstart:start])
            if start == end:
                break
            prevstart, prevend = start, end
            name = text[namestart:nameend].lower()
        # 'start' is now the offset at which the header block ends.
        headers = ''.join(kept)
        # When the final header was dropped, the kept text still ends
        # with the previous header's line terminator.  Remove it: the
        # separator below (or the newline added before the new header)
        # supplies the line end, and leaving it in would create a blank
        # line in the middle of the header block.
        if headers[-1:] == '\n':
            headers = headers[:-1]
            if headers[-1:] == '\r':
                headers = headers[:-1]
        if addname is not None:
            if headers:
                headers = headers + '\n'
            headers = headers + '%s: %s' % (addname, addval)
        self.message.set_contents(headers + text[start:])

def main(argv):
    """Run the command selected by argv[1], then exit.

    -o qdir ndays       queue the text on stdin with a deadline 'ndays'
                        days from now
    -i qdir nmsgs dest  send the 'nmsgs' earliest-deadline messages in
                        'qdir' to 'dest'
    -m qdir dest        queue the mail message on stdin, taking its
                        lifetime in days from its Lifetime header (14 if
                        absent) and setting its To header to 'dest'
    -l qdir             list the queue

    Exits with status 0 after a recognized command; anything else prints
    a usage summary to stderr and exits with status 1.
    """
    nargs = len(argv)
    if nargs > 1:
        mode = argv[1]
    else:
        mode = None

    if mode == '-o' and nargs == 4:
        qdir, ndays = argv[2:]
        wrapped = encapsulate_message(sys.stdin.read(), float(ndays))
        add_message_to_queue(qdir, wrapped)
    elif mode == '-i' and nargs == 5:
        qdir, nmsgs, dest = argv[2:]
        run_queue(qdir, int(nmsgs), dest)
    elif mode == '-m' and nargs == 4:
        qdir, dest = argv[2:]
        msg = parse_message(sys.stdin.read())
        ndays = float(msg.headers().get('lifetime', '14'))
        msg.headers()['to'] = dest
        wrapped = encapsulate_message(msg, ndays)
        add_message_to_queue(qdir, wrapped)
    elif mode == '-l' and nargs == 3:
        list_queue(argv[2])
    else:
        usage = ("Usage: %(name)s -o qdir ndays\n"
                 "or %(name)s -i qdir nmsgs dest@somewhere\n"
                 "or %(name)s -m qdir dest@somewhere (for mail input)\n"
                 "or %(name)s -l qdir (to list a queue)\n")
        sys.stderr.write(usage % {'name': argv[0]})
        sys.exit(1)
    sys.exit(0)
        
if __name__ == "__main__":
    # Run as a script rather than imported: act on the command line.
    main(sys.argv)




Reply via email to