Re: [sqlalchemy] Bulk Inserts and Unique Constraints
Wow, I didn't know that... is it a bug? But the RETURNING clause will work, though :)

    stmt = P.__table__.insert().values([{'val': 1}, {'val': 2}]).returning(P.__table__.c.id)

    with engine.connect() as conn:
        result = conn.execute(stmt)
        # do something with result

Hmm, maybe performance suffers from the huge SQL statement... well, I didn't test that.

Cosmia

On Monday, March 24, 2014 9:39:51 PM UTC+8, Michael Bayer wrote:
RETURNING doesn't work with the DBAPI's "executemany" style of execution, however, which is what conn.execute(stmt, [list of parameter sets]) calls.
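As a side note on the statement-size worry mentioned above: the multi-row VALUES list can be chunked so no single INSERT ... RETURNING grows unboundedly. This is only a minimal sketch, not code from the thread - the helper name is made up, and it assumes a mapped class P with an autoincrement id and an integer val column.

    def insert_returning_ids(engine, table, rows, chunk_size=1000):
        # Hypothetical helper: one multi-row INSERT ... RETURNING per chunk,
        # so no single SQL string grows unboundedly; all chunks share one
        # transaction and the generated ids are collected as we go.
        new_ids = []
        with engine.begin() as conn:
            for i in range(0, len(rows), chunk_size):
                chunk = rows[i:i + chunk_size]
                stmt = table.insert().values(chunk).returning(table.c.id)
                result = conn.execute(stmt)
                new_ids.extend(row[0] for row in result)
        return new_ids

    # usage
    ids = insert_returning_ids(engine, P.__table__, [{'val': n} for n in range(5000)])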
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
No DBAPI I've tested does it; at most it would only be possible for the PG and SQL Server dialects.

On Mar 27, 2014, at 2:30 AM, Cosmia Luna cosm...@gmail.com wrote:
Wow, I didn't know that... is it a bug? But the RETURNING clause will work, though :)
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
Thanks for the quick reply! This seems to work pretty well. I took out the batching (as it's already batched at a higher level) and modified it to suit the insertion of children as well (and reduced the unique constraint to a single field), and it appears to work.

    with db_session() as db:
        existing_entries = dict(
            (entry.subject, entry)
            for entry in db.query(Entry)
                .filter(Entry.subject.in_(entries.keys()))
                .options(subqueryload('segments'))
                .all()
        )

        segment_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if existing_entry:
                segments = dict((s.segment, s) for s in existing_entry.segments)
                for segment_number, segment in entry['segments'].items():
                    if int(segment_number) not in segments:
                        segment['entry_id'] = existing_entry.id
                        segment_inserts.append(segment)
            else:
                entry_id = engine.execute(Entry.__table__.insert(), entry).inserted_primary_key
                for segment in entry['segments'].values():
                    segment['entry_id'] = entry_id[0]
                    segment_inserts.append(segment)

        engine.execute(Segment.__table__.insert(), segment_inserts)

For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 2 to select all the entries and segments, 1 to insert the segments, and the rest to insert parts.

From here, however, I rewrote it a bit:

    with db_session() as db:
        existing_entries = dict(
            (entry.subject, entry)
            for entry in db.query(Entry)
                .filter(Entry.subject.in_(entries.keys()))
                .all()
        )

        entry_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if not existing_entry:
                entry_inserts.append(entry)

        engine.execute(Entry.__table__.insert(), entry_inserts)

        existing_entries = dict(
            (entry.subject, entry)
            for entry in db.query(Entry)
                .filter(Entry.subject.in_(entries.keys()))
                .options(subqueryload('segments'))
                .all()
        )

        segment_inserts = []
        for subject, entry in entries.items():
            existing_entry = existing_entries.get(subject, None)
            if existing_entry:
                segments = dict((s.segment, s) for s in existing_entry.segments)
                for segment_number, segment in entry['segments'].items():
                    if int(segment_number) not in segments:
                        segment['entry_id'] = existing_entry.id
                        segment_inserts.append(segment)
            else:
                log.error('i\'ve made a huge mistake')

        engine.execute(Segment.__table__.insert(), segment_inserts)

This ends up being about 19 seconds and 6 queries for a clean dump, and a bit less if the table is already populated. Removing the unique indexes on both the entries and segments tables and replacing them with standard indexes saves about a second in a full dump, and about 6 seconds for an update. I'm pretty happy with where it is now, and I suspect most of the time (aside from the two insert calls) is being spent in Python. That said, if you have any tips for improvements I'd be all ears. Thanks for the help!
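One small, optional tweak on top of the code above (my assumption, not something suggested in the thread): if segment_inserts can grow very large, the final executemany can be fed in slices so the parameter list for any single call stays bounded. A sketch, reusing the names from the message above:

    # chunk the final executemany; BATCH_SIZE is an arbitrary illustrative value
    BATCH_SIZE = 10000
    for i in range(0, len(segment_inserts), BATCH_SIZE):
        engine.execute(Segment.__table__.insert(), segment_inserts[i:i + BATCH_SIZE])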
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
Oops, I should add - the reason I can't use an itertools counter to pre-assign IDs is because the table is potentially being dumped to by multiple scripts, which is why I have to commit the parts prior to the segments (since engine.execute can't return multiple insert_ids).

On Monday, 24 March 2014 14:40:52 UTC+8, James Meneghello wrote:
Thanks for the quick reply! This seems to work pretty well.
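To illustrate the limitation being described (a sketch only - Entry and engine are the objects from the messages above, and the column values are made up): inserted_primary_key is available after a single-row insert, but an executemany call with a list of parameter sets does not hand per-row ids back.

    # single-row execution: the generated primary key is available
    result = engine.execute(Entry.__table__.insert(), {'subject': 'one entry'})
    new_id = result.inserted_primary_key[0]

    # executemany: one statement, many parameter sets -- no ids come back,
    # so result.inserted_primary_key cannot be used here
    engine.execute(Entry.__table__.insert(),
                   [{'subject': 'entry a'}, {'subject': 'entry b'}])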
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
The PostgreSQL INSERT statement supports RETURNING - read this:
http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning

On Monday, March 24, 2014 2:43:46 PM UTC+8, James Meneghello wrote:
Oops, I should add - the reason I can't use an itertools counter to pre-assign IDs is because the table is potentially being dumped to by multiple scripts, which is why I have to commit the parts prior to the segments (since engine.execute can't return multiple insert_ids).
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
RETURNING doesn't work with the DBAPI's "executemany" style of execution, however, which is what conn.execute(stmt, [list of parameter sets]) calls.

On Mar 24, 2014, at 5:33 AM, Cosmia Luna cosm...@gmail.com wrote:
The PostgreSQL INSERT statement supports RETURNING - read this:
http://docs.sqlalchemy.org/en/rel_0_8/core/dml.html#sqlalchemy.sql.expression.Insert.returning
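A small sketch of the distinction being made here, for reference (an illustration only, not code from the thread - the table p, its columns, and the connection URL are all assumed):

    from sqlalchemy import Table, Column, Integer, MetaData, create_engine

    metadata = MetaData()
    p = Table('p', metadata,
              Column('id', Integer, primary_key=True),
              Column('val', Integer))

    engine = create_engine('postgresql://scott:tiger@localhost/test')  # placeholder URL
    metadata.create_all(engine)

    with engine.begin() as conn:
        # DBAPI executemany: one statement, a list of parameter sets.
        # Fast, but no rows come back, so RETURNING values aren't accessible.
        conn.execute(p.insert(), [{'val': 1}, {'val': 2}, {'val': 3}])

        # A single statement with a multi-row VALUES clause: RETURNING works,
        # at the cost of one (potentially very large) SQL string.
        result = conn.execute(
            p.insert().values([{'val': 4}, {'val': 5}]).returning(p.c.id)
        )
        new_ids = [row[0] for row in result]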
[sqlalchemy] Bulk Inserts and Unique Constraints
I'm having a few issues with unique constraints and bulk inserts. The software I'm writing takes data from an external source (a lot of it, anywhere from 1,000 rows per minute to 100-200k+), crunches it down into its hierarchy and saves it to the DB, to be aggregated in the background. The function handling the initial DB save is designed to work with about 20-50k rows at a time - very little modification takes place, it's pretty much just grabbed and thrown into the table.

Obviously the amount of data being saved somewhat excludes the use of the ORM for this particular table, but there are a number of other tables that benefit from using the ORM. Hence, the small stuff uses the ORM and the big stuff uses the Core.

The main problem I'm having is with the initial save. The data comes in unordered and sometimes contains duplicates, so there's a UniqueConstraint on Entry on (sub, division, created). Unfortunately, this hampers the bulk insert - if there's a duplicate, it rolls back the entire insert, and hence the entries aren't available to be referenced by the segments later. Obviously, capturing it in a try/except would skip the whole block as well. Both Entry and Segment have the same problem - there are often duplicate segments.

Since there's a large amount of data being pushed through it, I assume it's impractical to insert the elements individually - while there's only 100-200 entries per block, there's usually 20-50k segments. Is there any way of forcing the engine to skip over duplicates and not roll back the transaction on exception? Code's below. Using Postgres, with psycopg2 as the driver.

    engine.execute(Entry.__table__.insert(), entries)

    segment_list = []
    for sub, entry in entries.items():
        segments = entry.pop('segments')

        e = db.query(Entry)\
            .filter(Entry.sub == entry['sub'])\
            .filter(Entry.division == entry['division'])\
            .filter(Entry.created == entry['created']).first()

        for segment in segments:
            segment['entry_id'] = e.id
            segment_list.append(segment)

    engine.execute(Segment.__table__.insert(), segment_list)

In addition, is there some way to pre-fetch data? Rather than query for each Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

Thanks!
Re: [sqlalchemy] Bulk Inserts and Unique Constraints
On Mar 23, 2014, at 11:33 AM, James Meneghello murod...@gmail.com wrote:
Is there any way of forcing the engine to skip over duplicates and not roll back the transaction on exception? [...] In addition, is there some way to pre-fetch data? Rather than query for each Entry, it'd be nice to pre-load all entries and save a couple hundred queries.

You have all the entries in segment_list. Seems like you'd just want to dedupe entries as they enter that list; that's a pretty simple thing to do with a dictionary. But additionally, you're also emitting a SELECT for every item individually, so whatever time you're saving on that INSERT is just being expended on that huge number of SELECT statements anyway.

So yes, you certainly want to pre-load everything. Since the criteria here are three different things, you can use a tuple-based IN clause. Below is an example that uses this and also batches, so that it won't run out of memory under any circumstances; it starts with 50K rows in the database and then adds another 50K with 20K overlapping. The whole thing runs in about 28 seconds on my Mac.
    from sqlalchemy import *
    from sqlalchemy.orm import *
    from sqlalchemy.ext.declarative import declarative_base
    import random
    import itertools

    Base = declarative_base()

    class Entry(Base):
        __tablename__ = 'a'

        id = Column(Integer, primary_key=True)
        sub = Column(Integer)
        division = Column(Integer)
        created = Column(Integer)

        __table_args__ = (UniqueConstraint('sub', 'division', 'created'), )

    e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)

    Base.metadata.drop_all(e)
    Base.metadata.create_all(e)

    a_bunch_of_fake_unique_entries = list(set(
        (random.randint(1, 10), random.randint(1, 10), random.randint(1, 10))
        for i in range(10)
    ))

    entries_we_will_start_with = a_bunch_of_fake_unique_entries[0:5]
    entries_we_will_merge = a_bunch_of_fake_unique_entries[3:10]

    sess = Session(e)

    counter = itertools.count(1)
    sess.add_all([
        Entry(id=next(counter), sub=sub, division=division, created=created)
        for sub, division, created in entries_we_will_start_with
    ])
    sess.commit()

    # here's where your example begins... This will also batch it
    # to ensure it can scale arbitrarily
    while entries_we_will_merge:
        batch = entries_we_will_merge[0:1000]
        entries_we_will_merge = entries_we_will_merge[1000:]

        existing_entries = dict(
            ((entry.sub, entry.division, entry.created), entry)
            for entry in sess.query(Entry).filter(
                tuple_(Entry.sub, Entry.division, Entry.created).in_([
                    tuple_(sub, division, created)
                    for sub, division, created in
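The message is cut off above, mid-statement. Purely as a sketch of where the loop appears to be heading - this is a guess based on the description in the post, not Michael's original code, and it reuses the names from his script - the body of the while loop would finish roughly like this: build the dictionary of existing rows keyed on (sub, division, created), then bulk-insert only the tuples from the batch that aren't already present.

    # inside the while loop, restated from the last complete statement above
    existing_entries = dict(
        ((entry.sub, entry.division, entry.created), entry)
        for entry in sess.query(Entry).filter(
            tuple_(Entry.sub, Entry.division, Entry.created).in_([
                tuple_(sub, division, created)
                for sub, division, created in batch
            ])
        )
    )

    # insert only the key tuples that weren't found; reuse the demo's counter
    # for ids so they don't collide with the rows seeded earlier
    missing = [
        dict(id=next(counter), sub=sub, division=division, created=created)
        for sub, division, created in batch
        if (sub, division, created) not in existing_entries
    ]
    if missing:
        e.execute(Entry.__table__.insert(), missing)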