We had this bug in production. Attached file is a stress test I wrote to reproduce and understand the issue.
import asyncore, logging, os, signal, shutil, sys, tempfile, time import transaction import ZODB from ZODB.FileStorage import FileStorage from ZODB.POSException import ConflictError from ZODB.tests.ConflictResolution import PCounter from ZODB.tests.MinPO import MinPO from ZEO.ClientStorage import ClientStorage from ZEO.StorageServer import StorageServer from ZEO.tests.forker import get_port
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') class PRandomlyConflictingCounter(PCounter): def _p_resolveConflict(self, oldState, savedState, newState): if newState.get('_value', 0) % 5: return PCounter._p_resolveConflict( self, oldState, savedState, newState) raise ConflictError def fork(name): pid = os.fork() if pid: return pid handler = logging.FileHandler("%s-%s.log" % (name, os.getpid())) handler.setFormatter(formatter) logging.getLogger().addHandler(handler) class Server(object): def __init__(self, name): addr = "localhost", get_port() self.pid = fork("server") if self.pid: self.client = lambda *args, **kw: \ ClientStorage(addr, name, *args, **kw) else: path = "%s.fs" % os.getpid() server = StorageServer(addr, {name: FileStorage(path)}) asyncore.loop() assert 0 class main(object): def __new__(cls): self = object.__new__(cls) self.wd = None self.servers = [] self.clients = [] pid = os.getpid() try: self.wd = tempfile.mkdtemp() os.chdir(self.wd) root = logging.getLogger() root.setLevel(logging.DEBUG) assert not root.handlers self.init_server(PCounter()) self.init_server(PRandomlyConflictingCounter()) for i in xrange(4): self.clients.append(self.run_client()) signal.pause() finally: if pid != os.getpid(): os._exit(0) elif self.wd: for server in self.servers: try: os.waitpid(server.pid, 0) except: pass for pid in self.clients: try: os.waitpid(pid, 0) except: pass #shutil.rmtree(self.wd) def init_server(self, obj): server = Server(str(len(self.servers))) self.servers.append(server) db = ZODB.DB(server.client()) cn = db.open() cn.root()[None] = obj transaction.commit() server.oid = obj._p_oid cn.close() def run_client(self): pid = fork("client") if pid: return pid self.cn = [ZODB.DB(server.client()).open() for server in self.servers] delay = len(self.clients) * 0.1 while 1: try: if delay: time.sleep(delay) for i, cn in enumerate(self.cn): cn.get(self.servers[i].oid).inc() transaction.commit() except ConflictError: transaction.abort() if __name__ == "__main__": main()
signature.asc
Description: OpenPGP digital signature
_______________________________________________ For more information about ZODB, see http://zodb.org/ ZODB-Dev mailing list - ZODB-Dev@zope.org https://mail.zope.org/mailman/listinfo/zodb-dev