Now it features a /moron command to translate a certain dialect into English and a /shorten command to toggle automatic shortening of URLs.
I added the /shorten feature because people kept pasting these annoying shortened URLs into ICB; I couldn't tell if I'd visited them or not, or whether they looked like something I'd be interested in. But they kept pasting them, because other people's ICB clients were too dumb to handle long URLs --- typically they'd word-wrap them so they couldn't be easily pasted into a browser. With /shorten turned on, whenever someone utters a URL over 50 characters, the client fires off a request to csua.org for a shortened version of the same URL. (makeashorterlink.com and smlnk.com have similar services.) When it gets a response, it posts the response to the channel. Now, I hope, people will paste the full URLs --- it's easier than shortening them by hand --- and rely on my client to do the shortening. Other recently-added features include timestamping of chat logs, word-wrapping of sent messages, and dynamic upgrades, but I think those were already present last time I sent it out. Run it under emacs shell-mode for best results; once you've connected, you'll need to type something hideous like /eval self.icbconn.login('Temptress', 'bhollima', '1') to get started. This version is no longer compatible with Python 1.5.2, although that could be fixed easily if it bothers somebody. #!/usr/bin/python #internationalCB client #216.200.125.157 import asyncore, socket, string, traceback, sys, time, re, urlparse, urllib # formatters def format_msg(type, msg): nick, text = msg return "*%s* %s" % (nick, text) def format_openmsg(type, msg): nick, text = msg return "<%s> %s" % (nick, text) def format_info_msg(type, msg): cat, text = msg return "[=%s=] %s" % (cat, text) def format_beep(type, msg): return "[=Beep=]\a %s just beeped you" % msg[0] def format_tick(type, msg): year, month, day, hh, mm, ss, wd, jd, dst = time.localtime(msg[0]) return ("[=Bong=] The time is now %04d-%02d-%02d %02d:%02d:%02d" % (year, month, day, hh, mm, ss)) def format_cmd_output(type, msg): type = msg[0] if type == 'co': return "*> " + msg[1] elif type == 'wl': return "*> " + format_who_line(msg) else: return "*>-" + repr(msg) def modfmt(mod): if mod == ' ': return ' ' else: return '*' def idlefmt(idle): idle = int(idle) if idle == 0: return '0' elif idle < 60: return '%ss' % idle elif idle < 3600: return "%sm%ss" % (idle/60, idle%60) else: return "%sh%sm" % ((idle/3600), (idle/60) % 60) def timefmt(then): return idlefmt(time.time() - int(then)) def format_who_line(msg): wl, mod, nick, idle, resp, logintime, user, host, reg = msg return "%13s %8s %8s %s@%s %s" % (modfmt(mod) + nick, idlefmt(idle), timefmt(logintime), user, host, reg) def format_login_pkt(type, msg): return '' def format_protocol_pkt(type, msg): ver = msg[0] if len(msg) > 1: host = msg[1] else: host = "unknown host" if len(msg) > 2: server = msg[2] else: server = "unknown server" return "* protocol version %s on %s running %s" % (repr(ver), repr(host), repr(server)) def format_error(type, msg): return "!!!ERROR!!! " + repr(msg[0]) def demoronize(msg): """Scrub text from people who can't be bothered to speak English.""" msg = re.sub(r'\bu\b', 'you', msg) msg = re.sub(r'\br\b', 'are', msg) msg = re.sub(r'\.{3,}', '.', msg) msg = re.sub(r'\bi\b', 'I', msg) msg = re.sub(r'\blol\b', 'heh', msg) return msg def wrap(maxlen, msg): """Break a string into a list of maxlen-char or less "lines". Word-wrap if possible. """ if len(msg) <= maxlen: return [msg] else: # XXX: allow more than just space? word_break = string.rfind(msg, ' ', 0, maxlen) if word_break == -1: # no word break! wrap_point = maxlen else: # If moving the word onto the next line isn't going to # decrease the number of times the word gets broken, # just go ahead and break it here. word_end = string.find(msg, ' ', word_break+1) if word_end == -1: word_end = len(msg) word_len = word_end - word_break # If the word is shorter than maxlen, it will fit on one # line, which means it doesn't need to be broken; # otherwise, if the "slack" characters before and after # the full maxlen-length lines will all fit at the end of # this line, put them there; but if they won't, putting # some of them at the end of this line won't help anyway. if word_len > maxlen and word_len % maxlen < maxlen - word_break: wrap_point = maxlen else: wrap_point = word_break + 1 return [msg[:wrap_point]] + wrap(maxlen, msg[wrap_point:]) def dump_exc(): a, b, c = sys.exc_info() msg = traceback.format_exception(a, b, c) return string.join(msg, '') class upgradable: def upgrade(self, mod): self.__class__ = getattr(mod, self.__class__.__name__) self.postupgrade() def postupgrade(self): pass class httpreq(asyncore.dispatcher_with_send): def __init__(self, url, handler=lambda x: sys.stdout.write(x)): self.scheme, self.netloc, self.path, self.params, self.query, self.fragment = urlparse.urlparse(url) assert self.scheme == 'http' self.handler = handler # is the following level of pain really necessary? self.out_buffer = '' self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) self.socket.setblocking(0) if ':' in self.netloc: host, port = self.netloc.split(':') port = int(port) else: host = self.netloc port = 80 self.connect((host, port)) localpart = urlparse.urlunparse(('', '', self.path, self.params, self.query, '')) self.send('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (localpart, self.netloc)) self.buf = '' def handle_read(self): data = self.recv(4096) if not data: self.handler(self.buf) self.close() else: self.buf = self.buf + data def handle_connect(self): pass def handle_close(self): self.close() def shorten_url(url, callback): """Arranges to call 'callback' with a shortened version of 'url'.""" def find_shortened_url(bigstring): lines = bigstring.split('\n') for line in lines: pos = line.find('href="http://csua.org/u/') if pos != -1: start = pos + len('href="') end = line.find('"', start) return line[start:end] def call_callback(bigstring, callback=callback, find_shortened_url=find_shortened_url): callback(find_shortened_url(bigstring)) httpreq('http://csua.org/u/index.html?url=%s' % urllib.quote(url), call_callback) class icb(asyncore.dispatcher_with_send, upgradable): formatters = {'a': format_login_pkt, 'b': format_openmsg, 'c': format_msg, 'd': format_info_msg, 'e': format_error, 'i': format_cmd_output, 'j': format_protocol_pkt, 'k': format_beep } personmsgs = 'bc' tickinterval = 3600 # one hour def __init__(self, hostport): self.addr = hostport self.connected = 0 self.out_buffer = '' self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) self.socket.setblocking(0) self.connect(hostport) # in ICB, messages are composed of packets. Most messages are # transmitted in single packets; but packets begin with a length byte, # and a length byte of \0 means that the packet is 255 bytes long and # the next packet contains more of the same message. # Messages consist of a type byte, followed by zero or more fields # separated by control-A, followed by a NUL byte. self.incoming_pkt = '' self.output = sys.stdout self.debug = 0 self.lasttick = 0 self.morons = {} self.shortening_urls = 0 def postupgrade(self): # attributes for new functionality go here so someone can /reload # without causing errors if not hasattr(self, 'shortening_urls'): self.shortening_urls = 0 def toggle_debug(self): self.debug = not self.debug def send_output_to(self, other): self.output = other def handle_read(self): data = self.recv(4096) self.incoming_pkt = self.incoming_pkt + data self.try_to_parse_pkt() def try_to_parse_pkt(self): if self.incoming_pkt == '': return length = ord(self.incoming_pkt[0]) if length != 0: if len(self.incoming_pkt) > length: msgdata = self.incoming_pkt[1:length+1] self.incoming_pkt = self.incoming_pkt[length+1:] try: self.handle_message(msgdata) except: self.output.write(dump_exc()) self.try_to_parse_pkt() # there might be more packets waiting # if length of buffer is too short, deal with it later def tick(self): """Insert the current time into the output stream. When reading chat logs, I often find that I wish I had some idea of when something or another was said. Some clients provide for timestamping every message, but this is obtrusive in a simple ASCII stream, and if it provided information I occasionally want, like what day things were said, it would be far more obtrusive. Since it's so obtrusive, it's an option --- and it's usually turned off, which means the information is rarely present when I want it! So this client inserts timestamps in the log every hour --- or, more accurately, when it has been an hour since the last message, the next new message will have a timestamp added above it. This keeps timestamp volume to a maximum of one timestamp per message or one timestamp per hour. """ now = time.time() if now - self.lasttick > self.tickinterval: self.output.write(format_tick('tick', (now,)) + "\n") self.lasttick = now def filterpersonmsg(self, msgfields): nick, text = msgfields if self.morons.has_key(string.lower(nick)): text = demoronize(text) return nick, text def handle_message(self, msgdata): self.tick() type = msgdata[0] msgfields = string.split(msgdata[1:-1], '\001') if self.formatters.has_key(type): if type in self.personmsgs: msgfields = self.filterpersonmsg(msgfields) self.output.write(self.formatters[type](type, msgfields) + "\n") if type == 'b' and self.shortening_urls: self.shorten_urls(msgfields[1]) else: self.output.write(repr((type, msgfields)) + "\n") def login(self, login, nick, group='1'): self.send(login_pkt(login, nick, group)) def cmd(self, cmd, args): self.send(construct_msg_pkts('h', [cmd, args])) def openmsg(self, msg): # max message length, exclusive of length byte, is 255; # that includes the type byte and the terminating \0. So we can # only fit 253 bytes in a packet. # but when text is wrapped there, the icb server chops off the # last five characters. And 248 still chopped off a char once. # ... the problem is that this doesn't include nick length. # Mine is 7. for chunk in wrap(247, msg): self.send(construct_msg_pkts('b', [chunk])) def send_shorter_url(self, text): self.openmsg("[shorturlbot] shortened URL is %s" % text) def shorten_urls(self, text): urls = re.findall('http://[^<>() ]*[^<>() ,.:;]', text) for url in urls: if len(url) > 50 and url.find('http://csua.org') != 0: self.output.write("* shortening url %s (/shorten to turn off)\n" % url) shorten_url(url, self.send_shorter_url) # The server doesn't support these packets. # def ping(self, msgid): # self.send(construct_msg_pkts('l', [msgid])) def pong(self, msgid): self.send(construct_msg_pkts('m', [msgid])) def conv(self, nick): return Conversation(self, nick) def send(self, msg): if self.debug: self.output.write("[sending] %s" % repr(msg)) return asyncore.dispatcher_with_send.send(self, msg) def moron(self, nick): "Toggle nick's moron status." nick = string.lower(nick) if self.morons.has_key(nick): del self.morons[nick] self.output.write("[=Unmoron=] %s is no longer a moron\n" % nick) else: self.morons[nick] = 1 self.output.write("[=Moron=] %s is a moron\n" % nick) def toggle_shortening(self): self.shortening_urls = not self.shortening_urls if self.shortening_urls: self.output.write("[=Shortening=] Shortening long URLs\n") else: self.output.write("[=NotShortening=] No longer shortening long URLs\n") def turn_shortening_off(self): if self.shortening_urls: self.toggle_shortening() def construct_msg_pkts(type, fields): for field in fields: if '\001' in field or '\000' in field: raise "You can't put that in an ICB field!", field msg = type + string.join(fields, '\001') + '\000' return chr(len(msg)) + msg def login_pkt(login, nick, group='1'): return construct_msg_pkts('a', [login, nick, group, 'login', '']) class Conversation: def __init__(self, connection, nick): self.conn = connection self.nick = nick def __call__(self, msg): self.conn.cmd('m', '%s %s' % (self.nick, msg)) class UI(asyncore.dispatcher_with_send, upgradable): def __init__(self, icbconn): self.icbconn = icbconn self.icbconn.send_output_to(self) self.out_buffer = '' self.inbuf = '' def handle_read(self): self.inbuf = self.inbuf + self.socket.recv(4096) self.try_to_do_lines() # maybe this method should be called 'snort' def try_to_do_lines(self): nl = string.find(self.inbuf, '\n') if nl != -1: line = self.inbuf[0:nl] self.inbuf = self.inbuf[nl+1:] try: self.do_line(line) except: print dump_exc() self.try_to_do_lines() def do_line(self, line): while line != '' and line[-1] in '\r\n': line = line[:-1] if line != '' and line[0] == '/': sp = string.find(line, ' ') if sp == -1: self.cmd(line[1:], '') else: self.cmd(line[1:sp], line[sp+1:]) elif line != '': self.icbconn.openmsg(line) else: self.send("- empty message not sent\n") def cmd(self, cmd, args): if cmd == 'eval': sent_result = 0 try: result = repr(eval(args + "\n")) except: self.send(dump_exc()) sent_result = 1 if not sent_result: self.send("- %s\n" % result) elif cmd == 'exec': sent_result = 0 try: exec args except: self.send(dump_exc()) sent_result = 1 if not sent_result: self.send("- OK\n") elif cmd == 'moron': self.icbconn.moron(args) elif cmd == 'shorten': self.icbconn.toggle_shortening() elif cmd == 'j': self.icbconn.cmd(cmd, args) self.icbconn.turn_shortening_off() elif cmd == 'reload': # upgrades a running client modname = args if sys.modules.has_key(modname): mod = reload(__import__(modname)) else: mod = __import__(modname) self.upgrade(mod) self.icbconn.upgrade(mod) self.send("- upgraded\n") elif cmd == 'quit': for key in asyncore.socket_map.keys(): key.close() try: del asyncore.socket_map[key] except KeyError: pass else: self.icbconn.cmd(cmd, args) def write(self, data): return self.send(data) def run_server(): #icbconn = icb(('default.icb.net', 7326)) #old server: icbconn = icb(('165.227.32.110', 7326)) icbconn = icb(('216.200.125.157', 7326)) ui = UI(icbconn) ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ss.bind(('127.0.0.1', 2743)) ss.listen(5) conn, addr = ss.accept() conn.setblocking(0) ui.set_socket(conn) ss.close() while asyncore.socket_map.keys(): asyncore.poll(1) def cmdline_client(): icbconn = icb(('default.icb.net', 7326)) ui = UI(icbconn) uisock = asyncore.file_wrapper(sys.stdin.fileno()) uisock.send = uisock.write # python 1.5.2 backwards compat hack ui.set_socket(uisock) while asyncore.socket_map.keys(): asyncore.poll(1) if __name__ == "__main__": cmdline_client()