Addendum: the types of error I'm seeing includes SQLA trying to execute notices from the PG server eg. one of the tracebacks I'm seeing is:

#0  PyObject_Malloc (nbytes=86) at ../Objects/obmalloc.c:756
#1  0x0000000000455eb5 in PyString_FromString (str=0x2de0ece0 "WARNING:
there is no transaction in progress\n")
    at ../Objects/stringobject.c:139

This smells like memory being overwritten. Any idea what might be causing this?

Another possibility is that my usage of scoped_session is wrong. I can't find any explicit examples of usage in the official documentation, so this was partly guesswork on my part. Here is a sketch of my usage. The model I'm using is a thread pool, which lines up n jobs in a queue, and has a pool of k threads executing them. The problems seem to occur when n is too large.

Session = scoped_session(sessionmaker())
Session.configure(bind=db)
pool = ThreadPool(Session, self.jobs)
ids = self.get_idlink_ids(Session)
for i in ids:
    pool.add_task(self.load_geno_table_from_file, Session, i)
pool.start()

where load_geno_table_from_file is

def load_geno_table_from_file(self, session, i):
        session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = false 
);"%(self.schema, i))
        self.drop_geno_table_constraints(session, 'geno%s'%i)
        self.copy_data_to_geno_table(session, 'geno%s'%i, 'tmp/geno%s'%i)
        self.restore_geno_table_constraints(session, 'geno%s'%i)
        session.execute("ALTER TABLE %s.geno%s SET ( autovacuum_enabled = true 
);"%(self.schema, i))

and add_task is

def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

So, Session is passed to load_geno_table_from_file, which executes inside each thread. Is that Ok? I'm adding the rest of the thread code below for reference.
                                                          Regards, Faheem

*******************************************************
import Queue, threading, urllib2, time

class Worker(threading.Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, session, queue, num):
        threading.Thread.__init__(self)
        self.num = num
        self.queue = queue
        self.setDaemon(True)
        self.session = session

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, session, num_threads):
        from geno import Geno_Shard
        self.queue = Queue.Queue()
        self.workerlist = []
        self.num = num_threads
        self.session = session
        for i in range(num_threads):
            self.workerlist.append(Worker(session, self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()



On Thu, 12 Aug 2010, Faheem Mitha wrote:


Hi,

I'm using scoped_session with PostgreSQL to run multiple threads, each thread creating a table. However, I'm getting some pretty weird and inconsistent errors (segfaults from Python with some very peculiar errors), and I was wondering if my usage was at fault.

The program has a serial bit, and a parallel bit, and currently I'm doing something like this. For the serial bit I do

db = create_engine(dbstring)
Session = sessionmaker()
session = Session(bind=db)

and then later I do

session.close()

db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)

Looking at this is seems likely that is would be better to just use scoped_session everywhere, that is, just start with

db = create_engine(self.dbstring)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
[proceed with using Session in serial mode and eventually use it in parallel mode too]

I'm basically writing to confirm that it is Ok to use scoped_session in this way. The way I'm doing it looks a little dodgy. I don't know if this is really the cause of my problem - just clutching at straws here. Thanks in advance. Please CC me on any reply.

                                                         Regards, Faheem



--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to sqlalch...@googlegroups.com.
To unsubscribe from this group, send email to 
sqlalchemy+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to