Re: [sqlalchemy] Windowed Queries breaking after a commit and emitting many, many Selects.

2014-10-27 Thread James Meneghello
I'm using a scoped session with a session generator, and I didn't want 
expire_on_commit to be False for everything, so setting it through the 
Session constructor wasn't an option. If a session was created prior to 
the one that needed that flag, it'd give me a ProtocolError, since the 
session couldn't be changed after it had already been created. Manually 
setting the expire_on_commit attribute on the session and setting it back 
afterwards worked fine, though, and didn't mess with the scoped session 
pool:

with db_session() as db:
    db.expire_on_commit = False
    # do stuff
    db.expire_on_commit = True
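
If it helps, the restore can also go in a try/finally so the flag always 
goes back even if the block raises - a rough sketch (no_expire_session is 
just a made-up name, and it builds on the db_session() generator from the 
earlier Threading Queries thread):

from contextlib import contextmanager

@contextmanager
def no_expire_session():
    # assumes db_session() yields a Session from the scoped registry
    with db_session() as db:
        db.expire_on_commit = False
        try:
            yield db
        finally:
            # restore the default so later users of the scoped session
            # get normal expire-on-commit behaviour
            db.expire_on_commit = True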


[sqlalchemy] Windowed Queries breaking after a commit and emitting many, many Selects.

2014-10-26 Thread James Meneghello
The application I'm working on operates over extremely large datasets, so 
I'm using the query windowing from here 
(https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/WindowedRangeQuery) 
to break it into manageable chunks. The query window is usually around 10k 
rows, after which it updates/deletes some rows and continues on. A simple 
breakdown looks like this:

query = session.query(Item).filter(...several filters)
total_items = query.count()  # used for logging
for row in windowed_query(query, Item.id, 10000):
    count += 1

    # process, determine whether to keep (and update) or delete
    # (put in a list for batch-deletion); one such example is:
    if row.group_name != regex.group_name:
        continue

    if count >= 10000:
        save(items)  # items to be kept, issues updates
        deleted = db.query(Item).filter(Item.id.in_(dead_items)).delete(synchronize_session='fetch')
        session.commit()
        count = 0
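
(For reference, the windowed_query helper from that recipe boils down to 
roughly the following - condensed from memory, so it may not match the 
wiki version exactly:)

from sqlalchemy import and_, func

def column_windows(session, column, windowsize):
    # yield WHERE clauses that split `column` into ranges of roughly
    # windowsize rows, using row_number() to pick the boundary values
    def interval(start_id, end_id):
        if end_id is not None:
            return and_(column >= start_id, column < end_id)
        return column >= start_id

    q = session.query(
        column,
        func.row_number().over(order_by=column).label('rownum')
    ).from_self(column)
    if windowsize > 1:
        q = q.filter("rownum %% %d=1" % windowsize)

    intervals = [id for id, in q]
    while intervals:
        start = intervals.pop(0)
        end = intervals[0] if intervals else None
        yield interval(start, end)

def windowed_query(q, column, windowsize):
    # run the query one window at a time, ordered by `column`
    for whereclause in column_windows(q.session, column, windowsize):
        for row in q.filter(whereclause).order_by(column):
            yield row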

This works fine until it's gone through a save/delete cycle. Once it's 
saved, it goes back to the windowed query and pulls the next 10k rows. 
That works until it hits the following line:

if row.group_name != regex.group_name:

At which point SQLAlchemy emits a SELECT for the item with that specific ID, 
presumably because group_name was no longer loaded and it had to fetch it. 
This only occurs after the commit - so I assume that committing the session 
is breaking the query. Hence, for the next 10k rows, it emits 10k queries 
(one per row).

Because the script is potentially processing so many rows, I don't want to 
let the dead_items list grow to be massive, so the deletes need to occur 
fairly regularly throughout the process.

Any idea what's causing this / how to fix it? Thanks!


[sqlalchemy] Threading Queries

2014-05-08 Thread James Meneghello
A couple of questions:

I'm writing an application using concurrent.futures (by process). The 
processes themselves are fairly involved - not simple functions. I'm using 
scoped_sessions and a context manager like so:

# db.py

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine(sqlalchemy_url)
Session = scoped_session(sessionmaker(bind=engine))

@contextmanager
def db_session():
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        # remove() lives on the scoped_session registry, not the Session instance
        Session.remove()

Using this context manager with something like the code below:

def process():
    with db_session() as db:
        # the function is obviously more involved than this
        u = User(name='bob')
        db.add(u)

        return u

def main():
    with db_session() as db:
        g = Group(name='peeps')
        user = process()
        user.group = g

        # this line breaks
        db.add(g)

I'm guessing this is because the call to db_session() is nested inside 
another, meaning that the thread-local session is being closed inside 
process(), and so when it gets passed back to main() the session object is 
gone. Is there a recommended way to handle this?
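
One workaround I've been considering is to have only the outermost caller 
own the session and pass it down explicitly, rather than opening a nested 
one - a rough sketch:

def process(db):
    # no nested db_session() here; works inside whatever session the caller owns
    u = User(name='bob')
    db.add(u)
    return u

def main():
    with db_session() as db:
        g = Group(name='peeps')
        user = process(db)  # pass the session down explicitly
        user.group = g
        db.add(g)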

Along similar lines, the application (using the session/engine creation as 
above) also has to use raw_connection() at a few points to access the 
copy_expert() cursor function from psycopg2. I'm getting very strange 
errors coming out of the copy functions - I suspect due to multiple copies 
occurring at once (there are ~4 processes running, but they rarely copy at 
the same time). The copy code looks like this:


from db import engine

conn = engine.raw_connection()
cur = conn.cursor()
cur.copy_expert(
    "COPY parts ({}) FROM STDIN WITH CSV ESCAPE E''".format(', '.join(ordering)),
    s)
conn.commit()

Does raw_connection() still pull from a connection pool, or could two calls 
to it at once potentially destroy things?
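
For what it's worth, I've also been looking at making sure the raw 
connection always goes back to the pool, and at giving each worker process 
its own pool rather than sockets inherited from the parent - a sketch of 
what I mean, not necessarily the cause of the errors below (the COPY 
statement here is simplified):

from db import engine

def worker_init():
    # call this at the start of each worker task: if the engine was created
    # before the processes forked, drop the parent's pooled connections so
    # this process opens its own
    engine.dispose()

def do_copy(ordering, s):
    conn = engine.raw_connection()
    try:
        cur = conn.cursor()
        cur.copy_expert(
            "COPY parts ({}) FROM STDIN WITH CSV".format(', '.join(ordering)), s)
        conn.commit()
    finally:
        conn.close()  # returns the connection to the pool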

Some of the errors are below (the data going in is clean, I've manually 
checked it).

Thanks!


---

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 940, in _execute_context
    context)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 435, in do_execute
    cursor.execute(statement, parameters)
psycopg2.DatabaseError: insufficient data in "D" message
lost synchronization with server: got message type "5", length 808464640

...

sqlalchemy.exc.DatabaseError: (DatabaseError) insufficient data in "D" message
lost synchronization with server: got message type "5", length 808464640

...

psycopg2.InterfaceError: connection already closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 508, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 1108, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/util/compat.py", line 174, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=exc_value)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/util/compat.py", line 167, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 506, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 405, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.InterfaceError: (InterfaceError) connection already closed None None

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 233, in connection
    return self.__connection
AttributeError: 'Connection' object has no attribute '_Connection__connection'

...

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 940, in _execute_context
    context)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 435, in do_execute
    cursor.execute(statement, parameters)
psycopg2.DatabaseError: lost synchronization with server: got message type

...
psycopg2.DatabaseError: error with no message from the libpq


[sqlalchemy] Re: Bulk Inserts and Unique Constraints

2014-03-25 Thread James Meneghello
I wasn't going to bother, but I had a look at doing this just out of 
curiosity, and these were the results:

executemany():

Inserting 424 entries: 0.3362s
Inserting 20,000 segments: 14.01s

COPY:

Inserting 425 entries: 0.04s
Inserting 20,000 segments: 0.3s

So a pretty massive boost. Thanks :)
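
In case it's useful to anyone else, the COPY path ends up looking roughly 
like this (simplified - the table name, column list and helper name are 
just placeholders):

import csv
import io

from db import engine

def copy_rows(table, columns, rows):
    # build an in-memory CSV and stream it into Postgres via COPY
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow([row[c] for c in columns])
    buf.seek(0)

    conn = engine.raw_connection()
    try:
        cur = conn.cursor()
        cur.copy_expert(
            "COPY {} ({}) FROM STDIN WITH CSV".format(table, ', '.join(columns)),
            buf)
        conn.commit()
    finally:
        conn.close()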


On Monday, 24 March 2014 23:30:32 UTC+8, Jonathan Vanasco wrote:

 Since you're using Postgres... have you considered using python to 
 generate a COPY file ?

 Sqlalchemy doesn't seem to support it natively... maybe via 'text', but 
 your underlying psycopg2 driver does.

 it's way way way faster.  i've found it significantly faster than dropping 
 fkeys and using prepared statements.



Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread James Meneghello
Thanks for the quick reply!

This seems to work pretty well. I took out the batching (as it's already 
batched at a higher level), modified it to suit the insertion of 
children as well (and reduced the unique constraint to a single field), and it 
appears to work.

with db_session() as db:
    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict((s.segment, s) for s in existing_entry.segments)
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            entry_id = engine.execute(Entry.__table__.insert(), entry).inserted_primary_key
            for segment in entry['segments'].values():
                segment['entry_id'] = entry_id[0]
                segment_inserts.append(segment)

    engine.execute(Segment.__table__.insert(), segment_inserts)


For 20,000 segments, this ends up being about 45 seconds and 1650 queries - 
2 to select all the entries and segments, 1 to insert the segments and the 
rest to insert parts. From here, however, I rewrote it a bit:

with db_session() as db:
    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys())).all()
    )

    entry_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if not existing_entry:
            entry_inserts.append(entry)

    engine.execute(Entry.__table__.insert(), entry_inserts)

    existing_entries = dict(
        (entry.subject, entry) for entry in
        db.query(Entry).filter(Entry.subject.in_(entries.keys())).options(subqueryload('segments')).all()
    )

    segment_inserts = []
    for subject, entry in entries.items():
        existing_entry = existing_entries.get(subject, None)
        if existing_entry:
            segments = dict((s.segment, s) for s in existing_entry.segments)
            for segment_number, segment in entry['segments'].items():
                if int(segment_number) not in segments:
                    segment['entry_id'] = existing_entry.id
                    segment_inserts.append(segment)
        else:
            log.error("i've made a huge mistake")

    engine.execute(Segment.__table__.insert(), segment_inserts)

This ends up being about 19 seconds, 6 queries for a clean dump, and a bit 
less if the table is already populated. Removing the unique indexes on both 
the entries and segments tables and replacing them with standard indexes 
saves about a second in a full dump, and about 6 seconds for an update. I'm 
pretty happy with where it is now, and I suspect most of the time (aside 
from the two insert calls) is being spent in Python. That said, if you have 
any tips for improvements I'd be all ears.

Thanks for the help!


Re: [sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-24 Thread James Meneghello
Oops, I should add - the reason I can't use an itertools counter to 
pre-assign IDs is that the table is potentially being dumped to by 
multiple scripts, which is why I have to commit the parts prior to the 
segments (since engine.execute can't return multiple insert_ids).
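
(One thing I haven't verified: on Postgres, a single multi-VALUES insert 
with RETURNING might hand back all the new ids in one statement - an 
untested sketch, with entry_rows standing in for the list of row dicts:)

stmt = Entry.__table__.insert().values(entry_rows).returning(Entry.__table__.c.id)
new_ids = [row[0] for row in engine.execute(stmt)]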


[sqlalchemy] Re: Bulk Inserts and Unique Constraints

2014-03-24 Thread James Meneghello
That's effectively what I'm doing now. I'm not sure there's much I can 
speed up at this point - the SELECTs take about 0.05s; it's just the 
INSERTs taking the bulk of the time, 11-15s depending on the number of rows. 
That said, I'm still running in development, and there'll be a significant 
boost once it's on proper hardware.
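
The pre-processing you describe is roughly what I have in mind here - a 
first pass that de-duplicates on the unique key before the insert. A 
sketch only, assuming the incoming rows are plain dicts:

def dedupe(rows):
    # keep the last occurrence of each (sub, division, created) tuple
    seen = {}
    for row in rows:
        seen[(row['sub'], row['division'], row['created'])] = row
    return list(seen.values())

entries = dedupe(entries)
engine.execute(Entry.__table__.insert(), entries)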

On Monday, 24 March 2014 22:44:09 UTC+8, Jonathan Vanasco wrote:

 The data comes in unordered and sometimes contains duplicates, so there's 
 a UniqueConstraint on Entry on sub, division, created.

 Have you tried pre-processing the list first?

 I've had similar situations when dealing with browser, user and app 
 analytics.

 I normally do a first pass to restructure the raw log file and note any 
 'selects' I might need to associate the records to; then I lock tables, 
 precache the selects, and do all the inserts. The speed pickups have been 
 great.



[sqlalchemy] Bulk Inserts and Unique Constraints

2014-03-23 Thread James Meneghello
I'm having a few issues with unique constraints and bulk inserts. The 
software I'm writing takes data from an external source (a lot of it, 
anywhere from 1,000 rows per minute to 100-200k+), crunches it down into 
its hierarchy and saves it to the DB, to be aggregated in the background. 
The function handling the initial DB save is designed to work with about 
20-50k rows at a time - very little modification takes place, it's pretty 
much just grabbed and thrown into the table. Obviously the amount of data 
being saved somewhat excludes the use of the ORM in this particular table, 
but there are a number of other tables that benefit from using the ORM. 
Hence, the small stuff uses the ORM and the big stuff uses the Core.

The main problem I'm having is with the initial save. The data comes in 
unordered and sometimes contains duplicates, so there's a UniqueConstraint 
on Entry on sub, division, created. Unfortunately, this hampers the bulk 
insert - if there's a duplicate, it rolls back the entire insert and hence 
the entries aren't available to be referenced by the segments later. 
Obviously, capturing it in a try/catch would skip the whole block as well. 
Both Entry and Segment have the same problem - there are often duplicate 
segments. Since there's a large amount of data being pushed through it, I 
assume it's impractical to insert the elements individually - while there's 
only 100-200 entries per block, there's usually 20-50k segments.

Is there any way of forcing the engine to skip over duplicates and not 
rollback the transaction on exception? Code's below. Using Postgres, with 
psycopg2 as the driver.


engine.execute(Entry.__table__.insert(), entries)

segment_list = []
for sub, entry in entries.items():
    segments = entry.pop('segments')

    e = db.query(Entry)\
        .filter(Entry.sub == entry['sub'])\
        .filter(Entry.division == entry['division'])\
        .filter(Entry.created == entry['created']).first()

    for segment in segments:
        segment['entry_id'] = e.id
        segment_list.append(segment)

engine.execute(Segment.__table__.insert(), segment_list)

In addition, is there some way to pre-fetch data? Rather than query for 
each Entry, it'd be nice to pre-load all entries and save a couple hundred 
queries.
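
Something like a single IN query keyed on the unique columns is what I'm 
imagining, if that's workable - a rough, untested sketch (assuming 
`entries` is the list of dicts being bulk-inserted and `db` is the open 
session):

from sqlalchemy import tuple_

keys = [(e['sub'], e['division'], e['created']) for e in entries]

existing = {
    (e.sub, e.division, e.created): e
    for e in db.query(Entry).filter(
        tuple_(Entry.sub, Entry.division, Entry.created).in_(keys)).all()
}
# each segment could then look its entry up in `existing` instead of
# issuing a per-entry query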

Thanks!
