Another reply to my own message, sorry.
Another thing I might be doing wrong is my usage of MetaData.
The code I have looks like this:
db = create_engine(self.dbstring)
meta = self.table_dict['metadata']
meta.bind = db
meta.create_all()
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
pool = ThreadPool(Session, self.jobs)
ids = self.get_idlink_ids(Session)
for i in ids:
    pool.add_task(self.load_geno_table_from_file, Session, i)
pool.start()
Perhaps creation of this metadata instance is not thread-safe? I found a
reference to ThreadLocalMetaData. Would it be better to use that instead?
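If so, I'd guess the change would look something like this (untested sketch;
I'm assuming the Table definitions would then have to be made against this
metadata rather than the one in self.table_dict):

from sqlalchemy import create_engine, ThreadLocalMetaData

meta = ThreadLocalMetaData()
# assigning .bind sets the engine for the current thread only
meta.bind = create_engine(self.dbstring)
# ... Table definitions against meta go here ...
meta.create_all()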
Regards, Faheem.
On Thu, 12 Aug 2010, Faheem Mitha wrote:
Addendum: the types of error I'm seeing include SQLA trying to execute
notices from the PG server; e.g., one of the tracebacks I'm seeing is:
#0  PyObject_Malloc (nbytes=86) at ../Objects/obmalloc.c:756
#1  0x0000000000455eb5 in PyString_FromString (str=0x2de0ece0 "WARNING: there is no transaction in progress\n")
    at ../Objects/stringobject.c:139
This smells like memory being overwritten. Any idea what might be causing
this?
Another possibility is that my usage of scoped_session is wrong. I can't find
any explicit examples of usage in the official documentation, so this was
partly guesswork on my part. Here is a sketch of my usage. The model I'm
using is a thread pool, which lines up n jobs in a queue, and has a pool of k
threads executing them. The problems seem to occur when n is too large.
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
pool = ThreadPool(Session, self.jobs)
ids = self.get_idlink_ids(Session)
for i in ids:
    pool.add_task(self.load_geno_table_from_file, Session, i)
pool.start()
where load_geno_table_from_file is
def load_geno_table_from_file(self, session, i):
    session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = false );"
                    % (self.schema, i))
    self.drop_geno_table_constraints(session, 'geno%s'%i)
    self.copy_data_to_geno_table(session, 'geno%s'%i, 'tmp/geno%s'%i)
    self.restore_geno_table_constraints(session, 'geno%s'%i)
    session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = true );"
                    % (self.schema, i))
and add_task is
def add_task(self, func, *args, **kargs):
    """Add a task to the queue"""
    self.queue.put((func, args, kargs))
So, Session is passed to load_geno_table_from_file, which executes inside
each thread. Is that OK? I'm adding the rest of the thread code below for
reference.
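If it isn't, my guess at the intended pattern is that each task checks out
and discards its own thread-local session via the registry, roughly like
this (untested; the try/finally and the commit() are my additions):

def load_geno_table_from_file(self, Session, i):
    session = Session()   # the registry hands back a session local to this thread
    try:
        session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = false );"
                        % (self.schema, i))
        # ... same steps as above ...
        session.commit()
    finally:
        Session.remove()  # discard this thread's session when the task finishes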
Regards, Faheem
*******************************************************
import Queue, threading, urllib2, time

class Worker(threading.Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, session, queue, num):
        threading.Thread.__init__(self)
        self.num = num
        self.queue = queue
        self.setDaemon(True)
        self.session = session

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, session, num_threads):
        from geno import Geno_Shard
        self.queue = Queue.Queue()
        self.workerlist = []
        self.num = num_threads
        self.session = session
        for i in range(num_threads):
            self.workerlist.append(Worker(session, self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
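For reference, the pool gets driven roughly like this (schematic sketch;
some_task stands in for load_geno_table_from_file):

pool = ThreadPool(Session, 4)             # pool of four worker threads
for i in range(100):
    pool.add_task(some_task, Session, i)  # queue the jobs first
pool.start()                              # then start the workers
pool.wait_completion()                    # block until the queue drains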
On Thu, 12 Aug 2010, Faheem Mitha wrote:
Hi,
I'm using scoped_session with PostgreSQL to run multiple threads, each
thread creating a table. However, I'm getting some pretty weird and
inconsistent failures (segfaults from Python, with some very peculiar
tracebacks), and I was wondering if my usage is at fault.
The program has a serial bit and a parallel bit, and currently I'm doing
something like this. For the serial bit I do
db = create_engine(dbstring)
Session = sessionmaker()
session = Session(bind=db)
and then later I do
session.close()
db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
Looking at this, it seems likely that it would be better to just use
scoped_session everywhere, that is, to just start with
db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
[proceed with using Session in serial mode and eventually use it in
parallel mode too]
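i.e. something like this (sketch; the close()/remove() between the two
phases is my guess at the right cleanup):

db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker(bind=db))

# serial phase: the main thread works with its own thread-local session
session = Session()
# ... serial work ...
session.close()
Session.remove()

# parallel phase: hand the registry itself to the thread pool, as before
pool = ThreadPool(Session, self.jobs)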
I'm basically writing to confirm that it is OK to use scoped_session in
this way. The way I'm doing it looks a little dodgy. I don't know if this
is really the cause of my problem - just clutching at straws here. Thanks
in advance. Please CC me on any reply.
Regards, Faheem