Another reply to my own message, sorry.

Another thing I might be doing wrong is my usage of MetaData.
The code I have looks like

   db = create_engine(self.dbstring)
   meta = self.table_dict['metadata']
   meta.bind = db
   meta.create_all()
   Session = scoped_session(sessionmaker())
   Session.configure(bind=db)
   pool = ThreadPool(Session, self.jobs)
   ids = self.get_idlink_ids(Session)
   for i in ids:
       pool.add_task(self.load_geno_table_from_file, Session, i)
   pool.start()

Perhaps creation of this metadata instance is not thread-safe? I found a reference to ThreadLocalMetaData. Would it be better to use that instead?
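
For concreteness, this is roughly the swap I have in mind (only a sketch, and I may be misreading what ThreadLocalMetaData is for; the tables in self.table_dict would presumably also have to be defined against this instance rather than the plain MetaData they use now):

    from sqlalchemy import create_engine, ThreadLocalMetaData

    meta = ThreadLocalMetaData()   # instead of the plain MetaData
    db = create_engine(self.dbstring)
    meta.bind = db                 # the bind is stored per-thread
    meta.create_all()              # uses the calling thread's bind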

                                                       Regards, Faheem.

On Thu, 12 Aug 2010, Faheem Mitha wrote:


Addendum: the types of error I'm seeing include SQLA trying to execute notices from the PG server, e.g. one of the tracebacks I'm seeing is:

#0  PyObject_Malloc (nbytes=86) at ../Objects/obmalloc.c:756
#1  0x0000000000455eb5 in PyString_FromString (str=0x2de0ece0 "WARNING: there is no transaction in progress\n")
    at ../Objects/stringobject.c:139

This smells like memory being overwritten. Any idea what might be causing this?

Another possibility is that my usage of scoped_session is wrong. I can't find any explicit examples of usage in the official documentation, so this was partly guesswork on my part. Here is a sketch of my usage. The model I'm using is a thread pool, which lines up n jobs in a queue, and has a pool of k threads executing them. The problems seem to occur when n is too large.

Session = scoped_session(sessionmaker())
Session.configure(bind=db)
pool = ThreadPool(Session, self.jobs)
ids = self.get_idlink_ids(Session)
for i in ids:
   pool.add_task(self.load_geno_table_from_file, Session, i)
pool.start()

where load_geno_table_from_file is

def load_geno_table_from_file(self, session, i):
    session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = false );" % (self.schema, i))
    self.drop_geno_table_constraints(session, 'geno%s' % i)
    self.copy_data_to_geno_table(session, 'geno%s' % i, 'tmp/geno%s' % i)
    self.restore_geno_table_constraints(session, 'geno%s' % i)
    session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = true );" % (self.schema, i))

and add_task is

def add_task(self, func, *args, **kargs):
       """Add a task to the queue"""
       self.queue.put((func, args, kargs))

So, Session is passed to load_geno_table_from_file, which executes inside each thread. Is that Ok? I'm adding the rest of the thread code below for reference.
                                                         Regards, Faheem

*******************************************************
import Queue, threading, urllib2, time

class Worker(threading.Thread):
   """Thread executing tasks from a given tasks queue"""
   def __init__(self, session, queue, num):
       threading.Thread.__init__(self)
       self.num = num
       self.queue = queue
       self.setDaemon(True)
       self.session = session

   def run(self):
       import traceback
       while True:
           func, args, kargs = self.queue.get()
           try:
               func(*args, **kargs)
           except:
               traceback.print_exc()
           self.queue.task_done()

class ThreadPool:
   """Pool of threads consuming tasks from a queue"""
   def __init__(self, session, num_threads):
       from geno import Geno_Shard
       self.queue = Queue.Queue()
       self.workerlist = []
       self.num = num_threads
       self.session = session
       for i in range(num_threads):
           self.workerlist.append(Worker(session, self.queue, i))

   def add_task(self, func, *args, **kargs):
       """Add a task to the queue"""
       self.queue.put((func, args, kargs))

   def start(self):
       for w in self.workerlist:
           w.start()

   def wait_completion(self):
       """Wait for completion of all the tasks in the queue"""
       self.queue.join()
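
One thing I'm not sure about in Worker.run above: should each task be followed by a Session.remove(), so the thread-local session is discarded before the next task runs? Something like this (only a sketch, I haven't tried it):

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                func(*args, **kargs)
            except:
                traceback.print_exc()
            finally:
                # self.session is the scoped_session registry passed in
                # above; remove() discards this thread's current session
                self.session.remove()
                self.queue.task_done()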



On Thu, 12 Aug 2010, Faheem Mitha wrote:


Hi,

I'm using scoped_session with PostgreSQL to run multiple threads, each thread creating a table. However, I'm getting some pretty weird and inconsistent failures (segfaults from Python, with some very peculiar errors), and I was wondering if my usage was at fault.

The program has a serial bit and a parallel bit, and currently I'm doing something like this. For the serial bit I do

db = create_engine(dbstring)
Session = sessionmaker()
session = Session(bind=db)

and then later I do

session.close()

db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)

Looking at this, it seems likely that it would be better to just use scoped_session everywhere, that is, just start with

db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
[proceed with using Session in serial mode and eventually use it in parallel mode too]
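
where the serial part would then look something like this (again, just a sketch of what I have in mind):

    session = Session()    # the registry returns this thread's session
    # ... serial work ...
    session.commit()
    Session.remove()       # drop it before the threaded part starts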

I'm basically writing to confirm that it is Ok to use scoped_session in this way. The way I'm doing it looks a little dodgy. I don't know if this is really the cause of my problem - just clutching at straws here. Thanks in advance. Please CC me on any reply.

                                                         Regards, Faheem



