Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-27 Thread Cosmia Luna
Wow, I didn't know that... is it a bug?

But the RETURNING clause will work, though :)

stmt = P.__table__.insert(returning=[P.id], values=[{'val': 1}, {'val': 2}])
with engine.connect() as conn:
    result = conn.execute(stmt)
    # do something with result

Hmm, maybe performance suffers from the huge SQL statement... well, I didn't 
test that.
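
One way to keep each statement from getting huge would be to chunk the 
multi-VALUES insert and collect the returned ids as you go. A rough, untested 
sketch, assuming a list of parameter dicts named rows and an arbitrary chunk 
size:

CHUNK = 1000  # assumed batch size; tune for your row width

ids = []
with engine.connect() as conn:
    for i in range(0, len(rows), CHUNK):
        # one INSERT ... VALUES (...), (...) RETURNING id per chunk
        stmt = P.__table__.insert().values(rows[i:i + CHUNK]).returning(P.__table__.c.id)
        result = conn.execute(stmt)
        ids.extend(row[0] for row in result)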

Cosmia

On Monday, March 24, 2014 9:39:51 PM UTC+8, Michael Bayer wrote:

 RETURNING doesn’t work with DBAPI’s “executemany” style of execution, 
 however, which is what conn.execute(stmt, [list of parameter sets]) calls.



Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-27 Thread Michael Bayer
No DBAPI I've tested does it; at most it would only be possible for the PG and 
SQL Server dialects.

On Mar 27, 2014, at 2:30 AM, Cosmia Luna cosm...@gmail.com wrote:

 Wow, I didn't know that... is it a bug?
 
 But, the RETURNING clause will work though :)
 

Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread James Meneghello
Thanks for the quick reply!

This seems to work pretty well. I took out the batching (as it's already 
batched at a higher level) and modified it to suit the insertion of 
children as well (and reduced the unique to a single field), and it 
appears to work.

with db_session() as db:
    existing_entries = dict(
        ((entry.subject, entry) for entry in
         db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
        )
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict(((s.segment, s) for s in existing_entry.segments))
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            entry_id = engine.execute(Entry.__table__.insert(), entry).inserted_primary_key
            for segment in entry['segments'].values():
                segment['entry_id'] = entry_id[0]
                segment_inserts.append(segment)

    engine.execute(Segment.__table__.insert(), segment_inserts)


For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 
2 to select all the entries and segments, 1 to insert the segments and the 
rest to insert parts. From here, however, I rewrote it a bit:

with db_session() as db:
    existing_entries = dict(
        ((entry.subject, entry) for entry in
         db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
        )
    )

    entry_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if not existing_entry:
            entry_inserts.append(entry)

    engine.execute(Entry.__table__.insert(), entry_inserts)

    existing_entries = dict(
        ((entry.subject, entry) for entry in
         db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
        )
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict(((s.segment, s) for s in existing_entry.segments))
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            log.error('i\'ve made a huge mistake')

    engine.execute(Segment.__table__.insert(), segment_inserts)

This ends up being about 19 seconds, 6 queries for a clean dump, and a bit 
less if the table is already populated. Removing the unique indexes on both 
the entries and segments tables and replacing them with standard indexes 
saves about a second in a full dump, and about 6 seconds for an update. I'm 
pretty happy with where it is now, and I suspect most of the time (aside 
from the two insert calls) is being spent in Python. That said, if you have 
any tips for improvements I'd be all ears.
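
One small thing that might be worth guarding against, now that the unique 
indexes are gone, is duplicate segments sneaking into segment_inserts; they 
can be dropped in Python before the final engine.execute(), in the spirit of 
the dictionary dedupe suggested elsewhere in the thread. A sketch, assuming 
each segment dict carries its segment number under a 'segment' key (adjust to 
the real column names):

deduped = {}
for seg in segment_inserts:
    # for any repeated (entry_id, segment) pair, the last one wins
    deduped[(seg['entry_id'], seg['segment'])] = seg
segment_inserts = list(deduped.values())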

Thanks for the help!

Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread James Meneghello
Oops, I should add - the reason I can't use an itertools counter to 
pre-assign IDs is because the table is potentially being dumped to by 
multiple scripts, which is why I have to commit the parts prior to the 
segments (since engine.execute can't return multiple insert_ids).
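
For reference, the pattern the rewritten version elsewhere in this digest 
settles on, getting ids back without per-row inserts, looks roughly like 
this: bulk-insert the new entries with executemany, then select them back by 
their natural key in one query. A sketch, where subjects is assumed to be the 
list of keys that were just inserted:

engine.execute(Entry.__table__.insert(), entry_inserts)

# one SELECT to map natural key -> generated id
id_by_subject = dict(
    db.query(Entry.subject, Entry.id)
      .filter(Entry.subject.in_(subjects))
      .all()
)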

Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread Cosmia Luna
PostgreSQL's INSERT statement supports RETURNING; read this: 
http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
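
In SQLAlchemy that boils down to something like the following for a 
single-row insert. A sketch only, borrowing the Entry table from the thread 
and assuming its other columns are nullable or defaulted:

stmt = Entry.__table__.insert().returning(Entry.__table__.c.id)
with engine.connect() as conn:
    result = conn.execute(stmt, {'subject': 'example subject'})
    new_id = result.scalar()  # the id PostgreSQL sent back via RETURNING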

On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:

 Oops, I should add - the reason I can't use an itertools counter to 
 pre-assign IDs is because the table is potentially being dumped to by 
 multiple scripts, which is why I have to commit the parts prior to the 
 segments (since engine.execute can't return multiple insert_ids).

Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread Michael Bayer
RETURNING doesn't work with DBAPI's "executemany" style of execution, however, 
which is what conn.execute(stmt, [list of parameter sets]) calls.
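
To spell the difference out, a sketch against the thread's Segment table 
(column names assumed), given a conn obtained from engine.connect():

# executemany: one statement, many parameter sets -- efficient, but the
# DBAPI hands back no result rows, so RETURNING isn't available.
conn.execute(Segment.__table__.insert(),
             [{'segment': 1, 'entry_id': 10}, {'segment': 2, 'entry_id': 10}])

# single multi-VALUES statement: every row baked into one INSERT, so
# RETURNING works on PostgreSQL, at the cost of a much larger SQL string.
stmt = (Segment.__table__.insert()
        .values([{'segment': 3, 'entry_id': 10}, {'segment': 4, 'entry_id': 10}])
        .returning(Segment.__table__.c.id))
ids = [row[0] for row in conn.execute(stmt)]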



On Mar 24, 2014, at 5:33 AM, Cosmia Luna cosm...@gmail.com wrote:

 INSERT statement of postgresql supports RETURNING, read this 
 http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
 

[sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-23 Thread James Meneghello
I'm having a few issues with unique constraints and bulk inserts. The 
software I'm writing takes data from an external source (a lot of it, 
anywhere from 1,000 rows per minute to 100-200k+), crunches it down into 
its hierarchy and saves it to the DB, to be aggregated in the background. 
The function handling the initial DB save is designed to work with about 
20-50k rows at a time - very little modification takes place, it's pretty 
much just grabbed and thrown into the table. Obviously the amount of data 
being saved somewhat excludes the use of the ORM in this particular table, 
but there are a number of other tables that benefit from using the ORM. 
Hence, the small stuff uses the ORM and the big stuff uses the Core.

The main problem I'm having is with the initial save. The data comes in 
unordered and sometimes contains duplicates, so there's a UniqueConstraint 
on Entry on sub, division, created. Unfortunately, this hampers the bulk 
insert - if there's a duplicate, it rolls back the entire insert and hence 
the entries aren't available to be referenced by the segments later. 
Obviously, capturing it in a try/catch would skip the whole block as well. 
Both Entry and Segment have the same problem - there are often duplicate 
segments. Since there's a large amount of data being pushed through it, I 
assume it's impractical to insert the elements individually - while there's 
only 100-200 entries per block, there's usually 20-50k segments.

Is there any way of forcing the engine to skip over duplicates and not 
rollback the transaction on exception? Code's below. Using Postgres, with 
psycopg2 as the driver.


engine.execute(Entry.__table__.insert(), entries)

segment_list = []
for sub, entry in entries.items():
    segments = entry.pop('segments')

    e = db.query(Entry)\
        .filter(Entry.sub==entry['sub'])\
        .filter(Entry.division==entry['division'])\
        .filter(Entry.created==entry['created']).first()

    for segment in segments:
        segment['entry_id'] = e.id
        segment_list.append(segment)

engine.execute(Segment.__table__.insert(), segment_list)

In addition, is there some way to pre-fetch data? Rather than query for 
each Entry, it'd be nice to pre-load all entries and save a couple hundred 
queries.

Thanks!


Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-23 Thread Michael Bayer

On Mar 23, 2014, at 11:33 AM, James Meneghello murod...@gmail.com wrote:

 In addition, is there some way to pre-fetch data? Rather than query for each 
 Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

You have all the entries in segment_list. It seems like you'd just want to 
dedupe entries as they enter that list; that's a pretty simple thing to do 
with a dictionary. But additionally, you're also emitting a SELECT for every 
item individually, so whatever time you're saving on that INSERT is just 
being expended on that huge number of SELECT statements anyway.
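
That dedupe might look something like this. A sketch, assuming entries is the 
list of parameter dicts and keying on the three columns in the unique 
constraint:

unique_entries = {}
for entry in entries:
    # later duplicates simply replace earlier ones
    unique_entries[(entry['sub'], entry['division'], entry['created'])] = entry
entries = list(unique_entries.values())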

So yes, you certainly want to pre-load everything. Since the criteria here 
are three different things, you can use a tuple-based IN clause.

Below is an example that uses this and also batches, so that it won't run out 
of memory under any circumstances. It starts with 50K rows in the database 
and then adds another 50K with 20K overlapping; the whole thing runs in about 
28 seconds on my mac.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import random
import itertools

Base = declarative_base()

class Entry(Base):
    __tablename__ = 'a'

    id = Column(Integer, primary_key=True)
    sub = Column(Integer)
    division = Column(Integer)
    created = Column(Integer)
    __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )

e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)


a_bunch_of_fake_unique_entries = list(set(
    (random.randint(1, 100000), random.randint(1, 100000), random.randint(1, 100000))
    for i in range(100000)
))

entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:50000]
entries_we_will_merge = a_bunch_of_fake_unique_entries[30000:100000]

sess = Session(e)

counter = itertools.count(1)
sess.add_all([Entry(id=next(counter), sub=sub, division=division, created=created)
              for sub, division, created in entries_we_will_start_with])
sess.commit()

# here's where your example begins... This will also batch it
# to ensure it can scale arbitrarily

while entries_we_will_merge:
    batch = entries_we_will_merge[0:1000]
    entries_we_will_merge = entries_we_will_merge[1000:]

    existing_entries = dict(
        ((entry.sub, entry.division, entry.created), entry)
        for entry in sess.query(Entry).filter(
            tuple_(Entry.sub, Entry.division, Entry.created).in_([
                tuple_(sub, division, created)
                for sub, division, created in