On Jul 22, 2014, at 7:47 AM, Milind Vaidya kava...@gmail.com wrote:
I am developing a back-end scripts to populate a bunch of tables.
1. dbinterface.py : contains all mapper classes and DBHandler class which has
required static methods for queries.
e.g.
class Destination(Base):
__table__ = Base.metadata.tables['destination']
@staticmethod
def _save(rows):
try:
DBHandler.session.add_all(rows)
DBHandler.session.commit()
except Exception as exp:
logger.debug(Error saving data: %s, str(exp))
Question 1: Is this a good idea to handle exceptions like this ? The save
method accepts list of objects to be saved. If there is problem with one
object I don't want to prevent others from getting updated. Other alternative
will be handling exception in calling code, roll back and retry saving the
list.
the use case of if there is a problem with one object i don't want others
having an issue can't be handled generically, for the simple reason that if
object X is dependent on object Y, a failure to persist X means Y can't be
persisted either.
It sounds like what you're really dealing with is inserting unrelated rows.
The mechanics of transactions at both the SQLAlchemy level and in many cases at
the DB level prevents the transaction from proceeding from a failed INSERT,
*unless* you use a savepoint.
So the pattern, when you expect *individual* rows to fail, is:
for row in rows:
try:
with session.begin_nested():
session.add(row)
except orm_exc.IntegrityError:
logger.error(Error, exc_info=True)
session.commit()
the above will not be as performant as a mass insert of all the rows as once as
the Session needs to flush for each one individually.
2. basetables.py : This will populate base tables in the DB. The data will be
fetched from a web service using async calls.
def populate(hosts):
100 '''Fetch property value for each host'''
101 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
102 future_to_host = {executor.submit(call__ws, host.host_name): host
103 for host in hosts}
104 for future in concurrent.futures.as_completed(future_to_host):
105 host = future_to_host[future]
106 try:
107 property = future.result()
108 except Exception as exp:
109 logger.debug(Error fetching hosts from webervice: \
110 %s, exp)
111 else:
112 host.property = property
113 logger.info(host: %s, property: %s, host, property)
114 finally:
115 try:
116 colo = find_colo(host.host_name)
117 logger.info(Colo: %s, colo)
118 except Exception as exp:
119 logger.debug(Invalid colo: %s, exp)
120 hosts.remove(host)
121 else:
122 host_colo = DBHandler.fetch_colo(colo)
123 if host_colo is not None:
124 host.emitter_host_colo = host_colo
125 logger.info(Host Colo: %s, host_colo)
126 else:
127 hosts.remove(host)
128 logger.debug(Removing Host error processing colo
: %s, host)
129 DBHandler._save(hosts)
Questions 2 : is this a good model considering there will be 1000s of hosts
being updated in a bulk update and fact being session is static variable in
DBHandler ?
Colo is another table and hence mapper object. If the colo is not present for
any of the host it will be fetched from static method fetch_colo of DB
handler based on name. Now there is one to many mapping from colo to host,
aka one colo can represent many hosts. In such case, would the call to DB be
avoided if the colo is already fetched previously and only relationship with
new host will be taken care of?
Pardon my ignorance for I am newbie to sqlalchemy n python in general
I'm not very familiar with concurrent.futures but I will note that the Session
isn't thread safe, as it refers to a single DBAPI Connection/transaction in
progress as well as lots of internal state, and all the objects associated with
a given Session are in fact proxies to the state of that Session and
transaction so they aren't either. If you are running concurrent, multiple
threads, you typically have a Session per thread, and every object that is
handled within that thread should originate within that thread. To pass the
state of objects between threads, you should use the Session.merge() method.
I talk in depth about why the Session works this way in my talk The SQLAlchemy
Session in Depth
http://www.sqlalchemy.org/library.html#thesqlalchemysessionindepth . There is
also discussion of this in