Wow, I didn't know that... is it a bug? But the RETURNING clause will work, though :)
stmt = P.__table__.insert(returning=[P.id], values=[{"val": 1}, {"val": 2}])

with engine.connect() as conn:
    result = conn.execute(stmt)
    # do something with result

Hmm, maybe performance suffers from the huge SQL size... well, I didn't test that.

Cosmia

On Monday, March 24, 2014 9:39:51 PM UTC+8, Michael Bayer wrote:

RETURNING doesn't work with the DBAPI's "executemany" style of execution, however,
which is what conn.execute(stmt, [list of parameter sets]) calls.

On Mar 24, 2014, at 5:33 AM, Cosmia Luna <cos...@gmail.com> wrote:

The INSERT statement of PostgreSQL supports RETURNING; read this:
http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning

On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:

Oops, I should add - the reason I can't use an itertools counter to pre-assign IDs
is because the table is potentially being dumped to by multiple scripts, which is
why I have to commit the parts prior to the segments (since engine.execute can't
return multiple insert_ids).

On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:

Thanks for the quick reply!

This seems to work pretty well. I took out the batching (as it's already batched at
a higher level) and modified it to suit the insertion of children as well (and
reduced the unique constraint to a single field), and it appears to work.

with db_session() as db:
    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys()))
            .options(subqueryload('segments')).all()
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict((s.segment, s) for s in existing_entry.segments)
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            entry_id = engine.execute(Entry.__table__.insert(), entry).inserted_primary_key
            for segment in entry['segments'].values():
                segment['entry_id'] = entry_id[0]
                segment_inserts.append(segment)

    engine.execute(Segment.__table__.insert(), segment_inserts)

For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 2 to
select all the entries and segments, 1 to insert the segments, and the rest to
insert parts.
From here, however, I rewrote it a bit:

with db_session() as db:
    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
    )

    entry_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if not existing_entry:
            entry_inserts.append(entry)

    engine.execute(Entry.__table__.insert(), entry_inserts)

    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys()))
            .options(subqueryload('segments')).all()
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict((s.segment, s) for s in existing_entry.segments)
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            log.error('i\'ve made a huge mistake')

    engine.execute(Segment.__table__.insert(), segment_inserts)

This ends up being about 19 seconds and 6 queries for a clean dump, and a bit less
if the table is already populated. Removing the unique indexes on both the entries
and segments tables and replacing them with standard indexes saves about a second in
a full dump, and about 6 seconds for an update. I'm pretty happy with where it is
now, and I suspect most of the time (aside from the two insert calls) is being spent
in Python. That said, if you have any tips for improvements I'd be all ears.

Thanks for the help!

On Monday, 24 March 2014 09:19:25 UTC+8, Michael Bayer wrote:

On Mar 23, 2014, at 11:33 AM, James Meneghello <muro...@gmail.com> wrote:

> I'm having a few issues with unique constraints and bulk inserts. The software
> I'm writing takes data from an external source (a lot of it, anywhere from 1,000
> rows per minute to 100-200k+), crunches it down into its hierarchy and saves it
> to the DB, to be aggregated in the background. The function handling the initial
> DB save is designed to work with about 20-50k rows at a time - very little
> modification takes place; it's pretty much just grabbed and thrown into the
> table. Obviously the amount of data being saved somewhat excludes the use of the
> ORM for this particular table, but there are a number of other tables that
> benefit from using the ORM. Hence, the small stuff uses the ORM and the big stuff
> uses the Core.
>
> The main problem I'm having is with the initial save. The data comes in unordered
> and sometimes contains duplicates, so there's a UniqueConstraint on Entry over
> (sub, division, created). Unfortunately, this hampers the bulk insert - if
> there's a duplicate, it rolls back the entire insert, and hence the entries
> aren't available to be referenced by the segments later. Obviously, capturing it
> in a try/except would skip the whole block as well. Both Entry and Segment have
> the same problem - there are often duplicate segments. Since there's a large
> amount of data being pushed through it, I assume it's impractical to insert the
> elements individually - while there's only 100-200 entries per block, there's
> usually 20-50k segments.
> Is there any way of forcing the engine to skip over duplicates and not roll back
> the transaction on exception? Code's below. Using Postgres, with psycopg2 as the
> driver.
>
> engine.execute(Entry.__table__.insert(), entries)
>
> segment_list = []
> for sub, entry in entries.items():
>     segments = entry.pop('segments')
>
>     e = db.query(Entry)\
>         .filter(Entry.sub == entry['sub'])\
>         .filter(Entry.division == entry['division'])\
>         .filter(Entry.created == entry['created']).first()
>
>     for segment in segments:
>         segment['entry_id'] = e.id
>         segment_list.append(segment)
>
> engine.execute(Segment.__table__.insert(), segment_list)
>
> In addition, is there some way to pre-fetch data? Rather than query for each
> Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

you have all the entries in segment_list.  Seems like you'd just want to dedupe
entries as they enter that list; that's a pretty simple thing to do with a
dictionary.  But additionally, you're also emitting a SELECT for every item
individually, so whatever time you're saving on that INSERT is just being expended
with that huge number of SELECT statements anyway.

so yes, you certainly want to pre-load everything.  Since the criteria here is three
different things, you can use a tuple-based IN clause.

below is an example that uses this and also batches, so that it won't run out of
memory under any circumstances.  It starts with 50K rows in the database and then
adds another 50K with 20K overlapping.  The whole thing runs in about 28 seconds on
my mac.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import random
import itertools

Base = declarative_base()

class Entry(Base):
    __tablename__ = 'a'

    id = Column(Integer, primary_key=True)
    sub = Column(Integer)
    division = Column(Integer)
    created = Column(Integer)

    __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )

e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

a_bunch_of_fake_unique_entries = list(set(
    (random.randint(1, 100000), random.randint(1, 100000), random.randint(1, 100000))
    for i in range(100000)
))

entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]

sess = Session(e)

counter = itertools.count(1)
sess.add_all([
    Entry(id=next(counter), sub=sub, division=division, created=created)
    for sub, division, created in entries_we_will_start_with
])
sess.commit()

# here's where your example begins...
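# (the merge below looks up each candidate key with a composite
# tuple_(...).in_(...) filter on (sub, division, created) and only
# inserts the keys that the SELECT didn't return)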
# This will also batch it to ensure it can scale arbitrarily.

while entries_we_will_merge:
    batch = entries_we_will_merge[0:1000]
    entries_we_will_merge = entries_we_will_merge[1000:]

    existing_entries = dict(
        ((entry.sub, entry.division, entry.created), entry)
        for entry in sess.query(Entry).filter(
            tuple_(Entry.sub, Entry.division, Entry.created).in_([
                tuple_(sub, division, created)
                for sub, division, created in batch
            ])
        )
    )

    inserts = []
    for entry_to_merge in batch:
        existing_entry = existing_entries.get(entry_to_merge, None)
        if existing_entry:
            # do whatever to update existing
            pass
        else:
            inserts.append(
                dict(
                    id=next(counter),
                    sub=entry_to_merge[0],
                    division=entry_to_merge[1],
                    created=entry_to_merge[2]
                )
            )
    if inserts:
        sess.execute(Entry.__table__.insert(), params=inserts)

> Thanks!
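For reference, a rough, untested sketch of how the multi-values INSERT ... RETURNING
that Cosmia suggests could replace the per-entry inserted_primary_key round trips in
James's first version. It reuses Entry, Segment, engine, entries and existing_entries
exactly as they appear in the snippets above, so treat those names as placeholders
for the real models and data structures. Because all the rows go into a single
statement rather than an executemany batch, RETURNING can hand back every generated
id at once; the trade-off, as noted at the top of the thread, is a potentially very
large SQL string.

# Sketch only: assumes Entry has id/subject columns and that existing_entries
# is the {subject: Entry} dict built from the earlier SELECT.
new_entries = []
for subject, entry in entries.items():
    if subject in existing_entries:
        continue
    new_entries.append({k: v for k, v in entry.items() if k != 'segments'})

segment_inserts = []
if new_entries:
    with engine.begin() as conn:
        # One multi-values INSERT ... RETURNING id, subject -- a single statement,
        # so it avoids the DBAPI executemany path and RETURNING is available.
        result = conn.execute(
            Entry.__table__.insert()
                .values(new_entries)
                .returning(Entry.__table__.c.id, Entry.__table__.c.subject)
        )
        id_by_subject = dict((row.subject, row.id) for row in result)

        for subject, entry_id in id_by_subject.items():
            for segment in entries[subject]['segments'].values():
                segment['entry_id'] = entry_id
                segment_inserts.append(segment)

        # Segments don't need their ids back, so plain executemany is fine here.
        if segment_inserts:
            conn.execute(Segment.__table__.insert(), segment_inserts)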