[sqlalchemy] Bulk update using multi threading and exception handling

2014-07-22 Thread Milind Vaidya
I am developing back-end scripts to populate a bunch of tables.

1. dbinterface.py: contains all mapper classes and the DBHandler class, which
has the required static methods for queries, e.g.:

    class Destination(Base):
        __table__ = Base.metadata.tables['destination']


    # static method of DBHandler
    @staticmethod
    def _save(rows):
        try:
            DBHandler.session.add_all(rows)
            DBHandler.session.commit()
        except Exception as exp:
            logger.debug("Error saving data: %s", str(exp))

Question 1: Is it a good idea to handle exceptions like this? The save
method accepts a list of objects to be saved. If there is a problem with one
object, I don't want to prevent the others from getting updated. The other
alternative would be to handle the exception in the calling code, roll back,
and retry saving the list.


2. basetables.py : This will populate base tables in the DB. The data will 
be fetched from a web service using async calls.

    import concurrent.futures

    def populate(hosts):
        '''Fetch property value for each host'''
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_host = {executor.submit(call__ws, host.host_name): host
                              for host in hosts}
            for future in concurrent.futures.as_completed(future_to_host):
                host = future_to_host[future]
                try:
                    property = future.result()
                except Exception as exp:
                    logger.debug("Error fetching hosts from web service: %s", exp)
                else:
                    host.property = property
                    logger.info("host: %s, property: %s", host, property)
                finally:
                    try:
                        colo = find_colo(host.host_name)
                        logger.info("Colo: %s", colo)
                    except Exception as exp:
                        logger.debug("Invalid colo: %s", exp)
                        hosts.remove(host)
                    else:
                        host_colo = DBHandler.fetch_colo(colo)
                        if host_colo is not None:
                            host.emitter_host_colo = host_colo
                            logger.info("Host Colo: %s", host_colo)
                        else:
                            hosts.remove(host)
                            logger.debug("Removing Host error processing colo : %s", host)
        DBHandler._save(hosts)

Question 2: Is this a good model, considering that thousands of hosts will be
updated in one bulk update and that the session is a static variable in
DBHandler?
Colo is another table and hence another mapped class. If the colo is not yet
set for a host, it is fetched by name through the static method fetch_colo of
DBHandler. There is a one-to-many mapping from colo to host, i.e. one colo can
represent many hosts. In that case, would the call to the DB be avoided if the
colo has already been fetched, so that only the relationship with the new host
is taken care of?

Pardon my ignorance; I am a newbie to SQLAlchemy and to Python in general.



Re: [sqlalchemy] Bulk update using multi threading and exception handling

2014-07-22 Thread Michael Bayer

On Jul 22, 2014, at 7:47 AM, Milind Vaidya kava...@gmail.com wrote:

> Question 1: Is it a good idea to handle exceptions like this? The save
> method accepts a list of objects to be saved. If there is a problem with one
> object, I don't want to prevent the others from getting updated. The other
> alternative would be to handle the exception in the calling code, roll back,
> and retry saving the list.

The use case of "if there is a problem with one object, I don't want others
having an issue" can't be handled generically, for the simple reason that if
object X is dependent on object Y, a failure to persist X means Y can't be
persisted either.

It sounds like what you're really dealing with is inserting unrelated rows.
The mechanics of transactions, at the SQLAlchemy level and in many cases at
the DB level, prevent the transaction from proceeding after a failed INSERT,
*unless* you use a savepoint.

So the pattern, when you expect *individual* rows to fail, is:

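    from sqlalchemy import exc

    for row in rows:
        try:
            # the pending row is flushed when the block exits,
            # inside its own SAVEPOINT
            with session.begin_nested():
                session.add(row)
        except exc.IntegrityError:
            logger.error("Error", exc_info=True)
    session.commit()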

The above will not be as performant as a mass insert of all the rows at once,
as the Session needs to flush for each one individually.
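
A minimal sketch of what the savepoint pattern amounts to at the database level, written against the standard-library sqlite3 module rather than SQLAlchemy so that it runs on its own. The table layout, the function name insert_rows_individually and the sample data are assumptions made up for illustration; with the ORM, Session.begin_nested() emits the corresponding SAVEPOINT, RELEASE SAVEPOINT and ROLLBACK TO SAVEPOINT statements for you.

```python
import sqlite3


def insert_rows_individually(conn, names):
    """Insert each name under its own SAVEPOINT; return the names that failed."""
    failed = []
    conn.execute("BEGIN")
    for name in names:
        conn.execute("SAVEPOINT row_sp")
        try:
            conn.execute("INSERT INTO destination (name) VALUES (?)", (name,))
        except sqlite3.IntegrityError:
            # Undo only this row; the enclosing transaction stays usable.
            conn.execute("ROLLBACK TO SAVEPOINT row_sp")
            failed.append(name)
        finally:
            # Pop the savepoint whether or not the row was kept.
            conn.execute("RELEASE SAVEPOINT row_sp")
    conn.execute("COMMIT")
    return failed


# isolation_level=None leaves transaction control to the explicit
# BEGIN / COMMIT statements above.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE destination (name TEXT PRIMARY KEY)")

failed = insert_rows_individually(conn, ["a", "b", "a", "c"])
saved = [row[0] for row in conn.execute("SELECT name FROM destination ORDER BY name")]
print("failed:", failed)
print("saved:", saved)
```

The duplicate row is reported as failed while the rows before and after it are still committed, at the cost of one round of savepoint statements per row.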


 
> Question 2: Is this a good model, considering that thousands of hosts will be
> updated in one bulk update and that the session is a static variable in
> DBHandler?
> Colo is another table and hence another mapped class. If the colo is not yet
> set for a host, it is fetched by name through the static method fetch_colo of
> DBHandler. There is a one-to-many mapping from colo to host, i.e. one colo can
> represent many hosts. In that case, would the call to the DB be avoided if the
> colo has already been fetched, so that only the relationship with the new host
> is taken care of?
>
> Pardon my ignorance; I am a newbie to SQLAlchemy and to Python in general.

I'm not very familiar with concurrent.futures, but I will note that the Session
isn't thread-safe: it refers to a single DBAPI connection/transaction in
progress as well as a lot of internal state, and all the objects associated
with a given Session are in fact proxies to the state of that Session and
transaction, so they aren't thread-safe either. If you are running multiple
concurrent threads, you typically have a Session per thread, and every object
that is handled within that thread should originate within that thread. To pass
the state of objects between threads, you should use the Session.merge() method.
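
One way to stay within that rule without a Session per thread, sketched under the assumption that the worker threads only need to call the web service: hand the workers plain strings, and touch the mapped objects (and therefore the Session) only in the thread that submitted the work. The Host dataclass and call_ws below are hypothetical stand-ins for the mapped class and the real web-service call; if the workers themselves had to query the database, each would need its own Session as described above.

```python
import concurrent.futures
from dataclasses import dataclass
from typing import Optional


@dataclass
class Host:
    """Hypothetical stand-in for the mapped Host class."""
    host_name: str
    property: Optional[str] = None


def call_ws(host_name: str) -> str:
    """Hypothetical web-service call; receives a plain string only."""
    if host_name.startswith("bad"):
        raise ValueError("no property for " + host_name)
    return "prop-" + host_name


def populate(hosts):
    """Fetch properties in worker threads; modify Host objects only here."""
    by_name = {h.host_name: h for h in hosts}
    failed = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Workers get host names, never session-bound objects.
        future_to_name = {executor.submit(call_ws, name): name for name in by_name}
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                value = future.result()
            except Exception:
                failed.append(name)
            else:
                # Runs in the calling thread, where the Session would live.
                by_name[name].property = value
    return failed


hosts = [Host("h1"), Host("bad1"), Host("h2")]
failed = populate(hosts)
print(failed, [h.property for h in hosts])
```
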
I talk in depth about why the Session works this way in my talk "The SQLAlchemy
Session in Depth":
http://www.sqlalchemy.org/library.html#thesqlalchemysessionindepth . There is
also discussion of this in