On Jul 22, 2014, at 7:47 AM, Milind Vaidya <kava...@gmail.com> wrote:
> I am developing back-end scripts to populate a bunch of tables.
>
> 1. dbinterface.py : contains all mapper classes and the DBHandler class, which has
> the required static methods for queries, e.g.:
>
>     class Destination(Base):
>         __table__ = Base.metadata.tables['destination']
>
>     @staticmethod
>     def _save(rows):
>         try:
>             DBHandler.session.add_all(rows)
>             DBHandler.session.commit()
>         except Exception as exp:
>             logger.debug("Error saving data: %s", str(exp))
>
> Question 1: Is it a good idea to handle exceptions like this? The _save
> method accepts a list of objects to be saved. If there is a problem with one
> object I don't want to prevent the others from getting updated. The other
> alternative would be handling the exception in the calling code, rolling
> back, and retrying the save of the list.

The use case of "if there is a problem with one object I don't want the others to fail" can't be handled generically, for the simple reason that if object X is dependent on object Y, a failure to persist X means Y can't be persisted either. It sounds like what you're really dealing with is inserting unrelated rows.

The mechanics of transactions, both at the SQLAlchemy level and in many cases at the DB level, prevent the transaction from proceeding past a failed INSERT, *unless* you use a savepoint. So the pattern, when you expect *individual* rows to fail, is:

    from sqlalchemy import exc

    for row in rows:
        try:
            with session.begin_nested():
                session.add(row)
        except exc.IntegrityError:
            logger.error("Error", exc_info=True)
    session.commit()

(Note that IntegrityError lives in sqlalchemy.exc, not sqlalchemy.orm.exc.)

The above will not be as performant as a mass insert of all the rows at once, as the Session needs to flush each one individually.

> 2. basetables.py : This will populate base tables in the DB. The data will be
> fetched from a web service using async calls.
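To make the savepoint pattern concrete, here is a self-contained sketch you can run as-is; the Destination model, the unique constraint on name, and the in-memory SQLite database are all invented for illustration:

```python
# Per-row savepoint pattern: a failed INSERT rolls back only to the
# savepoint, so rows already added in this transaction survive.
from sqlalchemy import Column, Integer, String, create_engine, exc
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Destination(Base):
    __tablename__ = "destination"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)  # unique so a duplicate row fails

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

# the third row violates the unique constraint on purpose
rows = [Destination(name="a"), Destination(name="b"), Destination(name="a")]

for row in rows:
    try:
        with session.begin_nested():   # emits SAVEPOINT
            session.add(row)
            session.flush()            # force the INSERT now, inside the savepoint
    except exc.IntegrityError:
        pass                           # log and skip just this row

session.commit()
print(session.query(Destination).count())  # -> 2
```

Note the explicit flush() inside the block: it ensures the INSERT happens while the savepoint is active, so the IntegrityError is caught per row rather than at the final commit.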
>     def populate(hosts):
>         '''Fetch property value for each host'''
>         with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
>             future_to_host = {executor.submit(call__ws, host.host_name): host
>                               for host in hosts}
>             for future in concurrent.futures.as_completed(future_to_host):
>                 host = future_to_host[future]
>                 try:
>                     property = future.result()
>                 except Exception as exp:
>                     logger.debug("Error fetching hosts from web service: %s", exp)
>                 else:
>                     host.property = property
>                     logger.info("host: %s, property: %s", host, property)
>                 finally:
>                     try:
>                         colo = find_colo(host.host_name)
>                         logger.info("Colo: %s", colo)
>                     except Exception as exp:
>                         logger.debug("Invalid colo: %s", exp)
>                         hosts.remove(host)
>                     else:
>                         host_colo = DBHandler.fetch_colo(colo)
>                         if host_colo is not None:
>                             host.emitter_host_colo = host_colo
>                             logger.info("Host Colo: %s", host_colo)
>                         else:
>                             hosts.remove(host)
>                             logger.debug("Removing Host, error processing colo: %s", host)
>         DBHandler._save(hosts)
>
> Question 2: Is this a good model, considering there will be 1000s of hosts
> being updated in a bulk update, and the fact that the session is a static
> variable in DBHandler?
>
> Colo is another table and hence a mapper object. If the colo is not present
> for any of the hosts it will be fetched by the static method fetch_colo of
> DBHandler, based on name. There is a one-to-many mapping from colo to host,
> i.e. one colo can represent many hosts. In such a case, would the call to the
> DB be avoided if the colo was already fetched previously, with only the
> relationship to the new host being taken care of?
> Pardon my ignorance, for I am a newbie to sqlalchemy and Python in general.

I'm not very familiar with concurrent.futures, but I will note that the Session isn't thread safe, as it refers to a single DBAPI connection/transaction in progress as well as lots of internal state; all the objects associated with a given Session are in fact proxies to the state of that Session and transaction, so they aren't thread safe either.

If you are running multiple concurrent threads, you typically have a Session per thread, and every object that is handled within that thread should originate within that thread. To pass the state of objects between threads, you should use the Session.merge() method. I talk in depth about why the Session works this way in my talk "The SQLAlchemy Session in Depth": http://www.sqlalchemy.org/library.html#thesqlalchemysessionindepth . There is also discussion of this in http://docs.sqlalchemy.org/en/rel_0_9/orm/session.html#is-the-session-thread-safe .

Alternatively, if you really want all of the concurrent work to proceed in a single transaction, you can share a Session and its objects between threads if you mutex all database-specific operations. My approach here would be to create a coarse-grained facade around both your Session and all the objects within it, exposing only non-database-linked objects to your concurrent.futures workers. These coarse-grained methods would provide a mutex such that the state of the Session isn't affected by more than one thread at a time.