Wow, I didn't know that... is it a bug?

But, the RETURNING clause will work though :)

stmt = P.__table__.insert(returning=[P.id], values=[{"val": 1}, {"val": 2}])
with engine.connect() as conn:
    result = conn.execute(stmt)
    # do something with result

hmm, maybe perfomance suffers from huge-size-SQL...... well, which I didn't 
test

Cosmia

On Monday, March 24, 2014 9:39:51 PM UTC+8, Michael Bayer wrote:
>
> RETURNING doesn’t work with DBAPI’s “executemany” style of execution, 
> however, which is what conn.execute(stmt, [list of parameter sets]) calls.
>
>
>
> On Mar 24, 2014, at 5:33 AM, Cosmia Luna <cos...@gmail.com <javascript:>> 
> wrote:
>
> INSERT statement of postgresql supports RETURNING, read this 
> http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
>
> On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:
>>
>> Oops, I should add - the reason I can't use an itertools counter to 
>> pre-assign IDs is because the table is potentially being dumped to by 
>> multiple scripts, which is why I have to commit the parts prior to the 
>> segments (since engine.execute can't return multiple insert_ids).
>>
>> On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:
>>>
>>> Thanks for the quick reply!
>>>
>>> This seems to work pretty well. I took out the batching (as it's already 
>>> batched at a higher level) and modified it to suit the insertion of 
>>> children as well (and reducded the unique to a single field) , and it 
>>> appears to work.
>>>
>>> with db_session() as db:
>>>     existing_entries = dict(
>>>         ((entry.subject, entry) for entry in
>>>             
>>> db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
>>>         )
>>>     )
>>>
>>>     segment_inserts = []
>>>     for subject, entry in entries.items():
>>>         existing_entry = existing_entries.get(subject, None)
>>>         if existing_entry:
>>>             segments = dict(((s.segment, s) for s in 
>>> existing_entry.segments))
>>>             for segment_number, segment in entry['segments'].items():
>>>                 if int(segment_number) not in segments:
>>>                     segment['entry_id'] = existing_entry.id
>>>                     segment_inserts.append(segment)
>>>         else:
>>>             entry_id = engine.execute(Entry.__table__.insert(), 
>>> entry).inserted_primary_key
>>>             for segment in entry['segments'].values():
>>>                 segment['entry_id'] = entry_id[0]
>>>                 segment_inserts.append(segment)
>>>
>>>     engine.execute(Segment.__table__.insert(), segment_inserts)
>>>
>>>
>>> For 20,000 segments, this ends up being about 45 seconds and 1650 
>>> queries - 2 to select all the entries and segments, 1 to insert the 
>>> segments and the rest to insert parts. From here, however, I rewrote it a 
>>> bit:
>>>
>>>     with db_session() as db:
>>>         existing_entries = dict(
>>>             ((entry.subject, entry) for entry in
>>>                 
>>> db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
>>>             )
>>>         )
>>>
>>>         entry_inserts = []
>>>         for subject, entry in entries.items():
>>>             existing_entry = existing_entries.get(subject, None)
>>>             if not existing_entry:
>>>                 entry_inserts.append(entry)
>>>
>>>         engine.execute(Entry.__table__.insert(), entry_inserts)
>>>
>>>         existing_entries = dict(
>>>             ((entry.subject, entry) for entry in
>>>                 
>>> db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
>>>             )
>>>         )
>>>
>>>         segment_inserts = []
>>>         for subject, entry in entries.items():
>>>             existing_entry = existing_entries.get(subject, None)
>>>             if existing_entry:
>>>                 segments = dict(((s.segment, s) for s in 
>>> existing_entry.segments))
>>>                 for segment_number, segment in entry['segments'].items():
>>>                     if int(segment_number) not in segments:
>>>                         segment['entry_id'] = existing_entry.id
>>>                         segment_inserts.append(segment)
>>>             else:
>>>                 log.error('i\'ve made a huge mistake')
>>>
>>>         engine.execute(Segment.__table__.insert(), segment_inserts)
>>>
>>> This ends up being about 19 seconds, 6 queries for a clean dump, and a 
>>> bit less if the table is already populated. Removing the unique indexes on 
>>> both the entries and segments tables and replacing them with standard 
>>> indexes saves about a second in a full dump, and about 6 seconds for an 
>>> update. I'm pretty happy with where it is now, and I suspect most of the 
>>> time (aside from the two insert calls) is being spent in Python. That said, 
>>> if you have any tips for improvements I'd be all ears.
>>>
>>> Thanks for the help!
>>>
>>> On Monday, 24 March 2014 09:19:25 UTC+8, Michael Bayer wrote:
>>>>
>>>>
>>>> On Mar 23, 2014, at 11:33 AM, James Meneghello <muro...@gmail.com> 
>>>> wrote:
>>>>
>>>> I'm having a few issues with unique constraints and bulk inserts. The 
>>>> software I'm writing takes data from an external source (a lot of it, 
>>>> anywhere from 1,000 rows per minute to 100-200k+), crunches it down into 
>>>> its hierarchy and saves it to the DB, to be aggregated in the background. 
>>>> The function handling the initial DB save is designed to work with about 
>>>> 20-50k rows at a time - very little modification takes place, it's pretty 
>>>> much just grabbed and thrown into the table. Obviously the amount of data 
>>>> being saved somewhat excludes the use of the ORM in this particular table, 
>>>> but there are a number of other tables that benefit from using the ORM. 
>>>> Hence, the small stuff uses the ORM and the big stuff uses the Core.
>>>>
>>>> The main problem I'm having is with the initial save. The data comes in 
>>>> unordered and sometimes contains duplicates, so there's a UniqueConstraint 
>>>> on Entry on sub, division, created. Unfortunately, this hampers the bulk 
>>>> insert - if there's a duplicate, it rolls back the entire insert and hence 
>>>> the entries aren't available to be referenced by the segments later. 
>>>> Obviously, capturing it in a try/catch would skip the whole block as well. 
>>>> Both Entry and Segment have the same problem - there are often duplicate 
>>>> segments. Since there's a large amount of data being pushed through it, I 
>>>> assume it's impractical to insert the elements individually - while 
>>>> there's 
>>>> only 100-200 entries per block, there's usually 20-50k segments.
>>>>
>>>> Is there any way of forcing the engine to skip over duplicates and not 
>>>> rollback the transaction on exception? Code's below. Using Postgres, with 
>>>> psycopg2 as the driver.
>>>>
>>>>
>>>>         engine.execute(Entry.__table__.insert(), entries)
>>>>
>>>>         segment_list = []
>>>>         for sub, entry in entry.items():
>>>>             segments = entry.pop('segments')
>>>>
>>>>             e = db.query(Entry)\
>>>>                 .filter(Entry.sub==entry['sub'])\
>>>>                 .filter(Entry.division==entry['division'])\
>>>>                 .filter(Entry.created==entry['created']).first()
>>>>
>>>>             for segment in segments:
>>>>                 segment['entry_id'] = e.id
>>>>                 segment_list.append(segment)
>>>>
>>>>         engine.execute(Segment.__table__.insert(), segment_list)
>>>>
>>>>
>>>>
>>>>
>>>> In addition, is there some way to pre-fetch data? Rather than query for 
>>>> each Entry, it'd be nice to pre-load all entries and save a couple hundred 
>>>> queries.
>>>>
>>>>
>>>> you have all the entries in segment_list.  Seems like you’d just want 
>>>> to dedupe entries as they enter that list, that’s a pretty simple thing to 
>>>> do with a dictionary.   but additionally you’re also emitting a SELECT for 
>>>> every item individually so whatever time you’re saving on that INSERT is 
>>>> just being expended with that huge number of SELECT statements anyway.
>>>>
>>>> so yes you certainly want to pre-load everything, since the criteria 
>>>> here is three different things you can use a tuple-based IN clause.
>>>>
>>>> below is an example that uses this and also batches so that it won’t 
>>>> run out of memory under any circumstances, it starts with 50K rows in the 
>>>> database and then adds another 50K with 20K overlapping.  the whole thing 
>>>> runs in about 28 seconds on my mac.
>>>>
>>>> from sqlalchemy import *
>>>> from sqlalchemy.orm import *
>>>> from sqlalchemy.ext.declarative import declarative_base
>>>> import random
>>>> import itertools
>>>>
>>>> Base = declarative_base()
>>>>
>>>> class Entry(Base):
>>>>     __tablename__ = 'a'
>>>>
>>>>     id = Column(Integer, primary_key=True)
>>>>     sub = Column(Integer)
>>>>     division = Column(Integer)
>>>>     created = Column(Integer)
>>>>     __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )
>>>>
>>>> e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
>>>> Base.metadata.drop_all(e)
>>>> Base.metadata.create_all(e)
>>>>
>>>>
>>>> a_bunch_of_fake_unique_entries = list(set(
>>>>     (random.randint(1, 100000), random.randint(1, 100000), 
>>>> random.randint(1, 100000))
>>>>     for i in range(100000)
>>>> ))
>>>>
>>>> entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
>>>> entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]
>>>>
>>>> sess = Session(e)
>>>>
>>>> counter = itertools.count(1)
>>>> sess.add_all([Entry(id=next(counter), sub=sub, division=division, 
>>>> created=created)
>>>>                     for sub, division, created in 
>>>> entries_we_will_start_with])
>>>> sess.commit()
>>>>
>>>> # here's where your example begins... This will also batch it
>>>> # to ensure it can scale arbitrarily
>>>>
>>>> while entries_we_will_merge:
>>>>     batch = entries_we_will_merge[0:1000]
>>>>     entries_we_will_merge = entries_we_will_merge[1000:]
>>>>
>>>>     existing_entries = dict(
>>>>                         ((entry.sub, entry.division, entry.created), 
>>>> entry)
>>>>                         for entry in sess.query(Entry).filter(
>>>>                                 tuple_(Entry.sub, Entry.division, 
>>>> Entry.created).in_([
>>>>                                         tuple_(sub, division, created)
>>>>                                         for sub, division, created in
>>>>                                         batch
>>>>                                 ])
>>>>                         )
>>>>                         )
>>>>
>>>>     inserts = []
>>>>     for entry_to_merge in batch:
>>>>         existing_entry = existing_entries.get(entry_to_merge, None)
>>>>         if existing_entry:
>>>>             # do whatever to update existing
>>>>             pass
>>>>         else:
>>>>             inserts.append(
>>>>                 dict(
>>>>                     id=next(counter),
>>>>                     sub=entry_to_merge[0],
>>>>                     division=entry_to_merge[1],
>>>>                     create_engine=entry_to_merge[2]
>>>>                 )
>>>>             )
>>>>     if inserts:
>>>>         sess.execute(Entry.__table__.insert(), params=inserts)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> -- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "sqlalchemy" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to sqlalchemy+...@googlegroups.com.
>>>> To post to this group, send email to sqlal...@googlegroups.com.
>>>> Visit this group at http://groups.google.com/group/sqlalchemy.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>>
>>>>
> -- 
> You received this message because you are subscribed to the Google Groups 
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to sqlalchemy+...@googlegroups.com <javascript:>.
> To post to this group, send email to sqlal...@googlegroups.com<javascript:>
> .
> Visit this group at http://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.
>
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to