On Jul 22, 2014, at 7:47 AM, Milind Vaidya <kava...@gmail.com> wrote:

> I am developing a back-end scripts to populate a bunch of tables.
> 
> 1. dbinterface.py : contains all mapper classes and DBHandler class which has 
> required static methods for queries.
> e.g. 
> 
> class Destination(Base): 
>          __table__ = Base.metadata.tables['destination']
> 
> 
> @staticmethod
> def _save(rows):
>     try:
>         DBHandler.session.add_all(rows)
>         DBHandler.session.commit()
>     except Exception as exp:
>         logger.debug("Error saving data: %s", str(exp))
> 
>  Question 1: Is this a good idea to handle exceptions like this ? The save 
> method accepts list of objects to be saved. If there is problem with one 
> object I don't want to prevent others from getting updated. Other alternative 
> will be handling exception in calling code, roll back and retry saving the 
> list.

the use case of "if there is a problem with one object i don't want others 
having an issue" can't be handled generically, for the simple reason that if 
object X is dependent on object Y, a failure to persist X means Y can't be 
persisted either.

It sounds like what you're really dealing with is inserting unrelated rows.   
The mechanics of transactions at both the SQLAlchemy level and in many cases at 
the DB level prevents the transaction from proceeding from a failed INSERT, 
*unless* you use a savepoint.

So the pattern, when you expect *individual* rows to fail, is:

for row in rows:
    try:
        with session.begin_nested():
            session.add(row)
    except orm_exc.IntegrityError:
       logger.error("Error", exc_info=True)
session.commit()

the above will not be as performant as a mass insert of all the rows as once as 
the Session needs to flush for each one individually.


> 
> 2. basetables.py : This will populate base tables in the DB. The data will be 
> fetched from a web service using async calls.
> 
> def populate(hosts):
> 100     '''Fetch property value for each host'''
> 101     with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
> 102         future_to_host = {executor.submit(call__ws, host.host_name): host
> 103                 for host in hosts}
> 104         for future in concurrent.futures.as_completed(future_to_host):
> 105             host = future_to_host[future]
> 106             try:
> 107                 property = future.result()
> 108             except Exception as exp:
> 109                 logger.debug("Error fetching hosts from webervice: \
> 110                         %s", exp)
> 111             else:
> 112                 host.property = property
> 113                 logger.info("host: %s, property: %s", host, property)
> 114             finally:
> 115                 try:
> 116                     colo = find_colo(host.host_name)
> 117                     logger.info("Colo: %s", colo)
> 118                 except Exception as exp:
> 119                     logger.debug("Invalid colo: %s", exp)
> 120                     hosts.remove(host)
> 121                 else:
> 122                     host_colo = DBHandler.fetch_colo(colo)
> 123                     if host_colo is not None:
> 124                         host.emitter_host_colo = host_colo
> 125                         logger.info("Host Colo: %s", host_colo)
> 126                     else:
> 127                         hosts.remove(host)
> 128                         logger.debug("Removing Host error processing colo 
> : %s", host)
> 129      DBHandler._save(hosts)
> 
> Questions 2 : is this a good model considering there will be 1000s of hosts 
> being updated in a bulk update and fact being session is static variable in 
> DBHandler ? 
> Colo is another table and hence mapper object. If the colo is not present for 
> any of the host it will be fetched from static method fetch_colo of DB 
> handler based on name. Now there is one to many mapping from colo to host, 
> aka one colo can represent many hosts. In such case, would the call to DB be 
> avoided if the colo is already fetched previously and only relationship with 
> new host will be taken care of?
> 
> Pardon my ignorance for I am newbie to sqlalchemy n  python in general

I'm not very familiar with concurrent.futures but I will note that the Session 
isn't thread safe, as it refers to a single DBAPI Connection/transaction in 
progress as well as lots of internal state, and all the objects associated with 
a given Session are in fact proxies to the state of that Session and 
transaction so they aren't either.   If you are running concurrent, multiple 
threads, you typically have a Session per thread, and every object that is 
handled within that thread should originate within that thread.   To pass the 
state of objects between threads, you should use the Session.merge() method.   
I talk in depth about why the Session works this way in my talk "The SQLAlchemy 
Session in Depth" 
http://www.sqlalchemy.org/library.html#thesqlalchemysessionindepth .   There is 
also discussion of this in 
http://docs.sqlalchemy.org/en/rel_0_9/orm/session.html#is-the-session-thread-safe.

Alternatively, if you really want to have all of the concurrent work proceeding 
on a single transaction, you can share a Session and its objects between 
threads if you mutex all database-specific operations.   My approach towards 
this is to create a coarse-grained facade around both your Session and all the 
objects within, exposing only non-database-linked objects on the outside to 
your concurrent.futures workers.  These coarse-grained methods would provide a 
mutex such that the state of the Session isn't impacted by more than one thread 
at the same time.   

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to