RETURNING doesn't work with the DBAPI's "executemany" style of execution, however, which is what conn.execute(stmt, [list of parameter sets]) invokes.
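To illustrate that distinction, here is a minimal sketch; the table definition and connection URL are assumed for the example, not taken from the thread. A single parameter set renders RETURNING and hands back the new id; a list of parameter sets goes through the DBAPI's executemany() and no RETURNING rows come back from the result:

    from sqlalchemy import create_engine, MetaData, Table, Column, Integer

    metadata = MetaData()
    # hypothetical table mirroring the Entry columns discussed below
    entry_table = Table(
        "entries", metadata,
        Column("id", Integer, primary_key=True),
        Column("sub", Integer),
        Column("division", Integer),
        Column("created", Integer),
    )

    engine = create_engine("postgresql://scott:tiger@localhost/test")
    metadata.create_all(engine)

    with engine.begin() as conn:
        # single parameter set: RETURNING is rendered, the new id is available
        result = conn.execute(
            entry_table.insert().returning(entry_table.c.id),
            {"sub": 1, "division": 2, "created": 3},
        )
        new_id = result.scalar()

        # list of parameter sets: the DBAPI's executemany() is used,
        # so RETURNING rows are not available from the result
        conn.execute(
            entry_table.insert(),
            [{"sub": 4, "division": 5, "created": 6},
             {"sub": 7, "division": 8, "created": 9}],
        )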
On Mar 24, 2014, at 5:33 AM, Cosmia Luna <cosm...@gmail.com> wrote:

> The INSERT statement of PostgreSQL supports RETURNING, read this:
> http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
>
> On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:
>
> Oops, I should add - the reason I can't use an itertools counter to pre-assign IDs is because the table is potentially being dumped to by multiple scripts, which is why I have to commit the parts prior to the segments (since engine.execute can't return multiple insert_ids).
>
> On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:
>
> Thanks for the quick reply!
>
> This seems to work pretty well. I took out the batching (as it's already batched at a higher level), modified it to suit the insertion of children as well (and reduced the unique constraint to a single field), and it appears to work.
>
>     with db_session() as db:
>         existing_entries = dict(
>             ((entry.subject, entry) for entry in
>                 db.query(Entry).filter(Entry.subject.in_(entries.keys()))
>                     .options(subqueryload('segments')).all()
>             )
>         )
>
>         segment_inserts = []
>         for subject, entry in entries.items():
>             existing_entry = existing_entries.get(subject, None)
>             if existing_entry:
>                 segments = dict(((s.segment, s) for s in existing_entry.segments))
>                 for segment_number, segment in entry['segments'].items():
>                     if int(segment_number) not in segments:
>                         segment['entry_id'] = existing_entry.id
>                         segment_inserts.append(segment)
>             else:
>                 entry_id = engine.execute(Entry.__table__.insert(),
>                                           entry).inserted_primary_key
>                 for segment in entry['segments'].values():
>                     segment['entry_id'] = entry_id[0]
>                     segment_inserts.append(segment)
>
>         engine.execute(Segment.__table__.insert(), segment_inserts)
>
> For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 2 to select all the entries and segments, 1 to insert the segments, and the rest to insert parts. From here, however, I rewrote it a bit:
>
>     with db_session() as db:
>         existing_entries = dict(
>             ((entry.subject, entry) for entry in
>                 db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
>             )
>         )
>
>         entry_inserts = []
>         for subject, entry in entries.items():
>             existing_entry = existing_entries.get(subject, None)
>             if not existing_entry:
>                 entry_inserts.append(entry)
>
>         engine.execute(Entry.__table__.insert(), entry_inserts)
>
>         existing_entries = dict(
>             ((entry.subject, entry) for entry in
>                 db.query(Entry).filter(Entry.subject.in_(entries.keys()))
>                     .options(subqueryload('segments')).all()
>             )
>         )
>
>         segment_inserts = []
>         for subject, entry in entries.items():
>             existing_entry = existing_entries.get(subject, None)
>             if existing_entry:
>                 segments = dict(((s.segment, s) for s in existing_entry.segments))
>                 for segment_number, segment in entry['segments'].items():
>                     if int(segment_number) not in segments:
>                         segment['entry_id'] = existing_entry.id
>                         segment_inserts.append(segment)
>             else:
>                 log.error("i've made a huge mistake")
>
>         engine.execute(Segment.__table__.insert(), segment_inserts)
>
> This ends up being about 19 seconds and 6 queries for a clean dump, and a bit less if the table is already populated. Removing the unique indexes on both the entries and segments tables and replacing them with standard indexes saves about a second in a full dump, and about 6 seconds for an update. I'm pretty happy with where it is now, and I suspect most of the time (aside from the two insert calls) is being spent in Python.
> That said, if you have any tips for improvements I'd be all ears.
>
> Thanks for the help!
>
> On Monday, 24 March 2014 09:19:25 UTC+8, Michael Bayer wrote:
>
> On Mar 23, 2014, at 11:33 AM, James Meneghello <muro...@gmail.com> wrote:
>
>> I'm having a few issues with unique constraints and bulk inserts. The software I'm writing takes data from an external source (a lot of it, anywhere from 1,000 rows per minute to 100-200k+), crunches it down into its hierarchy and saves it to the DB, to be aggregated in the background. The function handling the initial DB save is designed to work with about 20-50k rows at a time - very little modification takes place, it's pretty much just grabbed and thrown into the table. Obviously the amount of data being saved somewhat excludes the use of the ORM in this particular table, but there are a number of other tables that benefit from using the ORM. Hence, the small stuff uses the ORM and the big stuff uses the Core.
>>
>> The main problem I'm having is with the initial save. The data comes in unordered and sometimes contains duplicates, so there's a UniqueConstraint on Entry on sub, division, created. Unfortunately, this hampers the bulk insert - if there's a duplicate, it rolls back the entire insert and hence the entries aren't available to be referenced by the segments later. Obviously, capturing it in a try/catch would skip the whole block as well. Both Entry and Segment have the same problem - there are often duplicate segments. Since there's a large amount of data being pushed through it, I assume it's impractical to insert the elements individually - while there's only 100-200 entries per block, there's usually 20-50k segments.
>>
>> Is there any way of forcing the engine to skip over duplicates and not roll back the transaction on exception? Code's below. Using Postgres, with psycopg2 as the driver.
>>
>>     engine.execute(Entry.__table__.insert(), entries)
>>
>>     segment_list = []
>>     for sub, entry in entries.items():
>>         segments = entry.pop('segments')
>>
>>         e = db.query(Entry)\
>>             .filter(Entry.sub==entry['sub'])\
>>             .filter(Entry.division==entry['division'])\
>>             .filter(Entry.created==entry['created']).first()
>>
>>         for segment in segments:
>>             segment['entry_id'] = e.id
>>             segment_list.append(segment)
>>
>>     engine.execute(Segment.__table__.insert(), segment_list)
>>
>> In addition, is there some way to pre-fetch data? Rather than query for each Entry, it'd be nice to pre-load all entries and save a couple hundred queries.
>
> you have all the entries in segment_list. Seems like you'd just want to dedupe entries as they enter that list, that's a pretty simple thing to do with a dictionary. but additionally you're also emitting a SELECT for every item individually, so whatever time you're saving on that INSERT is just being expended with that huge number of SELECT statements anyway.
>
> so yes, you certainly want to pre-load everything. since the criteria here is three different things, you can use a tuple-based IN clause.
>
> below is an example that uses this and also batches so that it won't run out of memory under any circumstances. it starts with 50K rows in the database and then adds another 50K with 20K overlapping. the whole thing runs in about 28 seconds on my mac.
>     from sqlalchemy import *
>     from sqlalchemy.orm import *
>     from sqlalchemy.ext.declarative import declarative_base
>     import random
>     import itertools
>
>     Base = declarative_base()
>
>     class Entry(Base):
>         __tablename__ = 'a'
>
>         id = Column(Integer, primary_key=True)
>         sub = Column(Integer)
>         division = Column(Integer)
>         created = Column(Integer)
>         __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )
>
>     e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
>     Base.metadata.drop_all(e)
>     Base.metadata.create_all(e)
>
>     a_bunch_of_fake_unique_entries = list(set(
>         (random.randint(1, 100000), random.randint(1, 100000), random.randint(1, 100000))
>         for i in range(100000)
>     ))
>
>     entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
>     entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]
>
>     sess = Session(e)
>
>     counter = itertools.count(1)
>     sess.add_all([Entry(id=next(counter), sub=sub, division=division, created=created)
>                   for sub, division, created in entries_we_will_start_with])
>     sess.commit()
>
>     # here's where your example begins... This will also batch it
>     # to ensure it can scale arbitrarily
>
>     while entries_we_will_merge:
>         batch = entries_we_will_merge[0:1000]
>         entries_we_will_merge = entries_we_will_merge[1000:]
>
>         existing_entries = dict(
>             ((entry.sub, entry.division, entry.created), entry)
>             for entry in sess.query(Entry).filter(
>                 tuple_(Entry.sub, Entry.division, Entry.created).in_([
>                     tuple_(sub, division, created)
>                     for sub, division, created in batch
>                 ])
>             )
>         )
>
>         inserts = []
>         for entry_to_merge in batch:
>             existing_entry = existing_entries.get(entry_to_merge, None)
>             if existing_entry:
>                 # do whatever to update existing
>                 pass
>             else:
>                 inserts.append(
>                     dict(
>                         id=next(counter),
>                         sub=entry_to_merge[0],
>                         division=entry_to_merge[1],
>                         created=entry_to_merge[2]
>                     )
>                 )
>         if inserts:
>             sess.execute(Entry.__table__.insert(), params=inserts)
>
>> Thanks!
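As a small complement to the example above, here is a minimal sketch of the "dedupe entries as they enter that list ... with a dictionary" idea from the same reply. It assumes the incoming rows are plain dicts keyed on the same columns as the unique constraint; the names are illustrative, not from the thread.

    # keep only the first row seen for each (sub, division, created) key,
    # so the bulk INSERT never hits the unique constraint from within the batch
    def dedupe_rows(rows):
        seen = {}
        for row in rows:
            key = (row['sub'], row['division'], row['created'])
            if key not in seen:
                seen[key] = row
        return list(seen.values())

    # usage sketch:
    # unique_rows = dedupe_rows(incoming_rows)
    # engine.execute(Entry.__table__.insert(), unique_rows)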