PostgreSQL's INSERT statement supports RETURNING; see http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
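(A minimal sketch of how RETURNING could replace the re-SELECT in the second snippet below - assuming PostgreSQL, a SQLAlchemy version where Insert.values() accepts a list of dicts (0.8+), and the Entry model from this thread. Note that RETURNING is not supported with executemany-style execution, which is why the pending rows are folded into a single multi-VALUES statement here:)

    # One statement, one round trip: insert every pending entry and get
    # the (id, subject) pairs straight back instead of re-selecting.
    # entry_inserts is the list of parameter dicts built further below.
    stmt = (Entry.__table__.insert()
            .returning(Entry.__table__.c.id, Entry.__table__.c.subject)
            .values(entry_inserts))
    subject_to_id = dict(
        (row.subject, row.id) for row in engine.execute(stmt)
    )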
On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:

Oops, I should add - the reason I can't use an itertools counter to pre-assign IDs is that the table is potentially being dumped to by multiple scripts, which is why I have to commit the parts prior to the segments (since engine.execute can't return multiple insert IDs).

On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:

Thanks for the quick reply!

This seems to work pretty well. I took out the batching (as it's already batched at a higher level) and modified it to suit the insertion of children as well (and reduced the unique constraint to a single field), and it appears to work.

    with db_session() as db:
        existing_entries = dict(
            (entry.subject, entry) for entry in
            db.query(Entry)
              .filter(Entry.subject.in_(entries.keys()))
              .options(subqueryload('segments'))
              .all()
        )

        segment_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if existing_entry:
                segments = dict((s.segment, s) for s in existing_entry.segments)
                for segment_number, segment in entry['segments'].items():
                    if int(segment_number) not in segments:
                        segment['entry_id'] = existing_entry.id
                        segment_inserts.append(segment)
            else:
                entry_id = engine.execute(Entry.__table__.insert(), entry).inserted_primary_key
                for segment in entry['segments'].values():
                    segment['entry_id'] = entry_id[0]
                    segment_inserts.append(segment)

        engine.execute(Segment.__table__.insert(), segment_inserts)

For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 2 to select all the entries and segments, 1 to insert the segments, and the rest to insert parts. From here, however, I rewrote it a bit:

    with db_session() as db:
        existing_entries = dict(
            (entry.subject, entry) for entry in
            db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
        )

        entry_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if not existing_entry:
                entry_inserts.append(entry)

        engine.execute(Entry.__table__.insert(), entry_inserts)

        existing_entries = dict(
            (entry.subject, entry) for entry in
            db.query(Entry)
              .filter(Entry.subject.in_(entries.keys()))
              .options(subqueryload('segments'))
              .all()
        )

        segment_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if existing_entry:
                segments = dict((s.segment, s) for s in existing_entry.segments)
                for segment_number, segment in entry['segments'].items():
                    if int(segment_number) not in segments:
                        segment['entry_id'] = existing_entry.id
                        segment_inserts.append(segment)
            else:
                log.error("i've made a huge mistake")

        engine.execute(Segment.__table__.insert(), segment_inserts)

This ends up being about 19 seconds and 6 queries for a clean dump, and a bit less if the table is already populated. Removing the unique indexes on both the entries and segments tables and replacing them with standard indexes saves about a second in a full dump, and about 6 seconds for an update. I'm pretty happy with where it is now, and I suspect most of the time (aside from the two insert calls) is being spent in Python. That said, if you have any tips for improvements, I'd be all ears.

Thanks for the help!
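(db_session() above is a homegrown helper, not something SQLAlchemy ships. A typical implementation of such a transactional-scope context manager - assuming a module-level Session = sessionmaker(bind=engine) - looks something like this:)

    from contextlib import contextmanager

    @contextmanager
    def db_session():
        # Commit on success, roll back on any error, always close:
        # a transactional scope around a series of operations.
        session = Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()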
On Monday, 24 March 2014 09:19:25 UTC+8, Michael Bayer wrote:

On Mar 23, 2014, at 11:33 AM, James Meneghello <muro...@gmail.com> wrote:

> I'm having a few issues with unique constraints and bulk inserts. The software I'm writing takes data from an external source (a lot of it, anywhere from 1,000 rows per minute to 100-200k+), crunches it down into its hierarchy and saves it to the DB, to be aggregated in the background. The function handling the initial DB save is designed to work with about 20-50k rows at a time - very little modification takes place, it's pretty much just grabbed and thrown into the table. Obviously the amount of data being saved somewhat excludes the use of the ORM for this particular table, but there are a number of other tables that benefit from using the ORM. Hence, the small stuff uses the ORM and the big stuff uses the Core.
>
> The main problem I'm having is with the initial save. The data comes in unordered and sometimes contains duplicates, so there's a UniqueConstraint on Entry over (sub, division, created). Unfortunately, this hampers the bulk insert - if there's a duplicate, it rolls back the entire insert, and hence the entries aren't available to be referenced by the segments later. Obviously, capturing it in a try/except would skip the whole block as well. Both Entry and Segment have the same problem - there are often duplicate segments. Since there's a large amount of data being pushed through it, I assume it's impractical to insert the elements individually - while there are only 100-200 entries per block, there are usually 20-50k segments.
>
> Is there any way of forcing the engine to skip over duplicates and not roll back the transaction on exception? Code's below. Using Postgres, with psycopg2 as the driver.
>
>     engine.execute(Entry.__table__.insert(), entries)
>
>     segment_list = []
>     for sub, entry in entries.items():
>         segments = entry.pop('segments')
>
>         e = db.query(Entry)\
>             .filter(Entry.sub == entry['sub'])\
>             .filter(Entry.division == entry['division'])\
>             .filter(Entry.created == entry['created']).first()
>
>         for segment in segments:
>             segment['entry_id'] = e.id
>             segment_list.append(segment)
>
>     engine.execute(Segment.__table__.insert(), segment_list)
>
> In addition, is there some way to pre-fetch data? Rather than query for each Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

You have all the entries in segment_list. Seems like you'd just want to dedupe entries as they enter that list; that's a pretty simple thing to do with a dictionary. But additionally, you're also emitting a SELECT for every item individually, so whatever time you're saving on that INSERT is just being expended on that huge number of SELECT statements anyway.

So yes, you certainly want to pre-load everything. Since the criteria here are three different columns, you can use a tuple-based IN clause.

Below is an example that uses this and also batches, so that it won't run out of memory under any circumstances. It starts with 50K rows in the database and then adds another 50K with 20K overlapping; the whole thing runs in about 28 seconds on my Mac.
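(A minimal sketch of the dictionary dedupe on its own, before the full example below; incoming_rows here is a hypothetical stand-in for the raw, possibly-duplicated input:)

    # Keep the last row seen for each (sub, division, created) key, so
    # the later bulk INSERT never trips the UniqueConstraint over
    # duplicates within the batch itself.
    deduped = {}
    for row in incoming_rows:
        deduped[(row['sub'], row['division'], row['created'])] = row
    rows_to_insert = list(deduped.values())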
    from sqlalchemy import *
    from sqlalchemy.orm import *
    from sqlalchemy.ext.declarative import declarative_base
    import random
    import itertools

    Base = declarative_base()

    class Entry(Base):
        __tablename__ = 'a'

        id = Column(Integer, primary_key=True)
        sub = Column(Integer)
        division = Column(Integer)
        created = Column(Integer)
        __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )

    e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
    Base.metadata.drop_all(e)
    Base.metadata.create_all(e)

    a_bunch_of_fake_unique_entries = list(set(
        (random.randint(1, 100000), random.randint(1, 100000), random.randint(1, 100000))
        for i in range(100000)
    ))

    entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
    entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]

    sess = Session(e)

    counter = itertools.count(1)
    sess.add_all([
        Entry(id=next(counter), sub=sub, division=division, created=created)
        for sub, division, created in entries_we_will_start_with
    ])
    sess.commit()

    # here's where your example begins... This will also batch it
    # to ensure it can scale arbitrarily

    while entries_we_will_merge:
        batch = entries_we_will_merge[0:1000]
        entries_we_will_merge = entries_we_will_merge[1000:]

        existing_entries = dict(
            ((entry.sub, entry.division, entry.created), entry)
            for entry in sess.query(Entry).filter(
                tuple_(Entry.sub, Entry.division, Entry.created).in_([
                    tuple_(sub, division, created)
                    for sub, division, created in batch
                ])
            )
        )

        inserts = []
        for entry_to_merge in batch:
            existing_entry = existing_entries.get(entry_to_merge, None)
            if existing_entry:
                # do whatever to update existing
                pass
            else:
                inserts.append(
                    dict(
                        id=next(counter),
                        sub=entry_to_merge[0],
                        division=entry_to_merge[1],
                        created=entry_to_merge[2]
                    )
                )
        if inserts:
            sess.execute(Entry.__table__.insert(), params=inserts)

> Thanks!