[sqlalchemy] Re: concurent modification
> your s.close() is not being reached

I've checked and found that every thread closes its connection. The traceback

    QueuePool limit of size 5 overflow 10 reached, connection timed out

is most probably caused by a long-running thread that walks over a list of about 35,000 filenames. It renames files, which causes the app to start another thread.

> if you have a deadlock situation occurring, that can also quickly cause many connections to remain opened, all waiting on the same lock. if you're locking rows and such, you have to be careful that other transactions don't try to lock the same rows in a different order. by different transactions this could be a trans in the same thread, or in a different thread, and you'll have to narrow down which.

As far as I understand, a 'SHARE ROW EXCLUSIVE' lock ( http://www.postgresql.org/docs/8.1/static/sql-lock.html ) is necessary here, but I was unable to find how to set it up in sqlalchemy. Choosing the proper lock in postgres removes the need to touch the threadframe mentioned above, right?

> if you're still doing that thing where you delete a row and then immediately re-insert it, i will ask again that you consider using an UPDATE instead.

The code was deleting every record from the table and inserting another one instead.

P.S. sqlalchemy revision 3942 was used.

--~--~-~--~~~---~--~~
You received this message because you are subscribed to the Google Groups sqlalchemy group.
To post to this group, send email to sqlalchemy@googlegroups.com
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en
-~--~~~~--~~--~--~---
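The advice about locking rows in a consistent order can be illustrated without a database. The sketch below is only an analogy: plain threading.Lock objects stand in for row locks, and the names row_locks, touch_rows and run_demo are hypothetical. Two workers ask for the same rows in opposite order, but each sorts the ids before acquiring, so neither can end up holding one lock while waiting for the other.

```python
import threading

# Hypothetical per-row locks keyed by row id; stand-ins for database row locks.
row_locks = {1: threading.Lock(), 2: threading.Lock()}


def touch_rows(name, row_ids, log):
    # Sort the ids so every worker acquires locks in the same global order.
    ordered = sorted(row_ids)
    for rid in ordered:
        row_locks[rid].acquire()
    try:
        log.append((name, tuple(ordered)))
    finally:
        # Release in reverse order of acquisition.
        for rid in reversed(ordered):
            row_locks[rid].release()


def run_demo():
    log = []
    workers = [
        threading.Thread(target=touch_rows, args=("a", [1, 2], log)),
        threading.Thread(target=touch_rows, args=("b", [2, 1], log)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join(timeout=5)
    return sorted(log)
```

Without the sorted() call, worker "a" could hold lock 1 while "b" holds lock 2, and both would wait forever; that is the situation that keeps pooled connections checked out.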
[sqlalchemy] Re: concurent modification
Oops, I added this part accidentally:

    from threading import Thread, Lock
    from sqlalchemy import create_engine, MetaData, Table, Column, types, ...

In fact I've removed the Locks, replaced scoped_session(...) with create_session(bind=db) in each thread, and was running the app on revision 3912.
[sqlalchemy] Re: concurent modification
    2007-12-12 00:19:24,099 INFO sqlalchemy.pool.QueuePool.0x..90 Created new connection connection object at 0xda9bd0; dsn: 'dbname=df host=localhost user=grey password=', closed: 0
    2007-12-12 00:19:24,099 INFO sqlalchemy.pool.QueuePool.0x..90 Connection connection object at 0xda9bd0; dsn: 'dbname=df host=localhost user=grey password=', closed: 0 checked out from pool
    2007-12-12 00:31:08,901 INFO sqlalchemy.pool.QueuePool.0x..90 Created new connection connection object at 0x180cad8; dsn: 'dbname=df host=localhost user=grey password=', closed: 0

(repeated 15 times)

    TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30

(298 times)

Another traceback:

    Thread-2:
    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 120, in run
        walk(s, theone, root)
      File "./camper.py", line 77, in walk
        stuff = s.query(Path).select_from(f_table).filter(Path.c.user_id==theone.id).filter(Path.c.path==relpath).first()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 701, in first
        ret = list(self[0:1])
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 730, in __iter__
        return self._execute_and_instances(context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 733, in _execute_and_instances
        result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 528, in execute
        return self.__connection(engine, close_with_result=True).execute(clause, params or {})
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 795, in execute
        return Connection.executors[c](self, object, multiparams, params)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 846, in execute_clauseelement
        return self._execute_compiled(elem.compile(dialect=self.dialect, column_keys=keys, inline=len(params) > 1), distilled_params=params)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 858, in _execute_compiled
        self.__execute_raw(context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 870, in __execute_raw
        self._cursor_execute(context.cursor, context.statement, context.parameters[0], context=context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 886, in _cursor_execute
        raise exceptions.DBAPIError.instance(statement, parameters, e)
    ProgrammingError: (ProgrammingError) Wrong byte sequence, encoding UTF8: 0xd4f0
    HINT: This error can also happen if the byte sequence does not match the encoding expected by the server, which is controlled by client_encoding.
     'SELECT fs_file.id AS fs_file_id, fs_file.user_id AS fs_file_user_id, fs_file.path AS fs_file_path, fs_file.ls AS fs_file_ls \nFROM fs_file \nWHERE fs_file.user_id = %(fs_file_user_id_1)s AND fs_file.path = %(fs_file_path_1)s ORDER BY fs_file.id \n LIMIT 1 OFFSET 0' {'fs_file_user_id_1': 7, 'fs_file_path_1': '/incoming/ \xd0\x90\xd0\xba\xd0\xb0\xd0\xb4\xd0\xb5\xd0\xbc\xd0\xb8\xd1\x8f \xd0\xbf\xd1\x81\xd0\xb8\xd1\x85\xd0\xbe\xd0\xbb\xd0\xbe \xd0\xb3\xd0\xb8\xd0\xb8/3/\xd4\xf0\xe0\xed\xea\xeb \xcb\xee\xe3\xee \xf2\xe5\xf0\xe0\xef\xe8\xff'}

The setup:

    db = create_engine('postgres:...')

In the thread:

    if not metadata.is_bound():
        metadata.bind = db
    s = create_session(bind=db)
    s.begin()
    ...
    s.commit()
    s.clear()
    s.close()

I have no idea where to dig. All strings that I'm passing to the db are unicode.

Btw, there's another problem: the application opens the maximum allowed number of files, so other services are dying. I suspect sqlalchemy, because there's only one place in my code where a file is opened, and it is followed by file.close().
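The path in the failing query mixes two encodings: its leading components are valid UTF-8, while the last component (starting with the bytes 0xd4 0xf0 named in the error) is not. The sketch below shows one possible way to normalize such a path before sending it to the database. It is written for Python 3, unlike the Python 2.4 code in this thread, and the function names and the cp1251 fallback are assumptions made for illustration; the thread does not say which legacy encoding the files use.

```python
def decode_component(raw, fallbacks=("cp1251",)):
    """Decode one path component: UTF-8 first, then each fallback codec."""
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        pass
    for codec in fallbacks:
        try:
            return raw.decode(codec)
        except UnicodeDecodeError:
            continue
    # Last resort: keep going, marking undecodable bytes.
    return raw.decode("utf-8", errors="replace")


def decode_path(raw_path):
    # Decode component by component, because one path can mix encodings.
    return "/".join(decode_component(part) for part in raw_path.split(b"/"))


# A UTF-8 directory followed by a component in the assumed legacy encoding.
mixed = b"/incoming/\xd0\x90\xd0\xba\xd0\xb0/3/\xd4\xf0\xe0\xed\xea\xeb"
clean = decode_path(mixed)
# Re-encoding the cleaned text always yields valid UTF-8 for the server.
clean_bytes = clean.encode("utf-8")
```

Decoding the whole path in one call would fail or mis-decode, since neither codec is right for every component.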
[sqlalchemy] Re: concurent modification
Thanks a lot. It seems I've managed to resolve the problem with concurrent modifications by calling commit(), clear() and close() in each thread, but I'm stuck on another one:

    Exception in thread Thread-62:
    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 109, in run
        theone = s.query(User).filter_by(username=user).first()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 627, in first
        ret = list(self[0:1])
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 656, in __iter__
        return self._execute_and_instances(context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 659, in _execute_and_instances
        result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 528, in execute
        return self.__connection(engine, close_with_result=True).execute(clause, params or {})
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 510, in __connection
        return self.transaction.get_or_add(engine)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 188, in get_or_add
        c = bind.contextual_connect()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 1160, in contextual_connect
        return Connection(self, self.pool.connect(), close_with_result=close_with_result, **kwargs)
      File "sqlalchemy/pool.py", line 163, in connect
      File "sqlalchemy/pool.py", line 296, in __init__
      File "sqlalchemy/pool.py", line 173, in get
      File "sqlalchemy/pool.py", line 571, in do_get
    TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30

Perhaps the problem is caused by long-running threads that lock the table, so other threads line up in the queue and the exception appears once the limit is reached. The question is: is there a way to resolve this problem without touching default values such as the queue size or the timeout?
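A pool timeout of this kind appears whenever more workers hold connections than the pool can hand out, whatever the limits are set to. The sketch below is a toy model, not QueuePool's implementation: TinyPool and demo are hypothetical names, and sqlite3 in-memory connections stand in for PostgreSQL ones. It shows two things: a try/finally that returns the connection even when the worker fails, and the timeout that results when a connection is held while another one is requested.

```python
import queue
import sqlite3
from contextlib import contextmanager


class TinyPool:
    """Hypothetical fixed-size pool; a stand-in for QueuePool, not its code."""

    def __init__(self, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(sqlite3.connect(":memory:", check_same_thread=False))

    @contextmanager
    def connection(self, timeout=0.1):
        # Raises queue.Empty if nothing is returned within `timeout` seconds.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Always hand the connection back, even if the body raised.
            self._idle.put(conn)


def demo():
    pool = TinyPool(size=1)
    events = []
    try:
        with pool.connection() as conn:
            conn.execute("SELECT 1")
            raise RuntimeError("worker failed mid-way")
    except RuntimeError:
        events.append("worker failed")
    # The slot was returned by the finally block, so this does not time out.
    with pool.connection() as conn:
        events.append(conn.execute("SELECT 41 + 1").fetchone()[0])
    # Holding the only connection while asking for another one times out.
    with pool.connection():
        try:
            with pool.connection(timeout=0.05):
                pass
        except queue.Empty:
            events.append("timed out")
    return events
```

If the long-running walker keeps its connection for the whole walk while it spawns more threads, raising the pool size only postpones the timeout; keeping each checkout short is what avoids it.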
[sqlalchemy] Re: concurent modification
> i can only make comments about fragments like this but i can't address your full design issue since you haven't supplied a fully working illustration of what it is you're trying to do.

The daemon: http://dpaste.com/hold/27089/ and the required utils.py module: http://dpaste.com/hold/27092/
It was running with revision 3863.

> the biggest issue I can see is that the code seems to have a weak notion of explicit transaction boundaries... it's opening many sessions, one per thread (since you're using scoped_session), and just keeping them opened, with just one commit() when a delete is issued and that's it.

How do I open one session for all threads? Using a Lock around inserts/updates should help in this case, right?

> additionally, you're issuing some SQL directly to the database without notifying the session about objects which may have been removed; you're executing a SQL statement through sess.execute() but that has no effect on the User object stored in the session.. plain SQL executes don't refresh or expire anything that's currently present in the session. when you commit(), the underlying flush() apparently is hitting that user, or perhaps a different one, and attempting to update it.

I was trying to represent this SQL statement in the ORM: DELETE FROM fs_file WHERE path='/' and user_id=theone.id. If I needed to delete a user I would use session.query.delete(user), indeed. But how do I perform write operations properly then, without execute()? I was looking, thinking, asking and thinking again, but have come to nothing yet )

> locking you're doing doesn't have much effect here, without more context it seems like it's not needed and is just adding to the confusion.

I thought it would exclude a few doubtful assumptions from my inspection list.

> It's possible that you'd benefit here from using explicit transactions, so that you don't need to be dependent on the Session in order to commit raw SQL which you've executed. You can begin() and commit() transactions using an Engine or a Connection... such as:
>
>     conn = db.connect()
>     trans = conn.begin()
>     session = Session(bind=conn)
>     # do stuff with the conn, do stuff with session
>     trans.commit()

But I see that sqlalchemy begins a transaction anyway.

> Transactions should be opened and closed for individual groups of operations. For example, if your arrange thread starts up, performs some work, and then completes, it should be opening a transaction and/or Session

So session != transaction? Is the difference between them that a session has a cache?

> strategy=threadlocal are more than likely confusing the issue.

Removed. In fact I cannot say that I clearly understand what it does.

> scoped_session is going to accumulate sessions for every thread, even if that thread has died out, which will certainly cause memory to grow unbounded if you keep creating new threads. it would probably be cleaner to have each arrange() thread keep track of its own Session and transaction, which are disposed of when the thread completes.

Thanks for the idea, I'll try to elaborate.
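The suggestion that each thread should own its connection and transaction, and dispose of both when it finishes, can be sketched with the standard library alone. This is an illustration of the lifecycle, not SQLAlchemy code: sqlite3 stands in for PostgreSQL, the table is a cut-down fs_file, and worker and run_demo are hypothetical names.

```python
import os
import sqlite3
import tempfile
import threading


def worker(db_path, user_id, path):
    # One connection per thread, created and closed inside the thread.
    conn = sqlite3.connect(db_path, timeout=10, isolation_level="IMMEDIATE")
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute(
                "INSERT INTO fs_file (user_id, path) VALUES (?, ?)",
                (user_id, path),
            )
    finally:
        # Dispose of the connection when the thread's unit of work is done.
        conn.close()


def run_demo(n_threads=4):
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        setup = sqlite3.connect(db_path)
        setup.execute(
            "CREATE TABLE fs_file ("
            "id INTEGER PRIMARY KEY, user_id INTEGER, path TEXT UNIQUE)"
        )
        setup.commit()
        setup.close()

        threads = [
            threading.Thread(target=worker, args=(db_path, 1, "/dir%d" % i))
            for i in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        check = sqlite3.connect(db_path)
        try:
            return check.execute("SELECT COUNT(*) FROM fs_file").fetchone()[0]
        finally:
            check.close()
```

Nothing is shared between threads here, so nothing accumulates when threads die, which is the concern raised about scoped_session above.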
[sqlalchemy] Re: concurent modification
> I think the error throw of the IntegrityError is totally expensive from both a DB perspective as well as a python perspective. if missing data is truly so rare then it might be fine, but the necessity of then using SAVEPOINT seems to complicate things more than necessary. but you'd have to benchmark different scenarios to see which is best.

Thanks, but unfortunately I cannot get rid of the tracebacks.

    from threading import Thread, Lock
    from sqlalchemy import create_engine, MetaData, Table, Column, types, schema, insert, update, delete
    from sqlalchemy.orm import mapper, backref, scoped_session, sessionmaker
    from sqlalchemy.exceptions import IntegrityError

    metadata = MetaData()
    u_table = Table('auth_user', metadata,
        Column('id', types.Integer, primary_key=True),
        Column('username', types.String(30)))
    f_table = Table('fs_file', metadata,
        Column('id', types.Integer, primary_key=True),
        Column('user_id', None, schema.ForeignKey('auth_user.id')),
        Column('path', types.String, unique=True),
        Column('ls', types.PickleType, nullable=False))

    class User(object): pass
    class Path(object): pass

    mapper(User, u_table)
    mapper(Path, f_table)

    session = scoped_session(sessionmaker(transactional=True, autoflush=False))
    db = create_engine('postgres://...', strategy='threadlocal')
    session.configure(bind=db)
    db.echo = True

    class LockingManager(object):
        def __init__(self):
            self._lock = Lock()
            if not metadata.is_bound():
                metadata.bind = db

        def delete(self, uid=None):
            self._lock.acquire()
            try:
                session.execute(delete(f_table), {'user_id': uid})
                session.commit() #line 88
            finally:
                self._lock.release()

        def insert(self, uid=None, path=None, ls=None):
            ...

        def update(self, uid=None, path=None, ls=None):
            ...

        def select(self, username=None):
            stuff = session.query(User).filter_by(username=username).first()
            return stuff

    class arrange(Thread):
        def __init__ (self, event, manager):
            Thread.__init__(self)
            self.e = event
            self.manager = manager

        def run(self):
            theone = self.manager.select(username='me')
            self.manager.delete(uid=theone.id)
            ...

    class Watch(ProcessEvent):
        def __init__(self, watch_manager, lock_manager):
            self._watch_manager = watch_manager
            self._lock_manager = lock_manager

        def process_IN_DELETE(self, event):
            super(Watch, self).process_default(event)
            arrange(event, self._lock_manager).start()

    def main():
        lm = LockingManager()
        wm = WatchManager()
        wm.add_watch(HOMEDIR, mask, proc_fun=Watch(wm, lm), rec=True, auto_add=True)
        ...

    if __name__ == '__main__':
        main()

The traceback happens on delete and always looks like this:

    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 189, in run
        walk(self.manager, theone, root)
      File "./camper.py", line 101, in walk
        manager.delete(uid=theone.id)
      File "./camper.py", line 88, in delete
        session.commit()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/scoping.py", line 74, in do
        return getattr(self.registry(), name)(*args, **kwargs)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 484, in commit
        self.transaction = self.transaction.commit()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 211, in commit
        self.session.flush()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 686, in flush
        self.uow.flush(self, objects)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 207, in flush
        flush_context.execute()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 434, in execute
        UOWExecutor().execute(self, head)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1053, in execute
        self.execute_save_steps(trans, task)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1067, in execute_save_steps
        self.save_objects(trans, task)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1058, in save_objects
        task.mapper.save_obj(task.polymorphic_tosave_objects, trans)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/mapper.py", line 1089, in save_obj
        raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
    ConcurrentModificationError: Updated rowcount 0 does not match number of objects updated 1

As far as I understand, only one query is executing at a time now.
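The error message compares the number of rows an UPDATE touched with the number of objects that were expected to be updated. The sketch below reproduces that comparison with sqlite3 only; it is not SQLAlchemy's flush code, and StaleRowError, save and demo are hypothetical names. An in-memory copy of a row is kept, the row is deleted with raw SQL that the copy never hears about, and the later UPDATE then matches zero rows.

```python
import sqlite3


class StaleRowError(Exception):
    """Hypothetical stand-in for ConcurrentModificationError."""


def save(conn, cached_rows):
    # Mimic a flush: one UPDATE per cached object, then compare the counts.
    updated = 0
    for row_id, path in cached_rows.items():
        cur = conn.execute(
            "UPDATE fs_file SET path = ? WHERE id = ?", (path, row_id)
        )
        updated += cur.rowcount
    if updated != len(cached_rows):
        raise StaleRowError(
            "Updated rowcount %d does not match number of objects updated %d"
            % (updated, len(cached_rows))
        )


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fs_file (id INTEGER PRIMARY KEY, path TEXT)")
    conn.execute("INSERT INTO fs_file (id, path) VALUES (1, '/')")
    cached = {1: "/renamed"}  # in-memory copy, like an object held by a session
    conn.execute("DELETE FROM fs_file")  # raw SQL the cache never hears about
    try:
        save(conn, cached)
    except StaleRowError as exc:
        return str(exc)
    finally:
        conn.close()
    return "ok"
```

Serializing the statements with a Lock does not help in this situation, because the mismatch is between cached state and the table, not between two statements running at once.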
[sqlalchemy] Re: concurent modification
> you should select() for the record first, and if not present, insert() it. if you're concerned about lots of selects() you can implement a simple cache in your application, such as a dictionary of ids that already exist. if there's concern over the cache growing too large, you can use a least-recently-used type of cache which limits its size.

Unfortunately the selects and updates are totally unpredictable, from my point of view at least.

> OK, i guess you are using the SAVEPOINT then since the postgres docs seem to recommend that as a workaround for the transaction abort it's doing at the IntegrityError point... and the abort is bad for you because you want to recover from it, meaning that you're essentially using the IntegrityError as a method of checking for existing data.

Where is the flaw in such an approach? The point is that each transaction has to be as short as possible, since a very intensive load is expected. But I've replaced it with a select/insert sequence anyway.

> consider even loading the whole table into memory; if it's only a few thousand rows, memory is cheap.

I've tried, but was forced to rebuild the db and app structure because of memory usage.

> (where they all continually modify data in overlapping rows), the actual modifications to the structure can be serialized using a producer/consumer model where a single event queue handles

I rewrote the script with threading.Lock() so that only one query runs against a given table at a time. But I got a problem:

    if not metadata.is_bound():
        metadata.bind = db
    session.connection()
    session.begin()
    session.execute(insert(f_table), {'user_id': 1, 'path': '/', 'ls': [], })
    session.commit()
    session.close()

    INFO sqlalchemy.engine.threadlocal.TLEngine.0x..6c ROLLBACK

In postgresql-8.2-main.log:

    EET LOG: unexpected EOF on client connection

sqlalchemy revision 3848 was used. The table structure wasn't changed.
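The producer/consumer idea mentioned in the quoted text can be sketched with the standard library: producers only put jobs on a queue, and a single consumer thread is the only code that writes to the database. This is an illustration under assumptions, not the daemon's code: sqlite3 stands in for PostgreSQL, and writer, run_demo and STOP are hypothetical names.

```python
import queue
import sqlite3
import threading

STOP = object()  # sentinel telling the writer to finish


def writer(jobs, results):
    # Only this thread touches the database, so writes are serialized.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fs_file (path TEXT PRIMARY KEY, ls TEXT)")
    while True:
        job = jobs.get()
        if job is STOP:
            break
        path, ls = job
        with conn:  # one short transaction per job
            conn.execute(
                "INSERT OR REPLACE INTO fs_file (path, ls) VALUES (?, ?)",
                (path, ls),
            )
    results.extend(conn.execute("SELECT path, ls FROM fs_file ORDER BY path"))
    conn.close()


def run_demo():
    jobs = queue.Queue()
    results = []
    consumer = threading.Thread(target=writer, args=(jobs, results))
    consumer.start()

    # Producers never talk to the database; they only enqueue work.
    producers = [
        threading.Thread(
            target=jobs.put, args=(("/dir%d" % i, "listing %d" % i),)
        )
        for i in range(3)
    ]
    for p in producers:
        p.start()
    for p in producers:
        p.join()

    jobs.put(STOP)
    consumer.join()
    return results
```

Compared with wrapping every statement in a threading.Lock, the queue also bounds the number of open connections to one, whatever the number of event threads.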
[sqlalchemy] Re: concurent modification
> one immediate flaw I see in your application is that you are using implicit execution of SQL constructs without them having any connection to the ongoing transaction, such as:
>
>     f_table.delete(f_table.c.user_id==theone.id).execute()
>     f_table.insert().execute(user_id=theone.id, path='/', ls=ls)

I've replaced it with this:

    session.execute(delete(f_table), {'user_id': theone.id})
    session.execute(insert(f_table), {'user_id': theone.id, 'path': '/', 'ls': ls})
    session.commit()

> the easiest solution would be to just turn on the threadlocal=True flag on your engine, which means that all connection requests in the same thread use the same connection and transaction.

    db = create_engine('postgres://...', strategy='threadlocal')

> So a step like that should help, but the design of your application still seems to require random deletes and updates of data in different threads, and you're also using some really esoteric features such as SAVEPOINT transactions (i.e. via the begin_nested()). If you're using all that and having trouble figuring out how to arrange concurrency properly, it's possible that you'd need to simplify your application a bit.

But how do I determine whether a record exists, and update it if it does or insert it otherwise, without executing a select, which would be very slow? I'm using begin_nested() here to avoid a rollback of the whole transaction.

> If it's really just a multithreaded daemon I'd might even try using normal threading.Lock objects to limit access to critical mutable structures.

I'll try, but for now I cannot get rid of this error, which happens every time:

    Exception in thread Thread-3:
    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 126, in run
        walk(session, theone, root)
      File "./camper.py", line 85, in walk
        stuff = session.query(Path).select_from(f_table.join(u_table)).filter(User.c.id==theone.id).filter(Path.c.path==relpath).first()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 627, in first
        ret = list(self[0:1])
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 656, in __iter__
        return self._execute_and_instances(context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 659, in _execute_and_instances
        result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper, instance=self._refresh_instance)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 528, in execute
        return self.__connection(engine, close_with_result=True).execute(clause, params or {})
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 796, in execute
        return Connection.executors[c](self, object, multiparams, params)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 847, in execute_clauseelement
        return self._execute_compiled(elem.compile(dialect=self.dialect, column_keys=keys, inline=len(params) > 1), distilled_params=params)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 859, in _execute_compiled
        self.__execute_raw(context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 871, in __execute_raw
        self._cursor_execute(context.cursor, context.statement, context.parameters[0], context=context)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py", line 887, in _cursor_execute
        raise exceptions.DBAPIError.instance(statement, parameters, e)
    ProgrammingError: (ProgrammingError) current transaction is aborted, commands ignored until end of transaction block
     'SELECT fs_file.id AS fs_file_id, fs_file.user_id AS fs_file_user_id, fs_file.path AS fs_file_path, fs_file.ls AS fs_file_ls \nFROM fs_file JOIN auth_user ON auth_user.id = fs_file.user_id \nWHERE auth_user.id = %(auth_user_id)s AND fs_file.path = %(fs_file_path)s ORDER BY fs_file.id \n LIMIT 1 OFFSET 0' {'fs_file_path': '/', 'auth_user_id': 1}

The context:

    try:
        session.execute(insert(f_table), {'user_id': theone.id, 'path': relpath, 'ls': ls})
    except IntegrityError:
        stuff = session.query(...) #line 85
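One way to decide between update and insert without a prior SELECT and without provoking an IntegrityError is to issue the UPDATE first and insert only when it matched no rows. The sketch below shows the idea with sqlite3 standing in for PostgreSQL; upsert and demo are hypothetical names, and the table is a cut-down fs_file with a plain text ls column.

```python
import sqlite3


def upsert(conn, user_id, path, ls):
    # Try the UPDATE first; rowcount says whether a matching row existed.
    cur = conn.execute(
        "UPDATE fs_file SET ls = ? WHERE user_id = ? AND path = ?",
        (ls, user_id, path),
    )
    if cur.rowcount == 0:
        conn.execute(
            "INSERT INTO fs_file (user_id, path, ls) VALUES (?, ?, ?)",
            (user_id, path, ls),
        )
        return "inserted"
    return "updated"


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE fs_file ("
        "id INTEGER PRIMARY KEY, user_id INTEGER, path TEXT UNIQUE, ls TEXT)"
    )
    with conn:  # both statements run in one transaction
        first = upsert(conn, 1, "/", "a")
        second = upsert(conn, 1, "/", "b")
    stored = conn.execute("SELECT ls FROM fs_file").fetchall()
    conn.close()
    return first, second, stored
```

The pattern is still racy when two threads handle the same path at the same moment: both can see rowcount 0 and both try to insert, so it needs to be combined with serialized writes or a retry on the rare conflict.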
[sqlalchemy] Re: concurent modification
On 30 Nov, 21:07, Michael Bayer [EMAIL PROTECTED] wrote:

> hey there - this is line 179 right now:
>
>     if bind in conn_dict:
>
> similar mismatches in the stacktrace are present for lines 505 and 509 (line 505 is a blank line now). the full snip of code is:
>
>     if bind in conn_dict:
>         (conn, trans, autoclose) = conn_dict[bind]
>         self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
>         return conn
>
> so my guess is that you aren't running the most recent trunk.

Of course you're right. I ran the script on another host by mistake, sorry. Here's the output with revision 3846:

    Exception in thread Thread-5:
    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 122, in run
        walk(session, theone, root)
      File "./camper.py", line 91, in walk
        session.commit()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/scoping.py", line 74, in do
        return getattr(self.registry(), name)(*args, **kwargs)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 484, in commit
        self.transaction = self.transaction.commit()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 211, in commit
        self.session.flush()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 686, in flush
        self.uow.flush(self, objects)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 207, in flush
        flush_context.execute()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 434, in execute
        UOWExecutor().execute(self, head)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1053, in execute
        self.execute_save_steps(trans, task)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1067, in execute_save_steps
        self.save_objects(trans, task)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1058, in save_objects
        task.mapper.save_obj(task.polymorphic_tosave_objects, trans)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/mapper.py", line 1104, in save_obj
        raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
    ConcurrentModificationError: Updated rowcount 0 does not match number of objects updated 1

The code:

    from threading import Thread
    from sqlalchemy import create_engine, MetaData, Table, Column, types, schema
    from sqlalchemy.orm import mapper, relation, backref, create_session, scoped_session, sessionmaker
    from sqlalchemy.exceptions import IntegrityError

    metadata = MetaData()
    u_table = Table('auth_user', metadata,
        Column('id', types.Integer, primary_key=True),
        Column('username', types.String(30)))
    f_table = Table('fs_file', metadata,
        Column('id', types.Integer, primary_key=True),
        Column('user_id', None, schema.ForeignKey('auth_user.id')),
        Column('path', types.String, unique=True),
        Column('ls', types.PickleType, nullable=False))

    class User(object): pass
    class Path(object): pass

    mapper(User, u_table)
    mapper(Path, f_table)

    session = scoped_session(sessionmaker(transactional=True, autoflush=False))
    db = create_engine('postgres://...')
    session.configure(bind=db)

    def walk(session, theone, root):
        f_table.delete(f_table.c.user_id==theone.id).execute()
        f_table.insert().execute(user_id=theone.id, path='/', ls=ls)
        session.flush()
        session.connection()
        transaction = session.begin()
        ...
        nested = session.begin_nested()
        try:
            f_table.insert().execute(user_id=theone.id, path=relpath, ls=ls)
        except IntegrityError:
            stuff = session.query(Path).select_from(..)..
            #update
        session.commit() # line 91
        session.commit()
        session.clear()

    class arrange(Thread):
        def __init__ (self, event):
            Thread.__init__(self)
            self.e = event

        def run(self):
            if not metadata.is_bound():
                metadata.bind = db
            session.connection()
            theone = session.query(User).filter_by(username=user).first()
            if not theone:
                session.clear()
                session.close()
                return
            walk(session, theone, root)
            ... #possible delete/update/insert statements
            ...
[sqlalchemy] Re: concurent modification
> based on your stacktrace, here's your bug, fixed in r3839:

I cannot understand how this works:

>     from sqlalchemy import *
>     from sqlalchemy.orm import *
>     session = scoped_session(sessionmaker(transactional=True, autoflush=False))
>     engine = create_engine('postgres://scott:[EMAIL PROTECTED]/test')
>     session.configure(bind=engine)

This creates a connection to the db and binds it to the `session` object, which can be used like: session.query(MappedObject)..

>     session.connection()

Creates a new connection for a separate transaction in a new thread?

>     session.begin()
>     session.begin_nested()
>     session.connection()

Another connection for the nested transaction?

>     session.commit()
>     session.commit()

Commits the nested and the normal transactions. Am I right?

This is my version, which still raises the exception:

    [global definitions]
    session = scoped_session(sessionmaker(transactional=True, autoflush=False))
    db = create_engine('postgres://..')
    session.configure(bind=db)

    [thread]
    if not metadata.is_bound():
        metadata.bind = db
    session.connection()
    session.query(MappedObject)...
    ... inserts/updates

    [another thread]
    f_table.delete(..)
    f_table.insert
    session.flush()
    session.connection()
    transaction = session.begin()
    nested = session.begin_nested()
    try:
        f_table.insert()...
    except IntegrityError:
        session.query(..)
        #update
    session.commit()
    session.commit()

The same traceback:

    KeyError: Engine(postgres://...)

This application uses the inotify library to rename files that were created with names not encoded in utf-8. It also stores metadata in the db: one thread per created file. When the application is running and the table in the db is empty, it walks through the fs directory and renames every file that is named incorrectly. The error happens only during the walk, when it finds a file to rename. One thread continues walking while another tries to insert a record about the renamed file into the table that the first thread is using.
[sqlalchemy] Re: concurent modification
> i think it's impossible for you to be getting the same stacktrace that you were, can you post what you're getting now please...

    Exception in thread Thread-6:
    Traceback (most recent call last):
      File "threading.py", line 442, in __bootstrap
        self.run()
      File "./camper.py", line 104, in run
        session.connection()
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/scoping.py", line 74, in do
        return getattr(self.registry(), name)(*args, **kwargs)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 505, in connection
        return self.__connection(self.get_bind(mapper, **kwargs))
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 509, in __connection
        return self.transaction.get_or_add(engine)
      File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 179, in get_or_add
        (conn, trans, autoclose) = self.__parent.__connections[bind]
    KeyError: Engine(postgres://...)
[sqlalchemy] Re: concurent modification
> I'm probably completely wrong about this, but in your example above, I don't think the statement f_table.insert().execute() necessarily uses the same connection as the ORM-level transaction and queries.

If I got it right, then, as far as I know, two transactions are impossible on one connection, and most probably the second thread is trying to change something in the table that the first thread is working with.

> Using SQL expressions with ORM transactions is covered in this section of the docs: http://www.sqlalchemy.org/docs/04/session.html#unitofwork_sql
> Hope that helps,

I'm not sure where it is appropriate to use SessionContext (which is deprecated), sessionmaker, scoped_session, create_session and so on. I've found this in the documentation and it just works ) :

    session = scoped_session(create_session)

Is it necessary to specify transactional=True somewhere? If yes, then where exactly? Btw, I was using only ORM operations.
[sqlalchemy] Re: concurent modification
the 0.4 pattern for using scoped_session is:

    session = scoped_session(sessionmaker(transactional=(True|False), autoflush=(True|False)))

Defined globally:

    session = scoped_session(sessionmaker(transactional=True, autoflush=False))

Result:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "threading.py", line 442, in __bootstrap
    self.run()
  File "./camper.py", line 116, in run
    walk(session, theone, root)
  File "./camper.py", line 79, in walk
    stuff = session.query(Path).select_from(f_table.join(u_table)).filter(User.c.id==theone.id).filter(Path.c.path==relpath).first()
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 600, in first
    ret = list(self[0:1])
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 629, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py", line 632, in _execute_and_instances
    result = self.session.execute(querycontext.statement, params=self._params, mapper=self.mapper)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 527, in execute
    return self.__connection(engine, close_with_result=True).execute(clause, params or {}, **kwargs)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 509, in __connection
    return self.transaction.get_or_add(engine)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 179, in get_or_add
    (conn, trans, autoclose) = self.__parent.__connections[bind]
KeyError: Engine(postgres://user:[EMAIL PROTECTED]/mydb)

turning on transactional=True there is probably a good idea, as long as you commit() your changes explicitly and close out the session at the end of a web request (is this a web application?)

This is a daemon.
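For readers unsure what scoped_session adds over a plain session factory: the idea is a per-thread registry, so that each thread calling the global `session` gets its own session object. The sketch below is not SQLAlchemy's implementation; `FakeSession` and `ThreadLocalRegistry` are hypothetical names used only to show the thread-local pattern with the standard library.

```python
import threading


class FakeSession(object):
    """Hypothetical stand-in for a Session; it only remembers which thread created it."""

    def __init__(self):
        self.owner = threading.current_thread().name


class ThreadLocalRegistry(object):
    """Calls the factory at most once per thread and returns that thread's object afterwards."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value

    def remove(self):
        # Discard the current thread's object, e.g. at the end of a unit of work.
        if hasattr(self._local, "value"):
            del self._local.value


registry = ThreadLocalRegistry(FakeSession)
seen = {}


def worker(name):
    first = registry()
    second = registry()
    # Same thread, same object; the object itself is kept so threads can be compared later.
    seen[name] = (first, first is second)
    registry.remove()


threads = [threading.Thread(target=worker, args=("t%d" % i,), name="t%d" % i) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

In a long-running daemon the equivalent of `remove()` matters: a thread that never discards or closes its session keeps its connection checked out, which is one way to run into the pool timeout mentioned earlier in the thread.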
[sqlalchemy] Re: concurent modification
it seems like you're doing something else in there that you're not supposed to (like accessing SessionTransaction perhaps? don't use the old 0.3 patterns anymore).

This is the first time I've heard about SessionTransaction. I've also looked through the changes between the 0.3 and 0.4 versions and updated my code accordingly.

here's a working example:

I get the same KeyError: Engine(postgres://.. I'm using svn revision 3837.

    from sqlalchemy import *
    from sqlalchemy import create_engine, MetaData, Table, Column, types, schema
    from sqlalchemy.orm import *
    from sqlalchemy.orm import mapper, relation, backref, create_session, scoped_session, sessionmaker

    session = scoped_session(sessionmaker(transactional=True, autoflush=False))
    metadata = MetaData()
    u_table = Table(...)
    f_table = Table(...)
    mapper(User, u_table)
    mapper(Path, f_table)
    db = create_engine('postgres://...')
    session.configure(bind=db)
[sqlalchemy] Re: concurent modification
default isolation mode settings usually don't need any changes. we don't have an official API for that yet, so you can apply it to all connections using a custom connect() function sent to create_engine, or you can try setting it individually as conn = session.connection(); conn.connection.set isolation.

I've found that the default postgres isolation level is `Read Committed Isolation Level`: http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html but probably I need `Serializable Isolation Level`: "The level Serializable provides the strictest transaction isolation. This level emulates serial transaction execution, as if transactions had been executed one after another, serially, rather than concurrently."?

however, if you're getting that error, you either have a concurrent process/thread already making an incompatible modification to the row, *or* your mapping is not set up properly and the update isn't even finding the correct rows to update. if the problem happens every time, then the latter case is almost certainly the issue. you really should view your SQL log to get a very clear idea of what's going on, and i doubt isolation modes are part of the solution here.

Based on my observations, it happens only with concurrent inserts/updates. One thread:

{{{
f_table.insert().execute()
session.flush()
transaction = session.begin()
nested = session.begin_nested()
try:
    f_table.insert().execute()
except IntegrityError:
    # the record exists and we got the exception corresponding to the constraint
    stuff = session.query(Path).select_from(..)
    ..update..
nested.commit()
transaction.commit()
session.clear()
}}}

Another thread:

{{{
f_table.delete(...)  # or update
}}}

if you are getting genuine, occasional concurrent modification errors, you can also look into using pessimistic locking, i.e. query.with_lockmode(), to pre-lock rows inside a transaction and thus prevent conflicts from occurring on the table in question.

Could you please point me to an example of using query.with_lockmode()? By the way, how does it work? When something changes a table that is locked, will SA raise an exception, or will the queries be queued for execution after the transaction that locked the table?
[sqlalchemy] Re: concurent modification
I was in fact thinking of changing the isolation level. But first, I don't know how to do this with sqlalchemy; second, with the default isolation level, commits should work properly whether or not something was inserted or updated during the commit; and third, I was using a nested transaction like this:

    transaction = session.begin()
    nested = session.begin_nested()
    try:
        ..insert..  # trying to insert values into a column that has a UNIQUE constraint
    except:
        ..update..
    nested.commit()
    transaction.commit()

so I'm not sure that I've done everything right. By the way, this is a daemon, and there is no user watching it.

On 23 Nov, 09:43, Sanjay [EMAIL PROTECTED] wrote:

I think all one can do is just forget and start over again when a concurrent modification error occurs. That means catching the exception and just showing some failure message to the user is all one can do. Sanjay
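The insert-else-update idea inside a nested transaction maps onto SQL savepoints. Here is a minimal sketch of that pattern, assuming the standard-library sqlite3 module as a stand-in for PostgreSQL; the table `files`, its columns, and the helper `insert_or_update` are hypothetical. It catches only the integrity error rather than using a bare `except:`, so unrelated failures are not mistaken for "row already exists".

```python
import sqlite3

# isolation_level=None: transactions are controlled by hand with BEGIN/COMMIT.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE files (path TEXT UNIQUE, size INTEGER)")


def insert_or_update(conn, path, size):
    """Try to insert; if the UNIQUE constraint fires, undo the attempt and update instead."""
    conn.execute("BEGIN")
    conn.execute("SAVEPOINT try_insert")  # plays the role of the nested transaction
    try:
        conn.execute("INSERT INTO files (path, size) VALUES (?, ?)", (path, size))
    except sqlite3.IntegrityError:
        # Roll back only the failed insert, then update inside the same outer transaction.
        conn.execute("ROLLBACK TO SAVEPOINT try_insert")
        conn.execute("UPDATE files SET size = ? WHERE path = ?", (size, path))
    conn.execute("RELEASE SAVEPOINT try_insert")
    conn.execute("COMMIT")


insert_or_update(conn, "a.txt", 10)  # first call inserts
insert_or_update(conn, "a.txt", 25)  # second call hits the constraint and updates
rows = conn.execute("SELECT path, size FROM files").fetchall()
```

On PostgreSQL the rollback to the savepoint is what makes this work, because a failed statement otherwise leaves the whole transaction unusable until it is rolled back. The pattern still leaves a window in which another thread deletes the row between the failed insert and the update, so the update's row count is worth checking.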
[sqlalchemy] concurent modification
Does someone have an idea how to fix the problem that causes this traceback:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "threading.py", line 442, in __bootstrap
    self.run()
  File "./camper.py", line 138, in run
    walk(session, theone, root)
  File "./camper.py", line 49, in walk
    session.flush()
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/scoping.py", line 74, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 683, in flush
    self.uow.flush(self, objects)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 209, in flush
    flush_context.execute()
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 436, in execute
    UOWExecutor().execute(self, head)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1055, in execute
    self.execute_save_steps(trans, task)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1069, in execute_save_steps
    self.save_objects(trans, task)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/unitofwork.py", line 1060, in save_objects
    task.mapper.save_obj(task.polymorphic_tosave_objects, trans)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/mapper.py", line 1123, in save_obj
    raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
ConcurrentModificationError: Updated rowcount 0 does not match number of objects updated 1

Thanks
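The message in that traceback describes a row-count check: an UPDATE was expected to touch one row and touched none. One way this can happen, sketched below, is another connection deleting the row between the time it was loaded and the time it is flushed. This is an illustration under assumptions, not a diagnosis of the code above: it uses the standard-library sqlite3 module instead of SQLAlchemy and PostgreSQL, and the table and variable names are hypothetical.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "rowcount.db")
thread_one = sqlite3.connect(db_path)  # plays the thread that loads and later updates a row
thread_two = sqlite3.connect(db_path)  # plays the thread that deletes it in between

thread_one.execute("CREATE TABLE files (id INTEGER PRIMARY KEY, path TEXT)")
thread_one.execute("INSERT INTO files (id, path) VALUES (1, 'old.txt')")
thread_one.commit()

# Thread one loads the row and keeps it in memory.
loaded = thread_one.execute("SELECT id, path FROM files WHERE id = 1").fetchall()[0]

# Thread two deletes the same row and commits.
thread_two.execute("DELETE FROM files WHERE id = ?", (loaded[0],))
thread_two.commit()

# Thread one now writes its in-memory change back; no row matches any more.
cursor = thread_one.execute("UPDATE files SET path = ? WHERE id = ?", ("new.txt", loaded[0]))
matched = cursor.rowcount
expected = 1
thread_one.commit()

message = None
if matched != expected:
    message = "Updated rowcount %d does not match number of objects updated %d" % (matched, expected)

thread_one.close()
thread_two.close()
```

The same symptom would also appear if the UPDATE's WHERE clause never matched in the first place, for example because of a primary key mismatch in the mapping, so the SQL log is the quickest way to tell the two cases apart.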
[sqlalchemy] Re: deleting
You must instantiate an instance of the object to be deleted. This is an object manager, so all operations are performed on objects. Or you can execute a plain SQL string directly from the engine:

    connection = engine.connect()
    connection.execute("DELETE FROM a WHERE b = c")

Or I can do this:

    f_table.delete(f_table.c.user_id==theone.id).execute()
[sqlalchemy] deleting
Good day, sqlalchemy. I was searching, but didn't find a way to delete records from the db without executing a selection first. So, how do I represent this SQL statement in the sqlalchemy ORM: DELETE FROM a WHERE b = c? Right now it looks like this:

    stuff = session.query(A).select_from(a_table).filter(A.c.b==c).first()
    session.delete(stuff)

P.S. How should I understand the following:

Traceback (most recent call last):
  File "threading.py", line 442, in __bootstrap
    self.run()
  File "./camper.py", line 173, in run
    session.delete(stuff)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/scoping.py", line 74, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 849, in delete
    self._delete_impl(object)
  File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/session.py", line 1007, in _delete_impl
    raise exceptions.InvalidRequestError("Instance '%s' is not persisted" % mapperutil.instance_str(obj))
InvalidRequestError: Instance '[EMAIL PROTECTED]' is not persisted

It occurs sometimes in a threaded application in this code:

    from sqlalchemy.orm import mapper, relation, backref, create_session, scoped_session

    session = scoped_session(create_session)
    stuff = session.query(Path).select_from(f_table.join(u_table)).filter(User.c.id==theone.id).first()
    session.delete(stuff)
[sqlalchemy] sqlalchemy + django
Could someone please explain the following behaviour:

    from django.http import HttpResponseRedirect, Http404
    from django.shortcuts import render_to_response
    from df.tst import Path as ThePath
    from sqlalchemy import *
    from sqlalchemy.orm import *

    metadata = MetaData()
    u_table = Table('auth_user', metadata,
        Column('id', Integer, primary_key=True),
        Column('username', String(30)))
    f_table = Table('fellowship_file', metadata,
        Column('id', Integer, primary_key=True),
        Column('user_id', None, ForeignKey('auth_user.id')),
        Column('ls', PickleType, nullable=False))

    class User(object):
        pass

    class Path(ThePath):
        pass

    clear_mappers()
    mapper(User, u_table)
    mapper(Path, f_table)
    db = create_engine('...')
    metadata.create_all(db)
    context = create_session(bind=db)

    def ls(request, user=None):
        if not user:
            raise Http404
        stuff = context.query(Path).filter(User.c.username==user).first()
        context.clear()
        context.close()
        return render_to_response('index.html', {'ls': stuff.id})

Traceback (most recent call last):
File "/usr/lib/python2.4/site-packages/django/core/handlers/base.py" in get_response
  77. response = callback(request, *callback_args, **callback_kwargs)
File "/home/grey/src/df/fs/views.py" in ls
  37. stuff = context.query(Path).filter(User.c.username==user).first()
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in first
  920. ret = list(self[0:1])
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in __iter__
  958. return iter(self.select_whereclause())
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in select_whereclause
  359. return self._select_statement(statement, params=params)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in _select_statement
  1072. return self.execute(statement, params=params, **kwargs)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in execute
  973. return self.instances(result, **kwargs)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/query.py" in instances
  1032. self.select_mapper._instance(context, row, result)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/mapper.py" in _instance
  1497. self.populate_instance(context, instance, row, identitykey, isnew)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/mapper.py" in populate_instance
  1534. prop.execute(selectcontext, instance, row, identitykey, isnew)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/interfaces.py" in execute
  163. self._get_context_strategy(selectcontext).process_row(selectcontext, instance, row, identitykey, isnew)
File "/usr/lib/python2.4/site-packages/sqlalchemy/orm/strategies.py" in process_row
  39. instance.__dict__[self.key] = row[self.columns[0]]
File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py" in __getitem__
  1171. return self.__parent._get_col(self.__row, key)
File "/usr/lib/python2.4/site-packages/sqlalchemy/engine/base.py" in _get_col
  993. return rec[1].convert_result_value(row[rec[2]], self.dialect)
File "/usr/lib/python2.4/site-packages/sqlalchemy/types.py" in convert_result_value
  323. return self.pickler.loads(str(buf))

AttributeError at /users/grey/
'module' object has no attribute 'Path'

P.S. This works without django.
P.P.S. The Django community assures me that this problem is not related to django in any way.
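The traceback ends inside `pickler.loads`, so one plausible reading (an assumption, since the thread does not show how the `ls` column was written) is that the pickled data names a class `Path` in a module that, in the Django process, has no such attribute. Pickle stores instances by module name and class name, not by class body. The sketch below reproduces that failure mode with the standard library only; the module name `hypothetical_tst` is invented for the demonstration.

```python
import pickle
import sys
import types

# Build a throwaway module so the example does not depend on any real package layout.
module = types.ModuleType("hypothetical_tst")
sys.modules["hypothetical_tst"] = module


class Path(object):
    def __init__(self, name):
        self.name = name


# Make the class look as if it had been defined inside the throwaway module.
Path.__module__ = "hypothetical_tst"
module.Path = Path

# The pickle records only "hypothetical_tst" and "Path" plus the instance state.
payload = pickle.dumps(Path("a.txt"))

# While the module still exposes Path, loading works.
restored = pickle.loads(payload)

# Simulate the reading process: same module name, but no Path attribute in it.
del module.Path
try:
    pickle.loads(payload)
    error = None
except AttributeError as exc:
    error = exc

print(type(error).__name__)
```

If this is what is happening, the usual remedies are to pickle only plain data (strings, lists, dicts) into the column, or to make sure the class is importable under the same module path in every process that reads it.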