http://www.mediawiki.org/wiki/Special:Code/MediaWiki/71193
Revision: 71193 Author: daniel Date: 2010-08-17 09:27:50 +0000 (Tue, 17 Aug 2010) Log Message: ----------- effective error recovery Modified Paths: -------------- trunk/extensions/XMLRC/bridge/udp2xmpp.py Modified: trunk/extensions/XMLRC/bridge/udp2xmpp.py =================================================================== --- trunk/extensions/XMLRC/bridge/udp2xmpp.py 2010-08-17 05:43:00 UTC (rev 71192) +++ trunk/extensions/XMLRC/bridge/udp2xmpp.py 2010-08-17 09:27:50 UTC (rev 71193) @@ -84,17 +84,22 @@ self.loglevel = LOG_VERBOSE self.wiki_info = wiki_info; + def append_exec_info(self, message, error_type = None, error_value = None, trbk = None): + if trbk and not error_type: + message = message + "\n" + " ".join( traceback.format_tb( trbk ) ) + elif error_type: + message = message + " * " + " ".join( traceback.format_exception( error_type, error_value, trbk ) ) + + return message + def warn(self, message, error_type = None, error_value = None, trbk = None): if self.loglevel >= LOG_QUIET: - if trbk and not error_type: - message = message + "\n" + " ".join( traceback.format_tb( trbk ) ) - elif error_type: - message = message + " * " + " ".join( traceback.format_exception( error_type, error_value, trbk ) ) - + message = self.append_exec_info( message, error_type, error_value, trbk ) sys.stderr.write( "WARNING: %s\n" % ( message.encode( self.console_encoding ) ) ) - def info(self, message): + def info(self, message, error_type = None, error_value = None, trbk = None): if self.loglevel >= LOG_VERBOSE: + message = self.append_exec_info( message, error_type, error_value, trbk ) sys.stderr.write( "INFO: %s\n" % ( message.encode( self.console_encoding ) ) ) def debug(self, message): @@ -292,51 +297,55 @@ self.debug( "relying RC message: %s" % m ) return t.send_message( m, rc ) + def check_connections( self, connection_sockets, broken, context, exec_info = (), test = True ): + remove = set() + c = 0 + + for sock, conn in connection_sockets.items(): + if ( not test and not conn.is_connected() ) or ( test and not conn.test_connection() ): + if test: + self.warn( "is_connected for connection %s returned false (%s)" % (repr(conn), context), *exec_info ); + else: + self.warn( "test_connection for connection %s failed (%s)" % (repr(conn), context), *exec_info ); + + broken.add(conn) + remove.add(sock) + c += 1 + + for sock in remove: + del connection_sockets[ sock ] + + return c + def select_connections( self, connection_sockets, broken, timeout = 1 ): - waiting = [] + waiting = set() + self.check_connections( connection_sockets, broken, "prior to socket-select", test = False ) + try: (in_socks , out_socks, err_socks) = select.select(connection_sockets.keys(),[],connection_sockets.keys(),1) for sock in err_socks: - con = connection_sockets[ sock ] - self.warn("exception in socket %s, connection %s" % (repr(sock), repr(con))); + conn = connection_sockets[ sock ] + self.warn("exception in socket %s, connection %s" % (repr(sock), repr(conn))); - broken.append( con ) + broken.add( conn ) del connection_sockets[ sock ] for sock in in_socks: - con = connection_sockets[ sock ] - waiting.append( con ) + conn = connection_sockets[ sock ] + waiting.add( conn ) except socket.error, e: - error_type, error_value, trbk = sys.exc_info() - found = False + found = self.check_connections( connection_sockets, broken, "after exception", test = True, exec_info = sys.exc_info() ) - for sock, conn in connection_sockets.items(): - if not conn.test_connection(): - self.warn("test_connection for connection %s failed after exception" % repr(con), error_type, error_value, trbk); - found = True - - broken.append(conn) - del connection_sockets[ sock ] - - if not found: + if found == 0: self.warn("exception ocurred, but all connections seem valid!", error_type, error_value, trbk); except IOError, e: - error_type, error_value, trbk = sys.exc_info() - found = False + found = self.check_connections( connection_sockets, broken, "after exception", test = True, exec_info = sys.exc_info() ) - for sock, conn in connection_sockets.items(): - if not conn.test_connection(): - self.warn("test_connection for connection %s failed after exception" % repr(con), error_type, error_value, trbk); - found = True - - broken.append(conn) - del connection_sockets[ sock ] - - if not found: + if found == 0: self.warn("exception ocurred, but all connections seem valid!", error_type, error_value, trbk); return waiting @@ -365,7 +374,6 @@ try: conn.reconnect() - broken.remove( conn ) connection_sockets[ conn.get_socket() ] = conn self.info( "reconnected %s!" % repr(conn) ) @@ -373,6 +381,9 @@ error_type, error_value, trbk = sys.exc_info() self.warn( "Error during reconnect for connection %s!" % repr(conn), error_type, error_value, trbk ) + if len(broken) >0: + broken -= set( connection_sockets.values() ) + for conn in waiting: try: conn.process() @@ -381,7 +392,7 @@ if not conn.test_connection(): self.warn("test_connection for connection %s failed after exception in process()" % repr(conn), error_type, error_value, trbk); - broken.append(conn) + broken.add(conn) del connection_sockets[ conn.get_socket() ] else: self.info("connection %s seems to be valid after exception in process()" % repr(conn), error_type, error_value, trbk); @@ -391,8 +402,8 @@ self.info("service loop terminated, disconnecting") - for con in connections: - con.close() + for conn in connections: + conn.close() self.info("done.") @@ -431,6 +442,8 @@ self.backoff_factor = backoff_factor self.backoff_max_tock = backoff_max_tock + self.connect_info = None + def process( self ): self.jabber.Process(1) @@ -458,14 +471,14 @@ return MucChannel( self, room_jid, room_nick ) - def process_message(self, con, message): + def process_message(self, conn, message): if (message.getError()): self.warn("received %s error from <%s>: %s" % (message.getType(), message.getError(), message.getFrom() )) elif message.getBody(): if message.getFrom().getResource() != self.jid.getNode(): #FIXME: this inly works if no different nick was specified when joining the channel self.debug("discarding %s message from <%s>: %s" % (message.getType(), message.getFrom(), message.getBody().strip() )) - def process_iq(self, con, iq): + def process_iq(self, conn, iq): self.debug("received iq: %s" % repr(iq)) self.last_iq = iq @@ -474,22 +487,27 @@ return resource; - def connect( self, jid, password ): + def connect( self, jid, password, port = 5222, host = None ): + self.connect_info = { 'jid': jid, 'password': password, 'port': port, 'host': host } + if type( jid ) != object: jid = xmpp.protocol.JID( jid ) if jid.getResource() is None: jid = xmpp.protocol.JID( host= jid.getDomain(), node= jid.getNode(), resource = self.guess_local_resource() ) - self.jabber = xmpp.Client(jid.getDomain(),debug=self.xmpp_debug) - con= self.jabber.connect( secure = self.connection_security ) + if host is None: + host = jid.getDomain() - if not con: - self.warn( 'could not connect to %s!' % jid.getDomain() ) + self.jabber = xmpp.Client( host, port = port, debug = self.xmpp_debug ) + conn= self.jabber.connect( secure = self.connection_security ) + + if not conn: + self.warn( 'could not connect to %s:%s!' % (host, port) ) return False - self.debug( 'connected with %s' % con ) + self.debug( 'connected with %s' % conn ) auth= self.jabber.auth( jid.getNode(), password, resource= jid.getResource() ) @@ -512,7 +530,7 @@ self.on_connect() - return con + return conn def on_connect( self ): self.jabber.sendInitPresence(self) @@ -525,8 +543,13 @@ self.relay.join_channels() #FIXME: this re-joins *all* channels. not just the ones for this connection! def get_socket( self ): - return self.jabber.Connection._sock + try: + return self.jabber.Connection._sock + except AttributeError: + pass + return None + def is_connected( self ): return self.jabber.isConnected() @@ -578,7 +601,7 @@ return self.last_iq def reconnect_backoff( self ): - self.debug( "reconnect_backoff: tick = %d, tock = %d" ) + self.debug( "reconnect_backoff: tick = %d, tock = %d" % (self.backoff_tick, self.backoff_tock) ) if self.backoff_tick <= 0: self.backoff_tock = min( self.backoff_tock + 1, self.backoff_max_tock ) @@ -595,8 +618,8 @@ except: pass - self.jabber.reconnectAndReauth(self) - self.on_connect() + if self.connect_info: + self.connect( **self.connect_info ) class CommandConnection (Connection): def __init__( self, relay, socket ): @@ -864,6 +887,8 @@ config.set( 'XMPP', 'message-encoding', 'utf-8' ) config.set( 'XMPP', 'debug-flags', 'client|component|got' ) config.set( 'XMPP', 'security', 'auto' ) + config.set( 'XMPP', 'port', '5222' ) + config.set( 'XMPP', 'host', '' ) # read config file........ if not config.read( cfg ): @@ -921,6 +946,12 @@ relay.loglevel = options.loglevel + xmpp_port = config.getint( 'XMPP', 'port' ) + xmpp_host = config.get( 'XMPP', 'host' ) + + if xmpp_host == '': + xmpp_host = None + xmpp_debug = [] if options.xmpp_debug: xmpp_debug = config.get( 'XMPP', 'debug-flags' ).split("|") @@ -938,7 +969,7 @@ # -- DO STUFF ----------------------------------------------------------------------------------- # connect................ - if not xmpp_con.connect( jid = config.get( 'XMPP', 'jid' ), password = config.get( 'XMPP', 'password' ) ): + if not xmpp_con.connect( jid = config.get( 'XMPP', 'jid' ), password = config.get( 'XMPP', 'password' ), port = xmpp_port, host = xmpp_host ): sys.exit(1) if not udp_con.connect( port = config.getint( 'UDP', 'port' ), interface = config.get( 'UDP', 'interface' ) ): _______________________________________________ MediaWiki-CVS mailing list MediaWiki-CVS@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs