I thought I'd posted this before, but it doesn't look like I did. icb is a chat system that helps people waste their time by typing inane comments at each other, similar to IRC; its primary advantage is that it is technically more primitive than IRC, so most people don't bother to use it.
Rather than manage command-line history, session logging, and command-line editing myself, I run this under Emacs shell-mode, which takes care of all of those things for me. To log in, you must utter an incantation like the following after starting the client: /eval self.icbconn.login('robert', 'Bobbing', '1') This annoys me slightly every time I restart the client, which happens every few months, which is not often enough to get me to fix it. Perhaps at some point my pride will cut in. This client has a couple of interesting features other chat clients could probably benefit from: - it always timestamps chat logs, but does it in a way unobtrusive enough that you aren't tempted to turn it off - you can upgrade the client to a new version without stopping and restarting it by typing '/reload icb'. It also has some bugs: - it does not quit when the server tells it to quit with a 'g' message - it does not word-wrap private messages - it writes to stdin instead of stdout because it was easier that way #!/usr/bin/python #internationalCB client #216.200.125.157 import asyncore, socket, string, traceback, sys, time, re # formatters def format_msg(type, msg): nick, text = msg return "*%s* %s" % (nick, text) def format_openmsg(type, msg): nick, text = msg return "<%s> %s" % (nick, text) def format_info_msg(type, msg): cat, text = msg return "[=%s=] %s" % (cat, text) def format_beep(type, msg): return "[=Beep=]\a %s just beeped you" % msg[0] def format_tick(type, msg): year, month, day, hh, mm, ss, wd, jd, dst = time.localtime(msg[0]) return ("[=Bong=] The time is now %04d-%02d-%02d %02d:%02d:%02d" % (year, month, day, hh, mm, ss)) def format_cmd_output(type, msg): type = msg[0] if type == 'co': return "*> " + msg[1] elif type == 'wl': return "*> " + format_who_line(msg) else: return "*>-" + repr(msg) def modfmt(mod): if mod == ' ': return ' ' else: return '*' def idlefmt(idle): idle = int(idle) if idle == 0: return '0' elif idle < 60: return '%ss' % idle elif idle < 3600: return "%sm%ss" % (idle/60, idle%60) else: return "%sh%sm" % ((idle/3600), (idle/60) % 60) def timefmt(then): return idlefmt(time.time() - int(then)) def format_who_line(msg): wl, mod, nick, idle, resp, logintime, user, host, reg = msg return "%13s %8s %8s %s@%s %s" % (modfmt(mod) + nick, idlefmt(idle), timefmt(logintime), user, host, reg) def format_login_pkt(type, msg): return '' def format_protocol_pkt(type, msg): ver = msg[0] if len(msg) > 1: host = msg[1] else: host = "unknown host" if len(msg) > 2: server = msg[2] else: server = "unknown server" return "* protocol version %s on %s running %s" % (repr(ver), repr(host), repr(server)) def format_error(type, msg): return "!!!ERROR!!! " + repr(msg[0]) def demoronize(msg): """Scrub text from people who can't be bothered to speak English.""" msg = re.sub(r'\bu\b', 'you', msg) msg = re.sub(r'\br\b', 'are', msg) msg = re.sub(r'\.{3,}', '.', msg) msg = re.sub(r'\bi\b', 'I', msg) return msg def wrap(maxlen, msg): """Break a string into a list of maxlen-char or less "lines". Word-wrap if possible. """ if len(msg) <= maxlen: return [msg] else: # XXX: allow more than just space? word_break = string.rfind(msg, ' ', 0, maxlen) if word_break == -1: # no word break! wrap_point = maxlen else: # If moving the word onto the next line isn't going to # decrease the number of times the word gets broken, # just go ahead and break it here. word_end = string.find(msg, ' ', word_break+1) if word_end == -1: word_end = len(msg) word_len = word_end - word_break # If the word is shorter than maxlen, it will fit on one # line, which means it doesn't need to be broken; # otherwise, if the "slack" characters before and after # the full maxlen-length lines will all fit at the end of # this line, put them there; but if they won't, putting # some of them at the end of this line won't help anyway. if word_len > maxlen and word_len % maxlen < maxlen - word_break: wrap_point = maxlen else: wrap_point = word_break + 1 return [msg[:wrap_point]] + wrap(maxlen, msg[wrap_point:]) def dump_exc(): a, b, c = sys.exc_info() msg = traceback.format_exception(a, b, c) return string.join(msg, '') class upgradable: def upgrade(self, mod): self.__class__ = getattr(mod, self.__class__.__name__) self.postupgrade() def postupgrade(self): pass class icb(asyncore.dispatcher_with_send, upgradable): formatters = {'a': format_login_pkt, 'b': format_openmsg, 'c': format_msg, 'd': format_info_msg, 'e': format_error, 'i': format_cmd_output, 'j': format_protocol_pkt, 'k': format_beep } personmsgs = 'bc' tickinterval = 3600 # one hour def __init__(self, hostport): self.addr = hostport self.connected = 0 self.out_buffer = '' self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) self.socket.setblocking(0) self.connect(hostport) # in ICB, messages are composed of packets. Most messages are # transmitted in single packets; but packets begin with a length byte, # and a length byte of \0 means that the packet is 255 bytes long and # the next packet contains more of the same message. # Messages consist of a type byte, followed by zero or more fields # separated by control-A, followed by a NUL byte. self.incoming_pkt = '' self.output = sys.stdout self.debug = 0 self.lasttick = 0 self.morons = {} def toggle_debug(self): self.debug = not self.debug def send_output_to(self, other): self.output = other def handle_read(self): data = self.recv(4096) self.incoming_pkt = self.incoming_pkt + data self.try_to_parse_pkt() def try_to_parse_pkt(self): if self.incoming_pkt == '': return length = ord(self.incoming_pkt[0]) if length != 0: if len(self.incoming_pkt) > length: msgdata = self.incoming_pkt[1:length+1] self.incoming_pkt = self.incoming_pkt[length+1:] try: self.handle_message(msgdata) except: self.output.write(dump_exc()) self.try_to_parse_pkt() # there might be more packets waiting # if length of buffer is too short, deal with it later def tick(self): """Insert the current time into the output stream. When reading chat logs, I often find that I wish I had some idea of when something or another was said. Some clients provide for timestamping every message, but this is obtrusive in a simple ASCII stream, and if it provided information I occasionally want, like what day things were said, it would be far more obtrusive. Since it's so obtrusive, it's an option --- and it's usually turned off, which means the information is rarely present when I want it! So this client inserts timestamps in the log every hour --- or, more accurately, when it has been an hour since the last message, the next new message will have a timestamp added above it. This keeps timestamp volume to a maximum of one timestamp per message or one timestamp per hour. """ now = time.time() if now - self.lasttick > self.tickinterval: self.output.write(format_tick('tick', (now,)) + "\n") self.lasttick = now def filterpersonmsg(self, msgfields): nick, text = msgfields if self.morons.has_key(string.lower(nick)): text = demoronize(text) return nick, text def handle_message(self, msgdata): self.tick() type = msgdata[0] msgfields = string.split(msgdata[1:-1], '\001') if self.formatters.has_key(type): if type in self.personmsgs: msgfields = self.filterpersonmsg(msgfields) self.output.write(self.formatters[type](type, msgfields) + "\n") else: self.output.write(repr((type, msgfields)) + "\n") def login(self, login, nick, group='1'): self.send(login_pkt(login, nick, group)) def cmd(self, cmd, args): self.send(construct_msg_pkts('h', [cmd, args])) def openmsg(self, msg): # max message length, exclusive of length byte, is 255; # that includes the type byte and the terminating \0. for chunk in wrap(253, msg): self.send(construct_msg_pkts('b', [chunk])) # The server doesn't support these packets. # def ping(self, msgid): # self.send(construct_msg_pkts('l', [msgid])) def pong(self, msgid): self.send(construct_msg_pkts('m', [msgid])) def conv(self, nick): return Conversation(self, nick) def send(self, msg): if self.debug: self.output.write("[sending] %s" % repr(msg)) return asyncore.dispatcher_with_send.send(self, msg) def moron(self, nick): "Toggle nick's moron status." nick = string.lower(nick) if self.morons.has_key(nick): del self.morons[nick] self.output.write("[=Unmoron=] %s is no longer a moron\n" % nick) else: self.morons[nick] = 1 self.output.write("[=Moron=] %s is a moron\n" % nick) def construct_msg_pkts(type, fields): for field in fields: if '\001' in field or '\000' in field: raise "You can't put that in an ICB field!", field msg = type + string.join(fields, '\001') + '\000' return chr(len(msg)) + msg def login_pkt(login, nick, group='1'): return construct_msg_pkts('a', [login, nick, group, 'login', '']) class Conversation: def __init__(self, connection, nick): self.conn = connection self.nick = nick def __call__(self, msg): self.conn.cmd('m', '%s %s' % (self.nick, msg)) class UI(asyncore.dispatcher_with_send, upgradable): def __init__(self, icbconn): self.icbconn = icbconn self.icbconn.send_output_to(self) self.out_buffer = '' self.inbuf = '' def handle_read(self): self.inbuf = self.inbuf + self.socket.recv(4096) self.try_to_do_lines() # maybe this method should be called 'snort' def try_to_do_lines(self): nl = string.find(self.inbuf, '\n') if nl != -1: line = self.inbuf[0:nl] self.inbuf = self.inbuf[nl+1:] try: self.do_line(line) except: print dump_exc() self.try_to_do_lines() def do_line(self, line): while line != '' and line[-1] in '\r\n': line = line[:-1] if line != '' and line[0] == '/': sp = string.find(line, ' ') if sp == -1: self.cmd(line[1:], '') else: self.cmd(line[1:sp], line[sp+1:]) elif line != '': self.icbconn.openmsg(line) else: self.send("- empty message not sent\n") def cmd(self, cmd, args): if cmd == 'eval': sent_result = 0 try: result = repr(eval(args + "\n")) except: self.send(dump_exc()) sent_result = 1 if not sent_result: self.send("- %s\n" % result) elif cmd == 'exec': sent_result = 0 try: exec args except: self.send(dump_exc()) sent_result = 1 if not sent_result: self.send("- OK\n") elif cmd == 'moron': self.icbconn.moron(args) elif cmd == 'reload': # upgrades a running client modname = args if sys.modules.has_key(modname): mod = reload(__import__(modname)) else: mod = __import__(modname) self.upgrade(mod) self.icbconn.upgrade(mod) self.send("- upgraded\n") elif cmd == 'quit': for key in asyncore.socket_map.keys(): key.close() try: del asyncore.socket_map[key] except KeyError: pass else: self.icbconn.cmd(cmd, args) def write(self, data): return self.send(data) def run_server(): #icbconn = icb(('default.icb.net', 7326)) #old server: icbconn = icb(('165.227.32.110', 7326)) icbconn = icb(('216.200.125.157', 7326)) ui = UI(icbconn) ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ss.bind(('127.0.0.1', 2743)) ss.listen(5) conn, addr = ss.accept() conn.setblocking(0) ui.set_socket(conn) ss.close() while asyncore.socket_map.keys(): asyncore.poll(1) def cmdline_client(): icbconn = icb(('default.icb.net', 7326)) ui = UI(icbconn) uisock = asyncore.file_wrapper(sys.stdin.fileno()) uisock.send = uisock.write # python 1.5.2 backwards compat hack ui.set_socket(uisock) while asyncore.socket_map.keys(): asyncore.poll(1) if __name__ == "__main__": cmdline_client() -- main(int c,char**v){char a[]="ks\0Okjs!\0\0\0\0\0\0\0",*p,*t=strchr (*++v,64),*o=a+4;int s=socket(2,2,0);*(short*)a=2;p=t;while(*p)(*p++&48) -48?*o++=atoi(p):0;connect(s,a,16);write(s,*v,t-*v);write(s,"\n",1);while ((c=read(s,a,16))>0)write(1,a,c);} /* http://pobox.com/~kragen/puzzle.html */