Hi,
I have run into an issue with bulk operations, arising from the following 
use case:
I am looping over an iterator of mappings, each of which is either already 
in the DB (an update) or not (an insert). As I did not want to loop over it 
twice and wanted to use only one transaction, I used greenlets to split the 
iterator and concurrently run `bulk_insert_mappings` and 
`bulk_update_mappings` on the same session object. It looks like this:

"""
Insert some greenlet magic into generators
"""
from greenlet import greenlet


def greenletify_gen(mapping):
    """ This generator will be passed to bulk operations
        Greenlets allow us to get out of the bulk methods
        And thus run them concurrently """
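    if mapping is None:
        # A plain return ends the generator; raising StopIteration inside
        # a generator becomes a RuntimeError on Python 3.7+ (PEP 479)
        return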
    yield mapping
    for mapping in iter(greenlet.getcurrent().parent.switch, None):
        yield mapping
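
For readers unfamiliar with the idiom, the two-argument `iter(callable, 
sentinel)` form used in the loop above calls the callable repeatedly and 
stops once it returns the sentinel. Here is a minimal stand-alone sketch, 
where `fake_switch` and `pending` are hypothetical stand-ins for 
`greenlet.getcurrent().parent.switch` and the values the parent sends:

```python
# Values the "parent" would hand over on successive switches.
# The trailing None plays the role of the sentinel that ends the stream.
pending = [{'id': 2, 'value': 2}, {'id': 3, 'value': 3}, None]


def fake_switch():
    """Hypothetical stand-in for parent.switch(): return the next value."""
    return pending.pop(0)


# iter(callable, sentinel) keeps calling fake_switch until it returns None
received = list(iter(fake_switch, None))
print(received)
```

The sentinel itself is never yielded, which is why `switch(None)` can be 
used to tell each bulk method that its stream of mappings is finished.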


"""
Concurrently run bulk operations
"""
from test_model import TestModel
from session_ctx_mgr import session_ctx_mgr
from sqlalchemy.exc import ResourceClosedError


with session_ctx_mgr() as session:
    insert_greenlet = greenlet(
        lambda mapping: session.bulk_insert_mappings(
            TestModel, greenletify_gen(mapping)))
    update_greenlet = greenlet(
        lambda mapping: session.bulk_update_mappings(
            TestModel, greenletify_gen(mapping)))
    insert_greenlet.switch({'id': 2, 'value': 2})
    update_greenlet.switch({'id': 1, 'value': 2})
    insert_greenlet.switch(None)
    update_greenlet.switch(None)

However, the example above raises this error:

Traceback (most recent call last):
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2332, in _bulk_save_mappings
    isstates, update_changed_only)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 100, in _bulk_update
    if session_transaction.session.connection_callable:
AttributeError: 'NoneType' object has no attribute 'connection_callable'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "drunken-octo-dubstep.py", line 43, in <module>
    update_greenlet.switch(None)
  File "drunken-octo-dubstep.py", line 37, in <lambda>
    update_greenlet = greenlet(lambda mapping: session.bulk_update_mappings(TestModel, greenletify_gen(mapping)))
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2318, in bulk_update_mappings
    self._bulk_save_mappings(mapper, mappings, True, False, False, False)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 182, in reraise
    raise value
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 408, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed

Now, what's funny is that when you swap the last two lines (i.e., 
`bulk_update_mappings` ends before `bulk_insert_mappings`), the exception 
disappears. I managed to generalize the behavior: it seems to depend 
entirely on the first mapping of the loop. If it was an insert, 
`bulk_update_mappings` must end first. If it was an update, 
`bulk_insert_mappings` must end first.

The whole source code for the example is available here: 
https://github.com/Loamhoof/drunken-octo-dubstep, in the file 
`drunken-octo-dubstep.py`.

Now, I posted here rather than opening an issue because I'm not sure:
1) whether there is another, more legitimate way to do what I want
2) whether this way of using the API should be supported at all

So, two related questions:
1) Is there indeed a better way to run both bulk operations while looping 
over an iterator only once?
2) Should this be considered a bug?
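
For comparison, here is a minimal sketch of the plain single-pass 
alternative. It assumes the mappings fit in memory and that membership in a 
hypothetical `existing_ids` set is enough to tell updates from inserts. 
`split_mappings` is an illustrative helper, not part of SQLAlchemy:

```python
def split_mappings(mappings, existing_ids):
    """Walk the iterator once, buffering inserts and updates separately.

    existing_ids is a hypothetical set of primary keys already in the DB.
    """
    inserts, updates = [], []
    for mapping in mappings:
        if mapping['id'] in existing_ids:
            updates.append(mapping)
        else:
            inserts.append(mapping)
    return inserts, updates


inserts, updates = split_mappings(
    iter([{'id': 2, 'value': 2}, {'id': 1, 'value': 2}]),
    existing_ids={1},
)
# The two lists could then be passed to the bulk methods one after the
# other, inside the same session and transaction:
#     session.bulk_insert_mappings(TestModel, inserts)
#     session.bulk_update_mappings(TestModel, updates)
```

This trades the concurrency of the greenlet approach for buffering, so it 
only helps if holding both lists in memory is acceptable.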

Versions used:
SQLAlchemy==1.0.5
greenlet==0.4.7
psycopg2==2.6.1
and Postgres 9.4.1, Python 3.4.3

Thanks for your time!

Regards,
Soeren
