[sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-23 Thread James Meneghello
I'm having a few issues with unique constraints and bulk inserts. The 
software I'm writing takes data from an external source (a lot of it, 
anywhere from 1,000 rows per minute to 100-200k+), crunches it down into 
its hierarchy and saves it to the DB, to be aggregated in the background. 
The function handling the initial DB save is designed to work with about 
20-50k rows at a time - very little modification takes place, it's pretty 
much just grabbed and thrown into the table. Obviously the amount of data 
being saved somewhat excludes the use of the ORM in this particular table, 
but there are a number of other tables that benefit from using the ORM. 
Hence, the small stuff uses the ORM and the big stuff uses the Core.

The main problem I'm having is with the initial save. The data comes in 
unordered and sometimes contains duplicates, so there's a UniqueConstraint 
on Entry on sub, division, created. Unfortunately, this hampers the bulk 
insert - if there's a duplicate, it rolls back the entire insert and hence 
the entries aren't available to be referenced by the segments later. 
Obviously, catching the error in a try/except would skip the whole block as well. 
Both Entry and Segment have the same problem - there are often duplicate 
segments. Since there's a large amount of data being pushed through it, I 
assume it's impractical to insert the elements individually - while there's 
only 100-200 entries per block, there's usually 20-50k segments.

Is there any way of forcing the engine to skip over duplicates and not 
rollback the transaction on exception? Code's below. Using Postgres, with 
psycopg2 as the driver.


engine.execute(Entry.__table__.insert(), entries)

segment_list = []
for sub, entry in entries.items():
    segments = entry.pop('segments')

    e = db.query(Entry)\
        .filter(Entry.sub == entry['sub'])\
        .filter(Entry.division == entry['division'])\
        .filter(Entry.created == entry['created']).first()

    for segment in segments:
        segment['entry_id'] = e.id
        segment_list.append(segment)

engine.execute(Segment.__table__.insert(), segment_list)

In addition, is there some way to pre-fetch data? Rather than query for 
each Entry, it'd be nice to pre-load all entries and save a couple hundred 
queries.

Thanks!



Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-23 Thread Michael Bayer

On Mar 23, 2014, at 11:33 AM, James Meneghello murod...@gmail.com wrote:

 In addition, is there some way to pre-fetch data? Rather than query for each 
 Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

You have all the entries in segment_list.  Seems like you'd just want to dedupe 
entries as they enter that list; that's a pretty simple thing to do with a 
dictionary.  But additionally, you're also emitting a SELECT for every item 
individually, so whatever time you're saving on that bulk INSERT is just being 
spent on that huge number of SELECT statements anyway.
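
For instance, something along these lines for the entries (just a sketch, 
assuming "entries" is a plain list of dicts; the same dictionary trick applies 
to the segments, keyed on whatever columns make a segment unique):

    # dedupe on the unique key before the bulk insert; a later duplicate
    # simply overwrites the earlier one
    unique_entries = {}
    for entry in entries:
        key = (entry['sub'], entry['division'], entry['created'])
        unique_entries[key] = entry

    engine.execute(Entry.__table__.insert(), list(unique_entries.values()))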

So yes, you certainly want to pre-load everything; since the criteria here are 
three different columns, you can use a tuple-based IN clause.

Below is an example that uses this approach and also batches, so that it won't 
run out of memory under any circumstances.  It starts with 50K rows in the 
database and then adds another 50K with 20K overlapping.  The whole thing runs 
in about 28 seconds on my mac.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import random
import itertools

Base = declarative_base()

class Entry(Base):
    __tablename__ = 'a'

    id = Column(Integer, primary_key=True)
    sub = Column(Integer)
    division = Column(Integer)
    created = Column(Integer)
    __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )

e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)


a_bunch_of_fake_unique_entries = list(set(
    (random.randint(1, 100000), random.randint(1, 100000),
     random.randint(1, 100000))
    for i in range(100000)
))

entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]

sess = Session(e)

counter = itertools.count(1)
sess.add_all([
    Entry(id=next(counter), sub=sub, division=division, created=created)
    for sub, division, created in entries_we_will_start_with
])
sess.commit()

# here's where your example begins... This will also batch it
# to ensure it can scale arbitrarily

while entries_we_will_merge:
    batch = entries_we_will_merge[0:1000]
    entries_we_will_merge = entries_we_will_merge[1000:]

    existing_entries = dict(
        ((entry.sub, entry.division, entry.created), entry)
        for entry in sess.query(Entry).filter(
            tuple_(Entry.sub, Entry.division, Entry.created).in_([
                tuple_(sub, division, created)
                for sub, division, created in
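                # NOTE: the archived copy of this message is truncated above;
                # the rest of this loop is an editor's reconstruction sketch
                # of the apparent intent, not the original posting.
                batch
            ])
        )
    )

    # any key in this batch that the SELECT didn't return is new; insert
    # those rows in one executemany, continuing the explicit id counter
    new_rows = [
        dict(id=next(counter), sub=sub, division=division, created=created)
        for sub, division, created in batch
        if (sub, division, created) not in existing_entries
    ]
    if new_rows:
        sess.execute(Entry.__table__.insert(), new_rows)

    sess.commit()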

[sqlalchemy] Re: Bulk Inserts and Unique Constraints

2014-03-23 Thread Cosmia Luna
Anyway, I don't think you should issue so many SELECTs; that's what 
performance suffers from most, I think.  An INSERT statement will return the 
inserted ids for you; you may want to read this: 
http://docs.sqlalchemy.org/en/latest/core/tutorial.html#executing
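
For example, something like this (an illustrative sketch that reuses the Entry 
table from the earlier posts; the column values are made up):

    # a single-row insert exposes the generated primary key on the result
    result = engine.execute(Entry.__table__.insert(),
                            {'sub': 1, 'division': 2, 'created': 3})
    new_id = result.inserted_primary_key[0]

    # on Postgres you can also ask for the ids explicitly with RETURNING
    result = engine.execute(
        Entry.__table__.insert().returning(Entry.__table__.c.id),
        {'sub': 4, 'division': 5, 'created': 6})
    new_id = result.scalar()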

Skipping duplicates... well, if you turn off the constraint check and that's 
what you want, it works, but I think the best strategy for the database is to 
ROLLBACK.  It's YOUR duty to filter out the duplicates before sending them to 
the database; a set in Python works well in this situation.  You may want to 
learn more about stored procedures if you want the database to do the 
filtering.

And if you really care that much about performance and you know the 
weaknesses of the ORM, why use the ORM?

Cosmia

