Thank you Mike!!!

Sorry about that - my example had errors :(

In reality, I am calling `celery_state_session.flush()` and 
`celery_state_session.commit()` right after I update any of the `cs.xxxx` 
values.

@app.task(
    base=BaseTask,
    bind=True
)
def my_task(self, ...):

    # Persist checkpoint 'started'
    cs = celery_state_session.query(CeleryTask).get(self.request.id)
    cs.started = datetime.now()
    celery_state_session.flush()
    celery_state_session.commit()

    try: 
 
        with transaction.manager:
            mymodel = domain_model_session.query(MyModel).one()
            mymodel.value = "..."

            *# Persist checkpoint 'value updated' <<< This checkpoint is 
never persisted !!!!*

            cs = celery_state_session.query(CeleryTask).get(self.request.id)
            cs.model_value_updated = datetime.now()
            celery_state_session.flush()
            celery_state_session.commit() 
    
    except Exception as e:
 
            *# Persist checkpoint 'transaction failed'  <<< This checkpoint 
is never persisted !!!!*

            cs = celery_state_session.query(CeleryTask).get(self.request.id)
            cs.failed_with_exception = datetime.now()
            celery_state_session.flush()
            celery_state_session.commit()

    finally:
 
           # Persist checkpoint 'celery task completed anyhow'  

           cs = celery_state_session.query(CeleryTask).get(self.request.id)
           cs.failed_with_exception = datetime.now()
           celery_state_session.flush()
           celery_state_session.commit()


Couple of clarifications that might be worth mentioning:

The `*# persist checkpoint*` is in reality a function kinda like this:

def checkpoint(self, name):
    eng = engine_from_config(self.app.conf['PYRAMID_REGISTRY'].settings)
    eng.update_execution_options(autocommit=True, autoflush=False)
    factory = sessionmaker()
    factory.configure(bind=eng)
    celery_state_session = factory()
    celery_state = celery_state_session.query(..).get(..)
    celery_state.value = ...
    celery_state_session.flush()
    celery_state_session.commit()
    celery_state_session.close()

Notice I create a brand new 'eng` and a brand new `celery_state_session` 
each time I call my checkpoint function. I am not using `scoped_session` 
either. Perhaps that's all incorrect?

I have many celery tasks running at the same time changing the same 
`mymodel` row so concurrent updates are typical.

When all is good the SQL log shows a BEGIN (implicit) followed by UPDATE 
and finally a COMMIT for mymodel and my celery_state. 

But whenever `psycopg2.extensions.TransactionRollbackError` happens, I see 
a COMMIT but no UPDATE for `celery_state` preceding it.
Did the `celery_state_session` got bound to the `transaction.manager` 
automatically? because threadlocals? 

What's the correct way to create a session that is not automatically bound 
to any transaction or that can be explicitly bound to an isolated 
transaction that can be committed whenever?



On Thursday, June 21, 2018 at 7:37:52 PM UTC-5, Mike Bayer wrote:
>
> From looking at your code example, I don't see you calling 
> celery_session.flush() or celery_session.commit(), so nothing is going to 
> persist.  Just setting an attribute value does not send any SQL.
>
> On Thu, Jun 21, 2018, 7:22 PM HP3 <henddher...@gmail.com <javascript:>> 
> wrote:
>
>> Hello all:
>>
>> Within a celery task, I need to have 2 unrelated sessions that can 
>> commit/rollback independently of each other:
>> One session (`domain_model_session`) performs vanilla domain model 
>> operations and the other (`celery_state_session`) is to persist the state 
>> of the celery task itself **isolated from the domain model one**. 
>>
>> *Imagine "checkpoints"*
>>
>> Something along these lines
>>
>> @task
>> def my task(...):
>>
>>            *# Persist checkpoint 'started'*
>>
>> cs = celery_state_session.query(CeleryTask).get(self.request.id)
>> cs.started = datetime.now()
>> celery_state_session.commit()
>>
>> try: 
>>
>>  
>>
>>     with transaction.manager:
>>         mymodel = domain_model_session.query(MyModel).one()
>>         mymodel.value = "..."
>>
>>                    *# Persist checkpoint 'value updated' *
>>
>>         cs = celery_state_session.query(CeleryTask).get(self.request.id)
>>         cs.model_value_updated = datetime.now()
>>     
>> except Exception as e:
>>
>>  
>>              * # Persist checkpoint 'transaction failed'  <<< This 
>> checkpoint is never persisted !!!!*
>>
>>    cs = celery_state_session.query(CeleryTask).get(self.request.id)
>>    cs.failed_with_exception = datetime.now()   
>>
>> finally:
>>
>>  
>>               *# Persist checkpoint 'celery task completed anyhow'  *
>>
>>    cs = celery_state_session.query(CeleryTask).get(self.request.id)
>>    cs.failed_with_exception = datetime.now()
>>
>>    
>>
>> I've been trying multiple things but I can't get the behavior I need.
>>
>> No matter what, whenever the transaction associated to 
>> `domain_model_session` fails it "rolls back" all operations performed by 
>> `celery_state_session` as if `celery_state_session` was automatically bound 
>> to the same transaction.manager.
>>
>> How am I supposed to create a session that works isolated?
>>
>>
>>
>> -- 
>> SQLAlchemy - 
>> The Python SQL Toolkit and Object Relational Mapper
>>  
>> http://www.sqlalchemy.org/
>>  
>> To post example code, please provide an MCVE: Minimal, Complete, and 
>> Verifiable Example. See http://stackoverflow.com/help/mcve for a full 
>> description.
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "sqlalchemy" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to sqlalchemy+...@googlegroups.com <javascript:>.
>> To post to this group, send email to sqlal...@googlegroups.com 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/sqlalchemy.
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to