Note that this program depends on MetaPy (specifically the MetaPy.Sugar module), which I posted here previously, and which is also available from http://pobox.com/~kragen/sw/MetaPy-6.tar.gz.
#!/usr/bin/python # I have some lists that have the following characteristics: # - not time-sensitive: it's generally OK if stuff gets sent out some # time after I write it # - limited output bandwidth: people read them partly because # they only get a few emails a day, and so I don't want to send # lots of output in a day # - bursty input: while the average input bandwidth is far less # than the output bandwidth, the input bandwidth often varies # widely. # So this is a rate-limiting queue system with earliest-deadline-first # scheduling. You feed it messages, each with the number of days # within which you'd like the message to be posted; it computes the # deadline by adding that number of days to the time it receives it. # This program runs as a cron job to fetch the N messages from the # queue with the earliest deadlines; usually N is 1. # Each queue is a file in RFC-822 format with one header: Deadline, # which is expressed in time_t format. The body of the message # (normally an RFC-822 message) is fed to sendmail to send it. # where is MetaPy? import sys sys.path.insert(0, '/home/kragen/devel/metapy') import os, string, sys, time, errno, re, MetaPy.Sugar def add_message_to_queue(queuedir, message): """Find an unused filename and write the message into it.""" ii = 0 while 1: filename = "file%03d" % ii filename = os.path.join(queuedir, filename) try: fd = os.open(filename, os.O_WRONLY|os.O_EXCL|os.O_CREAT) break except OSError, val: errnum, msg = val if errnum != errno.EEXIST: raise ii = ii + 1 file = os.fdopen(fd, 'w') file.write(message) file.close() return filename def read_file(filename): "Return an object representing the contents of file 'filename'." rv = parse_message(open(filename).read()) rv.filename = filename return rv def queue_contents(queuedir): "Read the contents of the queue from the filesystem and return it, sorted." 
rv = map(lambda filename, q=queuedir: read_file(os.path.join(q, filename)), os.listdir(queuedir)) rv.sort(lambda a, b: cmp(int(a.headers()['deadline']), int(b.headers()['deadline']))) return rv sendmailcmd = "/usr/lib/sendmail '%s'" sendmailcmd = "echo '%s'; cat" def send_mail(message, dest): "Send a message and remove it from the queue." sendmail = os.popen(sendmailcmd % dest, 'w') sendmail.write(message.body()) rv = sendmail.close() if rv is None: os.unlink(message.filename) else: sys.stderr.write("Sendmail produced nonzero exit code %s\n" % rv) def earlyhours(message): "Return the number of seconds and hours until a message's deadline." earliness = int(message.headers()['deadline']) - time.time() return earliness, int(earliness/3600) def run_queue(queuedir, n, dest): "Select the first N mails in the queue and send them." contents = queue_contents(queuedir) for message in contents[:n]: send_mail(message, dest) earliness, hours = earlyhours(message) if earliness > 0: print "message sent %d hours early" % hours else: print "MESSAGE SENT %d HOURS LATE" % -hours def list_queue(queuedir): "List the contents of the queue." contents = queue_contents(queuedir) for message in contents: headers = message.headers() subject = parse_message(message.body()).headers()['subject'] while subject[-1] == '\n' or len(subject) > 50: subject = subject[:-1] print "%10s %4d %50s" % (message.filename, earlyhours(message)[1], subject) def encapsulate_message(message, ndays): "Encapsulate a message in an RFC-822 message with a deadline header." seconds_per_day = 24 * 60 * 60 now = int(time.time()) deadline = now + seconds_per_day * ndays return "Deadline: %d\n\n%s" % (deadline, message) class parse_message: "An RFC-822 message class." 
twonewlines = re.compile(r"\r?\n\r?\n") def __init__(self, contents): self.contents = contents self.hdrend = None self.bodystart = None def find_twonewlines(self): (self.hdrend, self.bodystart) = self.twonewlines.search(self.contents).span() def gethdrend(self): if self.hdrend is None: self.find_twonewlines() return self.hdrend def headers(self): "This is a method and not an attribute to avoid circular refs." return Headers(self) def set_contents(self, contents): self.contents = contents self.hdrend, self.bodystart = None, None def body(self): if self.bodystart is None: self.find_twonewlines() return str(self)[self.bodystart:] def __str__(self): return self.contents class Headers(MetaPy.Sugar.Mapping): "RFC-822 message headers." headerstart = re.compile(r"^(\S+):\s*", re.MULTILINE) def __init__(self, message): self.message = message def nextheader(self, start=0): hdr = self.headerstart.search(str(self.message), start) hdrend = self.message.gethdrend() if hdr is not None and hdr.start() < hdrend: return hdr.span() + hdr.span(1) else: return (hdrend,) * 4 def items(self): prevend = 0 rv = [] name = None while 1: start, end, namestart, nameend = self.nextheader(prevend) if name is not None: val = str(self.message)[prevend:start] rv.append((name, val)) if start == end: # empty means we've reached the end of the headers return rv prevend = end name = string.lower(str(self.message)[namestart:nameend]) def __getitem__(self, itemname): items = self.items() for name, val in items: if name == string.lower(itemname): return val raise KeyError, itemname def __delitem__(self, itemname): if not self.has_key(itemname): raise KeyError, itemname self.modifyself(itemname) def __setitem__(self, itemname, newval): self.modifyself(itemname, itemname, newval) def modifyself(self, delname=None, addname=None, addval=None): prevend, prevstart, newver, name = 0, 0, [], None while 1: start, end, namestart, nameend = self.nextheader(prevend) if name is not None: if name != 
string.lower(delname): newver.append(str(self.message)[prevstart:start]) if start == end: if addname is not None: newver.append('\n%s: %s' % (addname, addval)) newver.append(str(self.message)[start:]) self.message.set_contents(string.join(newver, '')) return prevstart, prevend = start, end name = string.lower(str(self.message)[namestart:nameend]) def main(argv): if len(argv) > 1 and argv[1] == '-o' and len(argv) == 4: qdir, ndays = argv[2:] add_message_to_queue(qdir, encapsulate_message(sys.stdin.read(), float(ndays))) sys.exit(0) elif len(argv) > 1 and argv[1] == '-i' and len(argv) == 5: qdir, nmsgs, dest = argv[2:] run_queue(qdir, int(nmsgs), dest) sys.exit(0) elif len(argv) > 1 and argv[1] == '-m' and len(argv) == 4: qdir, dest = argv[2:] msg = parse_message(sys.stdin.read()) ndays = float(msg.headers().get('lifetime', '14')) msg.headers()['to'] = dest add_message_to_queue(qdir, encapsulate_message(msg, ndays)) sys.exit(0) elif len(argv) > 1 and argv[1] == '-l' and len(argv) == 3: list_queue(argv[2]) sys.exit(0) else: sys.stderr.write(("Usage: %(name)s -o qdir ndays\n" + "or %(name)s -i qdir nmsgs dest@somewhere\n" + "or %(name)s -m qdir dest@somewhere (for mail input)\n" "or %(name)s -l qdir (to list a queue)\n") % {'name': argv[0]}) sys.exit(1) if __name__ == "__main__": main(sys.argv)