RETURNING doesn't work with DBAPI's "executemany" style of execution, however, 
which is what conn.execute(stmt, [list of parameter sets]) calls.
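
To illustrate the distinction at the DBAPI level, here is a minimal sketch. It assumes the standard library's sqlite3 module as a stand-in for psycopg2, purely so it runs without a server, and the table and values are made up. An executemany-style call sends many parameter sets and hands back no per-row keys, while one execute() per row lets each generated key be read back, at the cost of one statement per row:

```python
import sqlite3

# In-memory database as a stand-in; the thread itself uses Postgres + psycopg2.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entry (id INTEGER PRIMARY KEY, subject TEXT)")

# executemany style: one call, many parameter sets, no per-row keys returned.
conn.executemany(
    "INSERT INTO entry (subject) VALUES (?)",
    [("a",), ("b",), ("c",)],
)

# execute style: one call per row, so each generated key can be collected.
new_ids = []
for subject in ["d", "e"]:
    cursor = conn.execute("INSERT INTO entry (subject) VALUES (?)", (subject,))
    new_ids.append(cursor.lastrowid)

print(new_ids)
```

In SQLAlchemy terms the second loop corresponds to calling execute() with a single parameter set and reading inserted_primary_key each time, as in the first version of the code quoted below.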



On Mar 24, 2014, at 5:33 AM, Cosmia Luna <cosm...@gmail.com> wrote:

> INSERT statement of postgresql supports RETURNING, read this 
> http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
> 
> On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:
> Oops, I should add - the reason I can't use an itertools counter to 
> pre-assign IDs is because the table is potentially being dumped to by 
> multiple scripts, which is why I have to commit the parts prior to the 
> segments (since engine.execute can't return multiple insert_ids).
> 
> On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:
> Thanks for the quick reply!
> 
> This seems to work pretty well. I took out the batching (as it's already 
> batched at a higher level) and modified it to suit the insertion of children 
> as well (and reduced the unique constraint to a single field), and it appears to work.
> 
> with db_session() as db:
>     existing_entries = dict(
>         ((entry.subject, entry) for entry in
>             db.query(Entry)
>                 .filter(Entry.subject.in_(entries.keys()))
>                 .options(subqueryload('segments'))
>                 .all()
>         )
>     )
> 
>     segment_inserts = []
>     for subject, entry in entries.items():
>         existing_entry = existing_entries.get(subject, None)
>         if existing_entry:
>             segments = dict(((s.segment, s) for s in existing_entry.segments))
>             for segment_number, segment in entry['segments'].items():
>                 if int(segment_number) not in segments:
>                     segment['entry_id'] = existing_entry.id
>                     segment_inserts.append(segment)
>         else:
>             entry_id = engine.execute(
>                 Entry.__table__.insert(), entry
>             ).inserted_primary_key
>             for segment in entry['segments'].values():
>                 segment['entry_id'] = entry_id[0]
>                 segment_inserts.append(segment)
> 
>     engine.execute(Segment.__table__.insert(), segment_inserts)
> 
> For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 2 
> to select all the entries and segments, 1 to insert the segments and the rest 
> to insert parts. From here, however, I rewrote it a bit:
> 
>     with db_session() as db:
>         existing_entries = dict(
>             ((entry.subject, entry) for entry in
>                 db.query(Entry)
>                     .filter(Entry.subject.in_(entries.keys()))
>                     .all()
>             )
>         )
> 
>         entry_inserts = []
>         for subject, entry in entries.items():
>             existing_entry = existing_entries.get(subject, None)
>             if not existing_entry:
>                 entry_inserts.append(entry)
> 
>         engine.execute(Entry.__table__.insert(), entry_inserts)
> 
>         existing_entries = dict(
>             ((entry.subject, entry) for entry in
>                 db.query(Entry)
>                     .filter(Entry.subject.in_(entries.keys()))
>                     .options(subqueryload('segments'))
>                     .all()
>             )
>         )
> 
>         segment_inserts = []
>         for subject, entry in entries.items():
>             existing_entry = existing_entries.get(subject, None)
>             if existing_entry:
>                 segments = dict(((s.segment, s) for s in existing_entry.segments))
>                 for segment_number, segment in entry['segments'].items():
>                     if int(segment_number) not in segments:
>                         segment['entry_id'] = existing_entry.id
>                         segment_inserts.append(segment)
>             else:
>                 log.error('i\'ve made a huge mistake')
> 
>         engine.execute(Segment.__table__.insert(), segment_inserts)
> 
> This ends up being about 19 seconds, 6 queries for a clean dump, and a bit 
> less if the table is already populated. Removing the unique indexes on both 
> the entries and segments tables and replacing them with standard indexes 
> saves about a second in a full dump, and about 6 seconds for an update. I'm 
> pretty happy with where it is now, and I suspect most of the time (aside from 
> the two insert calls) is being spent in Python. That said, if you have any 
> tips for improvements I'd be all ears.
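
The two-phase flow described above (bulk-insert the missing parents, re-select to learn their ids, then bulk-insert the missing children) can be sketched on its own. This is only an illustration under stated assumptions: it uses the standard library's sqlite3 instead of Postgres so that it runs anywhere, and the schema, the `load_ids` helper and the sample data are hypothetical rather than taken from the application.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE entry (id INTEGER PRIMARY KEY, subject TEXT UNIQUE);
    CREATE TABLE segment (
        id INTEGER PRIMARY KEY,
        entry_id INTEGER REFERENCES entry(id),
        segment INTEGER,
        UNIQUE (entry_id, segment)
    );
    INSERT INTO entry (subject) VALUES ('already-there');
    INSERT INTO segment (entry_id, segment) VALUES (1, 1);
""")

# Hypothetical incoming data: subject -> segment numbers.
entries = {"already-there": [1, 2], "brand-new": [1, 2, 3]}


def load_ids(conn, subjects):
    """Return {subject: id} for the given subjects using one SELECT."""
    marks = ",".join("?" * len(subjects))
    rows = conn.execute(
        "SELECT subject, id FROM entry WHERE subject IN (%s)" % marks,
        list(subjects),
    )
    return dict(rows.fetchall())


# Phase 1: one bulk insert for the entries that do not exist yet.
existing = load_ids(conn, entries)
conn.executemany(
    "INSERT INTO entry (subject) VALUES (?)",
    [(subject,) for subject in entries if subject not in existing],
)

# Phase 2: re-select so every subject, old or new, maps to its id.
ids = load_ids(conn, entries)

# Phase 3: one SELECT for existing segments, one bulk insert for the rest.
marks = ",".join("?" * len(ids))
have = set(conn.execute(
    "SELECT entry_id, segment FROM segment WHERE entry_id IN (%s)" % marks,
    list(ids.values()),
).fetchall())
segment_rows = [
    (ids[subject], number)
    for subject, numbers in entries.items()
    for number in numbers
    if (ids[subject], number) not in have
]
conn.executemany(
    "INSERT INTO segment (entry_id, segment) VALUES (?, ?)", segment_rows
)
conn.commit()
```

The number of statements stays fixed regardless of how many rows arrive, which is the property that brought the query count down in the rewrite above. It does not protect against another writer inserting the same subject between phase 1 and phase 2.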
> 
> Thanks for the help!
> 
> On Monday, 24 March 2014 09:19:25 UTC+8, Michael Bayer wrote:
> 
> On Mar 23, 2014, at 11:33 AM, James Meneghello <muro...@gmail.com> wrote:
> 
>> I'm having a few issues with unique constraints and bulk inserts. The 
>> software I'm writing takes data from an external source (a lot of it, 
>> anywhere from 1,000 rows per minute to 100-200k+), crunches it down into its 
>> hierarchy and saves it to the DB, to be aggregated in the background. The 
>> function handling the initial DB save is designed to work with about 20-50k 
>> rows at a time - very little modification takes place, it's pretty much just 
>> grabbed and thrown into the table. Obviously the amount of data being saved 
>> somewhat excludes the use of the ORM in this particular table, but there are 
>> a number of other tables that benefit from using the ORM. Hence, the small 
>> stuff uses the ORM and the big stuff uses the Core.
>> 
>> The main problem I'm having is with the initial save. The data comes in 
>> unordered and sometimes contains duplicates, so there's a UniqueConstraint 
>> on Entry on sub, division, created. Unfortunately, this hampers the bulk 
>> insert - if there's a duplicate, it rolls back the entire insert and hence 
>> the entries aren't available to be referenced by the segments later. 
>> Obviously, capturing it in a try/catch would skip the whole block as well. 
>> Both Entry and Segment have the same problem - there are often duplicate 
>> segments. Since there's a large amount of data being pushed through it, I 
>> assume it's impractical to insert the elements individually - while there's 
>> only 100-200 entries per block, there's usually 20-50k segments.
>> 
>> Is there any way of forcing the engine to skip over duplicates and not 
>> rollback the transaction on exception? Code's below. Using Postgres, with 
>> psycopg2 as the driver.
>> 
>> 
>>         engine.execute(Entry.__table__.insert(), entries)
>> 
>>         segment_list = []
>>         for sub, entry in entries.items():
>>             segments = entry.pop('segments')
>> 
>>             e = db.query(Entry)\
>>                 .filter(Entry.sub==entry['sub'])\
>>                 .filter(Entry.division==entry['division'])\
>>                 .filter(Entry.created==entry['created']).first()
>> 
>>             for segment in segments:
>>                 segment['entry_id'] = e.id
>>                 segment_list.append(segment)
>> 
>>         engine.execute(Segment.__table__.insert(), segment_list)
> 
> 
>> 
>> In addition, is there some way to pre-fetch data? Rather than query for each 
>> Entry, it'd be nice to pre-load all entries and save a couple hundred 
>> queries.
> 
> you have all the entries in segment_list.  Seems like you'd just want to 
> dedupe entries as they enter that list, that's a pretty simple thing to do 
> with a dictionary.   but additionally you're also emitting a SELECT for every 
> item individually so whatever time you're saving on that INSERT is just being 
> expended with that huge number of SELECT statements anyway.
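
The dictionary-based dedupe mentioned above might look roughly like the following. It is a sketch, not the application's code: the `dedupe_entries` name and the sample rows are made up, and it assumes each incoming row is a dict carrying the three columns of the unique constraint.

```python
def dedupe_entries(rows):
    """Keep the first row seen for each (sub, division, created) key."""
    unique = {}
    for row in rows:
        key = (row["sub"], row["division"], row["created"])
        # setdefault leaves an earlier row in place, so later duplicates drop out.
        unique.setdefault(key, row)
    return list(unique.values())


# Hypothetical incoming rows; the first two collide on the unique key.
incoming = [
    {"sub": 1, "division": 10, "created": 100, "segments": {"1": {"segment": 1}}},
    {"sub": 1, "division": 10, "created": 100, "segments": {"2": {"segment": 2}}},
    {"sub": 2, "division": 10, "created": 100, "segments": {}},
]
unique_entries = dedupe_entries(incoming)
print(len(unique_entries))
```

Keeping the first row is only one possible policy. If duplicates can carry different segments, the rows would need to be merged under the same key instead of dropped.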
> 
> so yes you certainly want to pre-load everything, since the criteria here is 
> three different things you can use a tuple-based IN clause.
> 
> below is an example that uses this and also batches so that it won't run out 
> of memory under any circumstances, it starts with 50K rows in the database 
> and then adds another 50K with 20K overlapping.  the whole thing runs in 
> about 28 seconds on my mac.
> 
> from sqlalchemy import *
> from sqlalchemy.orm import *
> from sqlalchemy.ext.declarative import declarative_base
> import random
> import itertools
> 
> Base = declarative_base()
> 
> class Entry(Base):
>     __tablename__ = 'a'
> 
>     id = Column(Integer, primary_key=True)
>     sub = Column(Integer)
>     division = Column(Integer)
>     created = Column(Integer)
>     __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )
> 
> e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
> Base.metadata.drop_all(e)
> Base.metadata.create_all(e)
> 
> 
> a_bunch_of_fake_unique_entries = list(set(
>     (random.randint(1, 100000), random.randint(1, 100000),
>      random.randint(1, 100000))
>     for i in range(100000)
> ))
> 
> entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
> entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]
> 
> sess = Session(e)
> 
> counter = itertools.count(1)
> sess.add_all([Entry(id=next(counter), sub=sub, division=division,
>                     created=created)
>                     for sub, division, created in entries_we_will_start_with])
> sess.commit()
> 
> # here's where your example begins... This will also batch it
> # to ensure it can scale arbitrarily
> 
> while entries_we_will_merge:
>     batch = entries_we_will_merge[0:1000]
>     entries_we_will_merge = entries_we_will_merge[1000:]
> 
>     existing_entries = dict(
>                         ((entry.sub, entry.division, entry.created), entry)
>                         for entry in sess.query(Entry).filter(
>                                 tuple_(Entry.sub, Entry.division,
>                                        Entry.created).in_([
>                                         tuple_(sub, division, created)
>                                         for sub, division, created in
>                                         batch
>                                 ])
>                         )
>                         )
> 
>     inserts = []
>     for entry_to_merge in batch:
>         existing_entry = existing_entries.get(entry_to_merge, None)
>         if existing_entry:
>             # do whatever to update existing
>             pass
>         else:
>             inserts.append(
>                 dict(
>                     id=next(counter),
>                     sub=entry_to_merge[0],
>                     division=entry_to_merge[1],
>                     created=entry_to_merge[2]
>                 )
>             )
>     if inserts:
>         sess.execute(Entry.__table__.insert(), params=inserts)
> 
>> 
>> Thanks!
>> 
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "sqlalchemy" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to sqlalchemy+...@googlegroups.com.
>> To post to this group, send email to sqlal...@googlegroups.com.
>> Visit this group at http://groups.google.com/group/sqlalchemy.
>> For more options, visit https://groups.google.com/d/optout.
> 
> 
