Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-06-05 Thread Joshua Harlow
Cool, well, feel free to find the taskflow folks (and others) in either 
#openstack-oslo or #openstack-state-management if you have any questions.


-Josh

pnkk wrote:

I am working on an NFV orchestrator based on MANO

Regards,
Kanthi

On Thu, Jun 2, 2016 at 3:00 AM, Joshua Harlow <harlo...@fastmail.com> wrote:

Interesting way to combine taskflow + celery.

I didn't expect it to be used like this, but the more power to you!

Taskflow itself has some similar capabilities via
http://docs.openstack.org/developer/taskflow/workers.html#design but
anyway what u've done is pretty neat as well.

I am assuming this isn't an openstack project (due to usage of
celery), any details on what's being worked on (am curious here)?

pnkk wrote:

Thanks for the nice documentation.

To my knowledge celery is widely used for distributed task processing.
This fits our requirement perfectly: we want to return an immediate
response to the user from our API server and run the long-running task in
the background. Celery also gives flexibility with the worker types
(processes (which can overcome GIL problems too), eventlet, ...) and it
also provides nice message brokers (rabbitmq, redis, ...).

We used both celery and taskflow for our core processing to leverage the
benefits of both. Taskflow provides nice primitives (execute, revert,
pre/post stuff) which take the load off the application.

As far as the actual issue is concerned, I found one way to solve it by
using celery's "retry" option. This, along with late_acks, makes the
application highly fault tolerant.

http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry

Regards,
Kanthi


On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow <harlo...@fastmail.com> wrote:

Seems like u could just use
http://docs.openstack.org/developer/taskflow/jobs.html (it appears
that you may not be?); the job itself, when failed, would then be
worked on by a different job consumer.

Have u looked at those? It almost appears that u are using celery as
a job distribution system (similar to the jobs.html link mentioned
above)? Is that somewhat correct (I haven't seen anyone try this,
wondering how u are using it and the choices that directed u to
that, aka, am curious)?

 -Josh

 pnkk wrote:

To be specific, we hit this issue when the node running our
service is rebooted.
Our solution is designed in a way that each and every job is a
celery task, and inside the celery task we create a taskflow flow.

We enabled late_acks in celery (which uses rabbitmq as the message
broker), so if our service/node goes down, another healthy service
can pick up the job and complete it.
This works fine, but we just hit this rare case where the node was
rebooted just when taskflow was updating something in the database.

In this case, it raises an exception and the job is marked failed.
Since it is complete (with failure), the message is removed from
rabbitmq and other workers are not able to process it.
Can taskflow handle such I/O errors gracefully, or should the
application try to catch this exception? If the application has to
handle it, what would happen to that particular database transaction
which failed just when the node was rebooted? Who will retry this
transaction?

 Thanks,
 Kanthi

On Fri, May 27, 2016 at 5:39 PM, pnkk <pnkk2...@gmail.com> wrote:



Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-06-05 Thread pnkk
I am working on an NFV orchestrator based on MANO

Regards,
Kanthi

On Thu, Jun 2, 2016 at 3:00 AM, Joshua Harlow  wrote:

> Interesting way to combine taskflow + celery.
>
> I didn't expect it to be used like this, but the more power to you!
>
> Taskflow itself has some similar capabilities via
> http://docs.openstack.org/developer/taskflow/workers.html#design but
> anyway what u've done is pretty neat as well.
>
> I am assuming this isn't an openstack project (due to usage of celery),
> any details on what's being worked on (am curious here)?
>
> pnkk wrote:
>
>> Thanks for the nice documentation.
>>
>> To my knowledge celery is widely used for distributed task processing.
>> This fits our requirement perfectly where we want to return immediate
>> response to the user from our API server and run long running task in
>> background. Celery also gives flexibility with the worker
>> types(process(can overcome GIL problems too)/evetlet...) and it also
>> provides nice message brokers(rabbitmq,redis...)
>>
>> We used both celery and taskflow for our core processing to leverage the
>> benefits of both. Taskflow provides nice primitives like(execute,
>> revert, pre,post stuf) which takes off the load from the application.
>>
>> As far as the actual issue is concerned, I found one way to solve it by
>> using celery "retry" option. This along with late_acks makes the
>> application highly fault tolerant.
>>
>> http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
>>
>> Regards,
>> Kanthi
>>
>>
>> On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>
>> Seems like u could just use
>> http://docs.openstack.org/developer/taskflow/jobs.html (it appears
>> that you may not be?); the job itself would when failed be then
>> worked on by a different job consumer.
>>
>> Have u looked at those? It almost appears that u are using celery as
>> a job distribution system (similar to the jobs.html link mentioned
>> above)? Is that somewhat correct (I haven't seen anyone try this,
>> wondering how u are using it and the choices that directed u to
>> that, aka, am curious)?
>>
>> -Josh
>>
>> pnkk wrote:
>>
>> To be specific, we hit this issue when the node running our
>> service is
>> rebooted.
>> Our solution is designed in a way that each and every job is a
>> celery
>> task and inside celery task, we create taskflow flow.
>>
>> We enabled late_acks in celery(uses rabbitmq as message broker),
>> so if
>> our service/node goes down, other healthy service can pick the
>> job and
>> completes it.
>> This works fine, but we just hit this rare case where the node was
>> rebooted just when taskflow is updating something to the database.
>>
>> In this case, it raises an exception and the job is marked
>> failed. Since
>> it is complete(with failure), message is removed from the
>> rabbitmq and
>> other worker would not be able to process it.
>> Can taskflow handle such I/O errors gracefully or should
>> application try
>> to catch this exception? If application has to handle it what
>> would
>> happen to that particular database transaction which failed just
>> when
>> the node is rebooted? Who will retry this transaction?
>>
>> Thanks,
>> Kanthi
>>
>> On Fri, May 27, 2016 at 5:39 PM, pnkk <pnkk2...@gmail.com> wrote:
>>
>>  Hi,
>>
>>  When taskflow engine is executing a job, the execution
>> failed due to
>>  IO error(traceback pasted below).
>>
>>  2016-05-25 19:45:21.717 7119 ERROR
>>  taskflow.engines.action_engine.engine 127.0.1.1 [-]  Engine
>>  execution has failed, something bad must of happened (last 10
>>  machine transitions were [('SCHEDULING', 'WAITING'),
>> ('WAITING',
>> 'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
>> 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),
>>  ('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'),
>> ('ANALYZING',
>> 'GAME_OVER'), ('GAME_OVER', 'FAILURE')])
>>  2016-05-25 19:45:21.717 7119 TRACE
>>  taskflow.engines.action_engine.engine Traceback (most
>> recent call last):
>>  2016-05-25 19:45:21.717 7119 TRACE
>>  taskflow.engines.action_engine.engine   File
>>
>> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
>>  line 269, in run_iter
>>  2016-05-25 19:45:21.717 7119 TRACE
>>  taskflow.engines.action_engine.engine
>>  failure.Failure.reraise_if_any(

Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-06-01 Thread Joshua Harlow

Interesting way to combine taskflow + celery.

I didn't expect it to be used like this, but the more power to you!

Taskflow itself has some similar capabilities via 
http://docs.openstack.org/developer/taskflow/workers.html#design but 
anyway what u've done is pretty neat as well.


I am assuming this isn't an openstack project (due to usage of celery), 
any details on what's being worked on (am curious here)?


pnkk wrote:

Thanks for the nice documentation.

To my knowledge celery is widely used for distributed task processing.
This fits our requirement perfectly where we want to return immediate
response to the user from our API server and run long running task in
background. Celery also gives flexibility with the worker
types(process(can overcome GIL problems too)/evetlet...) and it also
provides nice message brokers(rabbitmq,redis...)

We used both celery and taskflow for our core processing to leverage the
benefits of both. Taskflow provides nice primitives like(execute,
revert, pre,post stuf) which takes off the load from the application.

As far as the actual issue is concerned, I found one way to solve it by
using celery "retry" option. This along with late_acks makes the
application highly fault tolerant.

http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry

Regards,
Kanthi


On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow <harlo...@fastmail.com> wrote:

Seems like u could just use
http://docs.openstack.org/developer/taskflow/jobs.html (it appears
that you may not be?); the job itself would when failed be then
worked on by a different job consumer.

Have u looked at those? It almost appears that u are using celery as
a job distribution system (similar to the jobs.html link mentioned
above)? Is that somewhat correct (I haven't seen anyone try this,
wondering how u are using it and the choices that directed u to
that, aka, am curious)?

-Josh

pnkk wrote:

To be specific, we hit this issue when the node running our
service is
rebooted.
Our solution is designed in a way that each and every job is a
celery
task and inside celery task, we create taskflow flow.

We enabled late_acks in celery(uses rabbitmq as message broker),
so if
our service/node goes down, other healthy service can pick the
job and
completes it.
This works fine, but we just hit this rare case where the node was
rebooted just when taskflow is updating something to the database.

In this case, it raises an exception and the job is marked
failed. Since
it is complete(with failure), message is removed from the
rabbitmq and
other worker would not be able to process it.
Can taskflow handle such I/O errors gracefully or should
application try
to catch this exception? If application has to handle it what would
happen to that particular database transaction which failed just
when
the node is rebooted? Who will retry this transaction?

Thanks,
Kanthi

On Fri, May 27, 2016 at 5:39 PM, pnkk <pnkk2...@gmail.com> wrote:

 Hi,

 When taskflow engine is executing a job, the execution
failed due to
 IO error(traceback pasted below).

 2016-05-25 19:45:21.717 7119 ERROR
 taskflow.engines.action_engine.engine 127.0.1.1 [-]  Engine
 execution has failed, something bad must of happened (last 10
 machine transitions were [('SCHEDULING', 'WAITING'),
('WAITING',
'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),
 ('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'),
('ANALYZING',
'GAME_OVER'), ('GAME_OVER', 'FAILURE')])
 2016-05-25 19:45:21.717 7119 TRACE
 taskflow.engines.action_engine.engine Traceback (most
recent call last):
 2016-05-25 19:45:21.717 7119 TRACE
 taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
 line 269, in run_iter
 2016-05-25 19:45:21.717 7119 TRACE
 taskflow.engines.action_engine.engine
 failure.Failure.reraise_if_any(memory.failures)
 2016-05-25 19:45:21.717 7119 TRACE
 taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
 line 336, in reraise_if_any
 2016-05-25 19:45:21.717 7119 TRACE
 taskflow.engines.action_engine.engine failures[0].reraise()
  

Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-06-01 Thread pnkk
Thanks for the nice documentation.

To my knowledge celery is widely used for distributed task processing. This
fits our requirement perfectly: we want to return an immediate response to
the user from our API server and run the long-running task in the background.
Celery also gives flexibility with the worker types (processes (which can
overcome GIL problems too), eventlet, ...) and it also provides nice message
brokers (rabbitmq, redis, ...).

We used both celery and taskflow for our core processing to leverage the
benefits of both. Taskflow provides nice primitives (execute, revert,
pre/post stuff) which take the load off the application.

As far as the actual issue is concerned, I found one way to solve it by
using celery's "retry" option. This, along with late_acks, makes the
application highly fault tolerant.

http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
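
A rough sketch of what this looks like on our side (names are illustrative;
assumes a configured Celery app object, with process_job() standing in for
our real flow runner):

from celery import Celery

app = Celery('jobs', broker='amqp://guest@localhost//')

@app.task(bind=True, acks_late=True, max_retries=3, default_retry_delay=10)
def run_job(self, job_id):
    try:
        process_job(job_id)   # hypothetical: builds and runs the taskflow flow
    except IOError as exc:
        # Re-queue the task; with acks_late the message is only acked once the
        # task finishes, so a worker/node crash also leads to redelivery.
        raise self.retry(exc=exc)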

Regards,
Kanthi


On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow 
wrote:

> Seems like u could just use
> http://docs.openstack.org/developer/taskflow/jobs.html (it appears that
> you may not be?); the job itself would when failed be then worked on by a
> different job consumer.
>
> Have u looked at those? It almost appears that u are using celery as a job
> distribution system (similar to the jobs.html link mentioned above)? Is
> that somewhat correct (I haven't seen anyone try this, wondering how u are
> using it and the choices that directed u to that, aka, am curious)?
>
> -Josh
>
> pnkk wrote:
>
>> To be specific, we hit this issue when the node running our service is
>> rebooted.
>> Our solution is designed in a way that each and every job is a celery
>> task and inside celery task, we create taskflow flow.
>>
>> We enabled late_acks in celery(uses rabbitmq as message broker), so if
>> our service/node goes down, other healthy service can pick the job and
>> completes it.
>> This works fine, but we just hit this rare case where the node was
>> rebooted just when taskflow is updating something to the database.
>>
>> In this case, it raises an exception and the job is marked failed. Since
>> it is complete(with failure), message is removed from the rabbitmq and
>> other worker would not be able to process it.
>> Can taskflow handle such I/O errors gracefully or should application try
>> to catch this exception? If application has to handle it what would
>> happen to that particular database transaction which failed just when
>> the node is rebooted? Who will retry this transaction?
>>
>> Thanks,
>> Kanthi
>>
>> On Fri, May 27, 2016 at 5:39 PM, pnkk <pnkk2...@gmail.com> wrote:
>>
>> Hi,
>>
>> When taskflow engine is executing a job, the execution failed due to
>> IO error(traceback pasted below).
>>
>> 2016-05-25 19:45:21.717 7119 ERROR
>> taskflow.engines.action_engine.engine 127.0.1.1 [-]  Engine
>> execution has failed, something bad must of happened (last 10
>> machine transitions were [('SCHEDULING', 'WAITING'), ('WAITING',
>> 'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
>> 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),
>> ('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING',
>> 'GAME_OVER'), ('GAME_OVER', 'FAILURE')])
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine Traceback (most recent call
>> last):
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine   File
>>
>> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
>> line 269, in run_iter
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine
>> failure.Failure.reraise_if_any(memory.failures)
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine   File
>>
>> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
>> line 336, in reraise_if_any
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine failures[0].reraise()
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine   File
>>
>> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
>> line 343, in reraise
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine six.reraise(*self._exc_info)
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine   File
>>
>> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
>> line 94, in schedule
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine
>> futures.add(scheduler.schedule(atom))
>> 2016-05-25 19:45:21.717 7119 TRACE
>> taskflow.engines.action_engine.engine   File
>>
>> "/o

Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-05-27 Thread Joshua Harlow
Seems like u could just use 
http://docs.openstack.org/developer/taskflow/jobs.html (it appears that 
you may not be?); the job itself, when failed, would then be worked on by 
a different job consumer.
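
Something roughly like this (an untested sketch; the zookeeper and mysql
connection details are illustrative, not taken from your setup):

import contextlib

from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends

persistence = persistence_backends.fetch(
    {'connection': 'mysql://user:pw@host/taskflow'})
board_conf = {'board': 'zookeeper', 'hosts': 'zk-host:2181'}

with contextlib.closing(job_backends.fetch('my-jobs', board_conf,
                                           persistence=persistence)) as board:
    board.connect()
    # Post a job; a conductor claims it and runs the flow. If the claiming
    # conductor dies before finishing, its claim is released and another
    # conductor re-claims the job (the failure/retry behaviour in question).
    board.post('my-job', book=None, details={'store': {}})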


Have u looked at those? It almost appears that u are using celery as a 
job distribution system (similar to the jobs.html link mentioned above)? 
Is that somewhat correct (I haven't seen anyone try this, wondering how 
u are using it and the choices that directed u to that, aka, am curious)?


-Josh

pnkk wrote:

To be specific, we hit this issue when the node running our service is
rebooted.
Our solution is designed in a way that each and every job is a celery
task and inside celery task, we create taskflow flow.

We enabled late_acks in celery(uses rabbitmq as message broker), so if
our service/node goes down, other healthy service can pick the job and
completes it.
This works fine, but we just hit this rare case where the node was
rebooted just when taskflow is updating something to the database.

In this case, it raises an exception and the job is marked failed. Since
it is complete(with failure), message is removed from the rabbitmq and
other worker would not be able to process it.
Can taskflow handle such I/O errors gracefully or should application try
to catch this exception? If application has to handle it what would
happen to that particular database transaction which failed just when
the node is rebooted? Who will retry this transaction?

Thanks,
Kanthi

On Fri, May 27, 2016 at 5:39 PM, pnkk <pnkk2...@gmail.com> wrote:

Hi,

When taskflow engine is executing a job, the execution failed due to
IO error(traceback pasted below).

2016-05-25 19:45:21.717 7119 ERROR
taskflow.engines.action_engine.engine 127.0.1.1 [-]  Engine
execution has failed, something bad must of happened (last 10
machine transitions were [('SCHEDULING', 'WAITING'), ('WAITING',
'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),
('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING',
'GAME_OVER'), ('GAME_OVER', 'FAILURE')])
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine Traceback (most recent call last):
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
line 269, in run_iter
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine
failure.Failure.reraise_if_any(memory.failures)
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
line 336, in reraise_if_any
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine failures[0].reraise()
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
line 343, in reraise
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine six.reraise(*self._exc_info)
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
line 94, in schedule
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine
futures.add(scheduler.schedule(atom))
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
line 67, in schedule
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine return
self._task_action.schedule_execution(task)
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
line 99, in schedule_execution
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine self.change_state(task,
states.RUNNING, progress=0.0)
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine   File

"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
line 67, in change_state
2016-05-25 19:45:21.717 7119 TRACE
taskflow.engines.action_engine.engine
self._storage.set_atom_state(task.name , state)
2016-05-25 19:45:21.717 7119 TRACE
taskflo

Re: [openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-05-27 Thread pnkk
To be specific, we hit this issue when the node running our service is
rebooted.
Our solution is designed in a way that each and every job is a celery task,
and inside the celery task we create a taskflow flow.

We enabled late_acks in celery (which uses rabbitmq as the message broker), so
if our service/node goes down, another healthy service can pick up the job and
complete it.
This works fine, but we just hit this rare case where the node was rebooted
just when taskflow was updating something in the database.

In this case, it raises an exception and the job is marked failed. Since it is
complete (with failure), the message is removed from rabbitmq and other
workers are not able to process it.
Can taskflow handle such I/O errors gracefully, or should the application try
to catch this exception? If the application has to handle it, what would
happen to that particular database transaction which failed just when the
node was rebooted? Who will retry this transaction?
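
For reference, the shape of the design described above (a minimal sketch with
illustrative names; make_flow() stands in for our real flow factory):

import taskflow.engines
from celery import Celery

app = Celery('orchestrator', broker='amqp://guest@localhost//')

@app.task(acks_late=True)          # message is acked only after the task ends
def run_job(job_args):
    flow = make_flow(job_args)     # hypothetical taskflow flow factory
    engine = taskflow.engines.load(flow)
    engine.run()                   # if the node dies before the ack, rabbitmq
                                   # redelivers the job to another worker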

Thanks,
Kanthi

On Fri, May 27, 2016 at 5:39 PM, pnkk  wrote:

> Hi,
>
> When taskflow engine is executing a job, the execution failed due to IO
> error(traceback pasted below).
>
> 2016-05-25 19:45:21.717 7119 ERROR taskflow.engines.action_engine.engine
> 127.0.1.1 [-]  Engine execution has failed, something bad must of happened
> (last 10 machine transitions were [('SCHEDULING', 'WAITING'), ('WAITING',
> 'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING', 'WAITING'),
> ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
> 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'GAME_OVER'),
> ('GAME_OVER', 'FAILURE')])
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> Traceback (most recent call last):
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
> line 269, in run_iter
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   failure.Failure.reraise_if_any(memory.failures)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
> line 336, in reraise_if_any
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   failures[0].reraise()
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
> line 343, in reraise
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   six.reraise(*self._exc_info)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
> line 94, in schedule
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   futures.add(scheduler.schedule(atom))
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
> line 67, in schedule
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   return self._task_action.schedule_execution(task)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
> line 99, in schedule_execution
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   self.change_state(task, states.RUNNING, progress=0.0)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
> line 67, in change_state
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   self._storage.set_atom_state(task.name, state)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/fasteners/lock.py",
> line 85, in wrapper
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   return f(self, *args, **kwargs)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> File
> "/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
> line 486, in set_atom_state
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
>   self._with_connection(self._save_atom_detail, source, clone)
> 2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
> F

[openstack-dev] [TaskFlow] TaskFlow persistence: Job failure retry

2016-05-27 Thread pnkk
Hi,

When taskflow engine is executing a job, the execution failed due to IO
error(traceback pasted below).

2016-05-25 19:45:21.717 7119 ERROR taskflow.engines.action_engine.engine
127.0.1.1 [-]  Engine execution has failed, something bad must of happened
(last 10 machine transitions were [('SCHEDULING', 'WAITING'), ('WAITING',
'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING', 'WAITING'),
('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',
'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'GAME_OVER'),
('GAME_OVER', 'FAILURE')])
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
Traceback (most recent call last):
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",
line 269, in run_iter
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  failure.Failure.reraise_if_any(memory.failures)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
line 336, in reraise_if_any
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  failures[0].reraise()
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",
line 343, in reraise
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  six.reraise(*self._exc_info)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
line 94, in schedule
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  futures.add(scheduler.schedule(atom))
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",
line 67, in schedule
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  return self._task_action.schedule_execution(task)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
line 99, in schedule_execution
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  self.change_state(task, states.RUNNING, progress=0.0)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",
line 67, in change_state
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  self._storage.set_atom_state(task.name, state)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/fasteners/lock.py",
line 85, in wrapper
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  return f(self, *args, **kwargs)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
line 486, in set_atom_state
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  self._with_connection(self._save_atom_detail, source, clone)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
line 341, in _with_connection
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  return functor(conn, *args, **kwargs)
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",
line 471, in _save_atom_detail
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  original_atom_detail.update(conn.update_atom_details(atom_detail))
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/persistence/backends/impl_sqlalchemy.py",
line 427, in update_atom_details
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
  row = conn.execute(q).first()
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine
File
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/engin

Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-23 Thread Joshua Harlow

On 03/23/2016 12:49 PM, pnkk wrote:

Joshua,

We are performing few scaling tests for our solution and see that there
are errors as below:

Failed saving logbook 'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b'\n  InternalError: 
(pymysql.err.InternalError) (1205, u'Lock wait timeout exceeded; try restarting 
transaction') [SQL: u'UPDATE logbooks SET created_at=%s, updated_at=%s, meta=%s, 
name=%s, uuid=%s WHERE logbooks.uuid = %s'] [parameters: (datetime.datetime(2016, 3, 
18, 18, 16, 40), datetime.datetime(2016, 3, 23, 3, 3, 44, 95395), u'{}', u'test', 
u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b', 
u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b')]"


We have about 800 flows as of now, and each flow is updated in the same logbook 
in a separate eventlet thread.


Every thread calls save_logbook() on the same logbook record. I think this 
function is trying to update the logbook record even though my use case only 
needs the flow details to be inserted, and it doesn't update any information 
related to the logbook.



Right, it's trying to update the 'updated_at' field afaik,



Probably one of the threads was holding the lock while updating, and the others 
tried for the lock and failed after the default interval elapsed.


I can think of few alternatives at the moment:


1. Increase the number of logbooks

2. Increase the innodb_lock_wait_timeout

3. There are some suggestions to change the innodb transaction isolation level to "READ 
COMMITTED" instead of "REPEATABLE READ", but I am not very familiar with the side 
effects that could cause


4. Add some basic retries?

5. The following review should also help (and save less) @ 
https://review.openstack.org/#/c/241441/


Afaik we are also using READ COMMITTED already ;)

https://github.com/openstack/taskflow/blob/master/taskflow/persistence/backends/impl_sqlalchemy.py#L105




Appreciate your thoughts on the given alternatives, or perhaps an even better 
alternative


Do u want to try using https://pypi.python.org/pypi/retrying in a few 
strategic places so that if the above occurs, it retries?
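
Something like the following (an untested sketch; depending on where u wrap it,
the error may surface as the wrapped sqlalchemy/taskflow exception rather than
the raw pymysql one, so adjust the matcher accordingly):

from pymysql.err import InternalError
from retrying import retry

def _is_lock_wait_timeout(exc):
    # 1205 == 'Lock wait timeout exceeded; try restarting transaction'
    return isinstance(exc, InternalError) and exc.args and exc.args[0] == 1205

@retry(retry_on_exception=_is_lock_wait_timeout,
       stop_max_attempt_number=5, wait_fixed=1000)   # 5 tries, 1s apart
def save_logbook(conn, book):
    conn.save_logbook(book)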





Thanks,

Kanthi






On Sun, Mar 20, 2016 at 10:00 PM, Joshua Harlow <harlo...@fastmail.com> wrote:

Lingxian Kong wrote:

Kanthi, sorry for chiming in, I suggest you may have a chance to
take
a look at Mistral[1], which is the workflow as a service in
OpenStack(or without OpenStack).


Out of curiosity, why? Seems the ML post was about 'TaskFlow
persistence' not mistral, just saying (unsure how it is relevant to
mention mistral in this)...

Back to getting more coffee...

-Josh



__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe:
openstack-dev-requ...@lists.openstack.org?subject:unsubscribe

http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev




__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev



__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-23 Thread Nikhil Komawar
Just throwing this out there:

Maybe the sessions are being left open o_O? If you're using sqlalchemy to talk to
the DB, then maybe open and close the sessions per transaction rather than keeping
them open for all threads?

On 3/23/16 3:49 PM, pnkk wrote:
> Joshua,
>
> We are performing few scaling tests for our solution and see that
> there are errors as below:
>
> Failed saving logbook 'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b'\n  
> InternalError: (pymysql.err.InternalError) (1205, u'Lock wait timeout 
> exceeded; try restarting transaction') [SQL: u'UPDATE logbooks SET 
> created_at=%s, updated_at=%s, meta=%s, name=%s, uuid=%s WHERE logbooks.uuid = 
> %s'] [parameters: (datetime.datetime(2016, 3, 18, 18, 16, 40), 
> datetime.datetime(2016, 3, 23, 3, 3, 44, 95395), u'{}', u'test', 
> u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b', 
> u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b')]"
> We have about 800 flows as of now and each flow is updated in the same 
> logbook in a separate eventlet thread.
> Every thread calls save_logbook() on the same logbook record. I think this 
> function is trying to update logbook record even though my usecase needs only 
> flow details to be inserted and it doesn't update any information related to 
> logbook.
> Probably one of the threads was holding the lock while updating, and others 
> tried for lock and failed after the default interval has elapsed.
> I can think of few alternatives at the moment:
> 1. Increase the number of logbooks
> 2. Increase the innodb_lock_wait_timeout
> 3. There are some suggestions to make the innodb transaction isolation level 
> to "READ COMMITTED" instead of "REPEATABLE READ", but I am not very familiar 
> of the side effects they can cause
> Appreciate your thoughts on given alternatives or probably even better 
> alternative
> Thanks,
> Kanthi
>
> On Sun, Mar 20, 2016 at 10:00 PM, Joshua Harlow <harlo...@fastmail.com> wrote:
>
> Lingxian Kong wrote:
>
> Kanthi, sorry for chiming in, I suggest you may have a chance
> to take
> a look at Mistral[1], which is the workflow as a service in
> OpenStack(or without OpenStack).
>
>
> Out of curiosity, why? Seems the ML post was about 'TaskFlow
> persistence' not mistral, just saying (unsure how it is relevant
> to mention mistral in this)...
>
> Back to getting more coffee...
>
> -Josh
>
>
>
> __
> OpenStack Development Mailing List (not for usage questions)
> Unsubscribe:
> openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
> 
> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>
>
>
>
> __
> OpenStack Development Mailing List (not for usage questions)
> Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev

-- 

Thanks,
Nikhil


__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-23 Thread pnkk
Joshua,

We are performing few scaling tests for our solution and see that there are
errors as below:

Failed saving logbook 'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b'\n
InternalError: (pymysql.err.InternalError) (1205, u'Lock wait timeout
exceeded; try restarting transaction') [SQL: u'UPDATE logbooks SET
created_at=%s, updated_at=%s, meta=%s, name=%s, uuid=%s WHERE
logbooks.uuid = %s'] [parameters: (datetime.datetime(2016, 3, 18, 18,
16, 40), datetime.datetime(2016, 3, 23, 3, 3, 44, 95395), u'{}',
u'test', u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b',
u'cc6f5cbd-c2f7-4432-9ca6-fff185cf853b')]"


We have about 800 flows as of now, and each flow is updated in the same
logbook in a separate eventlet thread.


Every thread calls save_logbook() on the same logbook record. I think
this function is trying to update the logbook record even though my
use case only needs the flow details to be inserted, and it doesn't
update any information related to the logbook.


Probably one of the threads was holding the lock while updating, and
the others tried for the lock and failed after the default interval
elapsed.


I can think of few alternatives at the moment:


1. Increase the number of logbooks

2. Increase the innodb_lock_wait_timeout

3. There are some suggestions to change the innodb transaction isolation
level to "READ COMMITTED" instead of "REPEATABLE READ", but I am not
very familiar with the side effects that could cause


Appreciate your thoughts on the given alternatives, or perhaps an even better
alternative


Thanks,

Kanthi






On Sun, Mar 20, 2016 at 10:00 PM, Joshua Harlow 
wrote:

> Lingxian Kong wrote:
>
>> Kanthi, sorry for chiming in, I suggest you may have a chance to take
>> a look at Mistral[1], which is the workflow as a service in
>> OpenStack(or without OpenStack).
>>
>
> Out of curiosity, why? Seems the ML post was about 'TaskFlow persistence'
> not mistral, just saying (unsure how it is relevant to mention mistral in
> this)...
>
> Back to getting more coffee...
>
> -Josh
>
>
>
> __
> OpenStack Development Mailing List (not for usage questions)
> Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>
__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-20 Thread Joshua Harlow

Lingxian Kong wrote:

Kanthi, sorry for chiming in, I suggest you may have a chance to take
a look at Mistral[1], which is the workflow as a service in
OpenStack(or without OpenStack).


Out of curiosity, why? Seems the ML post was about 'TaskFlow 
persistence' not mistral, just saying (unsure how it is relevant to 
mention mistral in this)...


Back to getting more coffee...

-Josh


__
OpenStack Development Mailing List (not for usage questions)
Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-20 Thread Lingxian Kong
Kanthi, sorry for chiming in; I suggest you take a look at Mistral[1],
which is workflow as a service in OpenStack (or without OpenStack).

[1]: http://docs.openstack.org/developer/mistral/

On Sun, Mar 20, 2016 at 5:35 AM, pnkk  wrote:
> Filed it at https://bugs.launchpad.net/taskflow/+bug/1559496
>
> Thanks,
> Kanthi
>
> On Sat, Mar 19, 2016 at 9:30 PM, Joshua Harlow 
> wrote:
>>
>> Interesting error, that could be a bug and perhaps we should ensure
>> upgrade is more thread-safe (with a lock on upgrade); can u open a bug @
>> bugs.launchpad.net/taskflow for that and we can try to add said lock (that
>> should hopefully resolve what u are seeing, although if it doesn't then the
>> bug lies somewhere else).
>>
>> Thanks much!
>>
>> -Josh
>>
>>
>> On 03/19/2016 08:45 AM, pnkk wrote:
>>>
>>> Hi Joshua,
>>>
>>> Thanks for all your inputs.
>>> We are using this feature successfully. But I rarely see an issue
>>> related to concurrency.
>>>
>>> To give you a brief, we use eventlets and every job runs in a separate
>>> eventlet thread.
>>>
>>> In the job execution part, we use taskflow functionality and persist all
>>> the details of the job.
>>>
>>> There we try to get the backend as below. We do upgrade everytime when a
>>> job is executed.
>>>
>>>  backend_uri = cfg.CONF.db.sqlalchemy_url
>>>  conf = {
>>>  'connection': backend_uri,
>>>  }
>>>  self.backend = backends.fetch(conf)
>>>  with contextlib.closing(self.backend.get_connection()) as conn:
>>>  conn.upgrade()
>>>
>>> Now when two jobs start executing at the same time, I see below error.
>>> Failed upgrading database version
>>>DBAPIError: (exceptions.RuntimeError) reentrant call inside
>>> <_io.BufferedReader name=14>
>>>
>>> We have monkey patched eventlet, is it not supposed to take care of
>>> these concurrency issues?
>>>
>>> Below are the versions for related modules in use:
>>> eventlet==0.17.4
>>> taskflow==1.26.0
>>> SQLAlchemy==1.0.9
>>>
>>> Thanks,
>>> Kanthi
>>>
>>> On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>>
>>> pn kk wrote:
>>>
>>> Hi Joshua,
>>>
>>> Yes, sure will do that once I get some window out of my work.
>>>
>>> One last query(hopefully :) ) , can the factory method be an
>>> instance
>>> method of a class?
>>>
>>>
>>> Instance methods are particularly hard to use (since they require an
>>> instance of an object to be useful); so I think the check u have hit
>>> is making sure that if the flow-factory is called to recreate the
>>> flow that it can do so without having import issues. Currently I
>>> believe it doesn't handle instance methods (due to the complexity of
>>> needing an instance attached to that method).
>>>
>>> Perhaps what u want is to provide a function that can be found like:
>>>
>>> def make_flow_factory():
>>> return FlowFactory().flow_factory
>>>
>>> Or make flow_factory a class or static method, which should have the
>>> same/similar effect.
>>>
>>> Hope that makes sense (more queries are fine and welcome!)
>>>
>>>
>>> I tried giving it as "FlowFactory().flow_factory", where
>>> FlowFactory is
>>> my class name. It failed with below error:
>>> ValueError: Flow factory >> of
>>> <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable
>>> by
>>> name __builtin__.instance.flow_factory
>>>
>>> Thanks again
>>> -Kanthi
>>>
>>>
>>> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>>
>>>  pn kk wrote:
>>>
>>>  Hi,
>>>
>>>  Thanks for the responses. Putting it in a small example
>>>
>>>  def flow_factory(tmp):
>>>return lf.Flow('resume from backend example').add(
>>>TestTask(name='first', test=tmp),
>>>InterruptTask(name='boom'),
>>>TestTask(name='second', test="second task"))
>>>
>>>
>>>  class TestTask(task.Task):
>>>def __init__(self, name, provides=None, test,
>>> **kwargs):
>>>self.test=test
>>>super(TestTask, self).__init__(name,
>>> provides, **kwargs)
>>>def execute(self, *args, **kwargs):
>>>print('executing %s' % self)
>>>return 'ok'
>>>
>>>  class InterruptTask(task.Task):
>>>def execute(self, *args, **kwargs):
>>># DO NOT TRY THIS AT HOME
>>>engine.suspend()
>>>
>>>  I was searching for a way in wh

Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-19 Thread pnkk
Filed it at https://bugs.launchpad.net/taskflow/+bug/1559496

Thanks,
Kanthi

On Sat, Mar 19, 2016 at 9:30 PM, Joshua Harlow 
wrote:

> Interesting error, that could be a bug and perhaps we should ensure
> upgrade is more thread-safe (with a lock on upgrade); can u open a bug @
> bugs.launchpad.net/taskflow for that and we can try to add said lock
> (that should hopefully resolve what u are seeing, although if it doesn't
> then the bug lies somewhere else).
>
> Thanks much!
>
> -Josh
>
>
> On 03/19/2016 08:45 AM, pnkk wrote:
>
>> Hi Joshua,
>>
>> Thanks for all your inputs.
>> We are using this feature successfully. But I rarely see an issue
>> related to concurrency.
>>
>> To give you a brief, we use eventlets and every job runs in a separate
>> eventlet thread.
>>
>> In the job execution part, we use taskflow functionality and persist all
>> the details of the job.
>>
>> There we try to get the backend as below. We do upgrade everytime when a
>> job is executed.
>>
>>  backend_uri = cfg.CONF.db.sqlalchemy_url
>>  conf = {
>>  'connection': backend_uri,
>>  }
>>  self.backend = backends.fetch(conf)
>>  with contextlib.closing(self.backend.get_connection()) as conn:
>>  conn.upgrade()
>>
>> Now when two jobs start executing at the same time, I see below error.
>> Failed upgrading database version
>>DBAPIError: (exceptions.RuntimeError) reentrant call inside
>> <_io.BufferedReader name=14>
>>
>> We have monkey patched eventlet, is it not supposed to take care of
>> these concurrency issues?
>>
>> Below are the versions for related modules in use:
>> eventlet==0.17.4
>> taskflow==1.26.0
>> SQLAlchemy==1.0.9
>>
>> Thanks,
>> Kanthi
>>
>> On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>
>> pn kk wrote:
>>
>> Hi Joshua,
>>
>> Yes, sure will do that once I get some window out of my work.
>>
>> One last query(hopefully :) ) , can the factory method be an
>> instance
>> method of a class?
>>
>>
>> Instance methods are particularly hard to use (since they require an
>> instance of an object to be useful); so I think the check u have hit
>> is making sure that if the flow-factory is called to recreate the
>> flow that it can do so without having import issues. Currently I
>> believe it doesn't handle instance methods (due to the complexity of
>> needing an instance attached to that method).
>>
>> Perhaps what u want is to provide a function that can be found like:
>>
>> def make_flow_factory():
>> return FlowFactory().flow_factory
>>
>> Or make flow_factory a class or static method, which should have the
>> same/similar effect.
>>
>> Hope that makes sense (more queries are fine and welcome!)
>>
>>
>> I tried giving it as "FlowFactory().flow_factory", where
>> FlowFactory is
>> my class name. It failed with below error:
>> ValueError: Flow factory > <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable
>> by
>> name __builtin__.instance.flow_factory
>>
>> Thanks again
>> -Kanthi
>>
>>
>> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>
>>  pn kk wrote:
>>
>>  Hi,
>>
>>  Thanks for the responses. Putting it in a small example
>>
>>  def flow_factory(tmp):
>>return lf.Flow('resume from backend example').add(
>>TestTask(name='first', test=tmp),
>>InterruptTask(name='boom'),
>>TestTask(name='second', test="second task"))
>>
>>
>>  class TestTask(task.Task):
>>def __init__(self, name, provides=None, test,
>> **kwargs):
>>self.test=test
>>super(TestTask, self).__init__(name,
>> provides, **kwargs)
>>def execute(self, *args, **kwargs):
>>print('executing %s' % self)
>>return 'ok'
>>
>>  class InterruptTask(task.Task):
>>def execute(self, *args, **kwargs):
>># DO NOT TRY THIS AT HOME
>>engine.suspend()
>>
>>  I was searching for a way in which I can reload the
>> flow after crash
>>  without passing the parameter "tmp" shown above
>>  Looks like "load_from_factory" gives this provision.
>>
>>
>>  Thanks for the example, ya, this is one such way to do this
>> that u
>>  have found, there are a few other ways, but that is the
>> main one
>>  that people sho

Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-19 Thread Joshua Harlow
Interesting error, that could be a bug and perhaps we should ensure 
upgrade is more thread-safe (with a lock on upgrade); can u open a bug @ 
bugs.launchpad.net/taskflow for that and we can try to add said lock 
(that should hopefully resolve what u are seeing, although if it doesn't 
then the bug lies somewhere else).
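
In the meantime, guarding the upgrade on your side could look roughly like this 
(an untested sketch; the module-level lock and flag are illustrative):

import contextlib
import threading

from taskflow.persistence import backends

_UPGRADE_LOCK = threading.Lock()   # eventlet monkey-patching keeps this green-safe
_UPGRADED = False

def fetch_backend(backend_uri):
    global _UPGRADED
    backend = backends.fetch({'connection': backend_uri})
    with _UPGRADE_LOCK:
        if not _UPGRADED:
            # Only one greenthread performs the schema upgrade.
            with contextlib.closing(backend.get_connection()) as conn:
                conn.upgrade()
            _UPGRADED = True
    return backend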


Thanks much!

-Josh

On 03/19/2016 08:45 AM, pnkk wrote:

Hi Joshua,

Thanks for all your inputs.
We are using this feature successfully. But I rarely see an issue
related to concurrency.

To give you a brief, we use eventlets and every job runs in a separate
eventlet thread.

In the job execution part, we use taskflow functionality and persist all
the details of the job.

There we try to get the backend as below. We do upgrade everytime when a
job is executed.

 backend_uri = cfg.CONF.db.sqlalchemy_url
 conf = {
 'connection': backend_uri,
 }
 self.backend = backends.fetch(conf)
 with contextlib.closing(self.backend.get_connection()) as conn:
 conn.upgrade()

Now when two jobs start executing at the same time, I see below error.
Failed upgrading database version
   DBAPIError: (exceptions.RuntimeError) reentrant call inside
<_io.BufferedReader name=14>

We have monkey patched eventlet, is it not supposed to take care of
these concurrency issues?

Below are the versions for related modules in use:
eventlet==0.17.4
taskflow==1.26.0
SQLAlchemy==1.0.9

Thanks,
Kanthi

On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlo...@fastmail.com> wrote:

pn kk wrote:

Hi Joshua,

Yes, sure will do that once I get some window out of my work.

One last query(hopefully :) ) , can the factory method be an
instance
method of a class?


Instance methods are particularly hard to use (since they require an
instance of an object to be useful); so I think the check u have hit
is making sure that if the flow-factory is called to recreate the
flow that it can do so without having import issues. Currently I
believe it doesn't handle instance methods (due to the complexity of
needing an instance attached to that method).

Perhaps what u want is to provide a function that can be found like:

def make_flow_factory():
return FlowFactory().flow_factory

Or make flow_factory a class or static method, which should have the
same/similar effect.

Hope that makes sense (more queries are fine and welcome!)


I tried giving it as "FlowFactory().flow_factory", where
FlowFactory is
my class name. It failed with below error:
ValueError: Flow factory > is not reimportable by
name __builtin__.instance.flow_factory

Thanks again
-Kanthi


On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlo...@fastmail.com> wrote:

 pn kk wrote:

 Hi,

 Thanks for the responses. Putting it in a small example

 def flow_factory(tmp):
   return lf.Flow('resume from backend example').add(
   TestTask(name='first', test=tmp),
   InterruptTask(name='boom'),
   TestTask(name='second', test="second task"))


 class TestTask(task.Task):
   def __init__(self, name, provides=None, test,
**kwargs):
   self.test=test
   super(TestTask, self).__init__(name,
provides, **kwargs)
   def execute(self, *args, **kwargs):
   print('executing %s' % self)
   return 'ok'

 class InterruptTask(task.Task):
   def execute(self, *args, **kwargs):
   # DO NOT TRY THIS AT HOME
   engine.suspend()

 I was searching for a way in which I can reload the
flow after crash
 without passing the parameter "tmp" shown above
 Looks like "load_from_factory" gives this provision.


 Thanks for the example, ya, this is one such way to do this
that u
 have found, there are a few other ways, but that is the
main one
 that people should be using.



 engine =

taskflow.engines.load_from_factory(flow_factory=flow_factory,
 factory_kwargs={"tmp":"test_data"}, book=book,
backend=backend)
 engine.run()

 Now it suspends after running interrupt task, I can now
reload
 the flow
 from the saved factory method without passing parameter
again.
 for flow_detail_2 in book:
   engine2 =
taskflow.engines.load_from_

Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-03-19 Thread pnkk
Hi Joshua,

Thanks for all your inputs.
We are using this feature successfully, but on rare occasions I see an issue
related to concurrency.

To give you a brief overview: we use eventlet, and every job runs in a
separate eventlet thread.

In the job execution part, we use taskflow functionality and persist all
the details of the job.

There we try to get the backend as below. We do the upgrade every time a
job is executed.

backend_uri = cfg.CONF.db.sqlalchemy_url
conf = {
    'connection': backend_uri,
}
self.backend = backends.fetch(conf)
with contextlib.closing(self.backend.get_connection()) as conn:
    conn.upgrade()

Now when two jobs start executing at the same time, I see below error.
Failed upgrading database version
  DBAPIError: (exceptions.RuntimeError) reentrant call inside
<_io.BufferedReader name=14>

We have monkey patched eventlet, is it not supposed to take care of these
concurrency issues?

Below are the versions for related modules in use:
eventlet==0.17.4
taskflow==1.26.0
SQLAlchemy==1.0.9

Thanks,
Kanthi

On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow 
wrote:

> pn kk wrote:
>
>> Hi Joshua,
>>
>> Yes, sure will do that once I get some window out of my work.
>>
>> One last query(hopefully :) ) , can the factory method be an instance
>> method of a class?
>>
>
> Instance methods are particularly hard to use (since they require an
> instance of an object to be useful); so I think the check u have hit is
> making sure that if the flow-factory is called to recreate the flow that it
> can do so without having import issues. Currently I believe it doesn't
> handle instance methods (due to the complexity of needing an instance
> attached to that method).
>
> Perhaps what u want is to provide a function that can be found like:
>
> def make_flow_factory():
>return FlowFactory().flow_factory
>
> Or make flow_factory a class or static method, which should have the
> same/similar effect.
>
> Hope that makes sense (more queries are fine and welcome!)
>
>
>> I tried giving it as "FlowFactory().flow_factory", where FlowFactory is
>> my class name. It failed with below error:
>> ValueError: Flow factory > <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by
>> name __builtin__.instance.flow_factory
>>
>> Thanks again
>> -Kanthi
>>
>>
>> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlo...@fastmail.com> wrote:
>>
>> pn kk wrote:
>>
>> Hi,
>>
>> Thanks for the responses. Putting it in a small example
>>
>> def flow_factory(tmp):
>>   return lf.Flow('resume from backend example').add(
>>   TestTask(name='first', test=tmp),
>>   InterruptTask(name='boom'),
>>   TestTask(name='second', test="second task"))
>>
>>
>> class TestTask(task.Task):
>>   def __init__(self, name, provides=None, test, **kwargs):
>>   self.test=test
>>   super(TestTask, self).__init__(name, provides, **kwargs)
>>   def execute(self, *args, **kwargs):
>>   print('executing %s' % self)
>>   return 'ok'
>>
>> class InterruptTask(task.Task):
>>   def execute(self, *args, **kwargs):
>>   # DO NOT TRY THIS AT HOME
>>   engine.suspend()
>>
>> I was searching for a way in which I can reload the flow after
>> crash
>> without passing the parameter "tmp" shown above
>> Looks like "load_from_factory" gives this provision.
>>
>>
>> Thanks for the example, ya, this is one such way to do this that u
>> have found, there are a few other ways, but that is the main one
>> that people should be using.
>>
>>
>>
>> engine =
>> taskflow.engines.load_from_factory(flow_factory=flow_factory,
>> factory_kwargs={"tmp":"test_data"}, book=book, backend=backend)
>> engine.run()
>>
>> Now it suspends after running interrupt task, I can now reload
>> the flow
>> from the saved factory method without passing parameter again.
>> for flow_detail_2 in book:
>>   engine2 = taskflow.engines.load_from_detail(flow_detail_2,
>> backend=backend)
>>
>> engine2.run()
>>
>> Let me know if this is ok or is there a better approach to
>> achieve this?
>>
>>
>> There are a few other ways, but this one should be the currently
>> recommended one.
>>
>> An idea, do u want to maybe update (submit a review to update) the
>> docs, if u didn't find this very easy to figure out so that others
>> can more easily figure it out. I'm sure that would be appreciated by
>> all.
>>
>>
>> -Thanks
>>
>>
>> On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow
>> mailto:harlo...@fastmail.com>
>> >>
>>
>> wrote:
>>
>>  Hi there,

Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-02-12 Thread Joshua Harlow

pn kk wrote:

Hi Joshua,

Yes, sure will do that once I get some window out of my work.

One last query (hopefully :)): can the factory method be an instance
method of a class?


Instance methods are particularly hard to use (since they require an 
instance of an object to be useful); so I think the check u have hit is 
making sure that, if the flow factory is called to recreate the flow, it 
can do so without import issues. Currently I believe it doesn't handle 
instance methods (due to the complexity of needing an instance attached 
to that method).


Perhaps what u want is to provide a module-level function that can be found by name, like:

def make_flow_factory():
   return FlowFactory().flow_factory

Or make flow_factory a class or static method, which should have the 
same/similar effect.
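(For instance, a minimal sketch of the module-level-wrapper approach,
assuming the TestTask/InterruptTask classes from earlier in this thread
live in a hypothetical myapp.tasks module; the FlowFactory.build() method
name is made up for illustration. The wrapper is importable by name, so
the engine can re-create the flow when resuming:)

from taskflow.patterns import linear_flow as lf

from myapp.tasks import InterruptTask, TestTask  # hypothetical module path


class FlowFactory(object):
    def build(self, tmp):
        return lf.Flow('resume from backend example').add(
            TestTask(name='first', test=tmp),
            InterruptTask(name='boom'),
            TestTask(name='second', test="second task"))


def flow_factory(tmp):
    # Module-level wrapper: resolvable as <this_module>.flow_factory, so
    # it passes the reimportable-by-name check.
    return FlowFactory().build(tmp)

Passing this wrapper (rather than the bound method) to load_from_factory()
should then work as in the load_from_factory() snippets elsewhere in this
thread.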


Hope that makes sense (more queries are fine and welcome!)




Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-02-11 Thread pn kk
Hi Joshua,

Yes, sure will do that once I get some window out of my work.

One last query (hopefully :)): can the factory method be an instance
method of a class?

I tried giving it as "FlowFactory().flow_factory", where FlowFactory is my
class name. It failed with the error below:
ValueError: Flow factory <bound method FlowFactory.flow_factory of
<__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by name
__builtin__.instance.flow_factory

Thanks again
-Kanthi



Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-01-27 Thread Joshua Harlow

pn kk wrote:

Hi,

Thanks for the responses. Putting it in a small example:

def flow_factory(tmp):
    return lf.Flow('resume from backend example').add(
        TestTask(name='first', test=tmp),
        InterruptTask(name='boom'),
        TestTask(name='second', test="second task"))


class TestTask(task.Task):
    def __init__(self, name, test, provides=None, **kwargs):
        self.test = test
        super(TestTask, self).__init__(name, provides, **kwargs)

    def execute(self, *args, **kwargs):
        print('executing %s' % self)
        return 'ok'


class InterruptTask(task.Task):
    def execute(self, *args, **kwargs):
        # DO NOT TRY THIS AT HOME
        engine.suspend()

I was searching for a way in which I can reload the flow after a crash
without passing the parameter "tmp" shown above.
Looks like "load_from_factory" provides this.


Thanks for the example, ya, this is one such way to do this that u have
found; there are a few other ways, but that is the main one that people
should be using.





engine = taskflow.engines.load_from_factory(flow_factory=flow_factory,
    factory_kwargs={"tmp": "test_data"}, book=book, backend=backend)
engine.run()

Now it suspends after running the interrupt task; I can now reload the flow
from the saved factory method without passing the parameter again.
for flow_detail_2 in book:
    engine2 = taskflow.engines.load_from_detail(flow_detail_2,
                                                backend=backend)
    engine2.run()

Let me know if this is OK, or is there a better approach to achieve this?


There are a few other ways, but this one should be the currently 
recommended one.


An idea: do u want to maybe update (submit a review to update) the docs,
if u didn't find this very easy to figure out, so that others can more
easily figure it out? I'm sure that would be appreciated by all.






Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-01-27 Thread pn kk
Hi,

Thanks for the responses. Putting it in a small example:

from taskflow import task
from taskflow.patterns import linear_flow as lf


def flow_factory(tmp):
    return lf.Flow('resume from backend example').add(
        TestTask(name='first', test=tmp),
        InterruptTask(name='boom'),
        TestTask(name='second', test="second task"))


class TestTask(task.Task):
    def __init__(self, name, test, provides=None, **kwargs):
        self.test = test
        super(TestTask, self).__init__(name, provides, **kwargs)

    def execute(self, *args, **kwargs):
        print('executing %s' % self)
        return 'ok'


class InterruptTask(task.Task):
    def execute(self, *args, **kwargs):
        # DO NOT TRY THIS AT HOME
        engine.suspend()

I was searching for a way in which I can reload the flow after a crash
without passing the parameter "tmp" shown above.
Looks like "load_from_factory" provides this.


engine = taskflow.engines.load_from_factory(flow_factory=flow_factory,
    factory_kwargs={"tmp": "test_data"}, book=book, backend=backend)
engine.run()
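(For completeness, a minimal sketch of how the book and backend used above
might be created, assuming taskflow's SQLAlchemy persistence backend; the
connection URL is made up and this exact setup was not shown in the thread.
Note taskflow.persistence.logbook became taskflow.persistence.models in
later releases:)

import contextlib

from taskflow.persistence import backends
from taskflow.persistence import logbook

backend = backends.fetch({'connection': 'sqlite:////tmp/taskflow.db'})
with contextlib.closing(backend.get_connection()) as conn:
    conn.upgrade()  # create/upgrade the persistence schema

book = logbook.LogBook('resume from backend example')
with contextlib.closing(backend.get_connection()) as conn:
    conn.save_logbook(book)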

Now it suspends after running the interrupt task; I can now reload the flow
from the saved factory method without passing the parameter again.
for flow_detail_2 in book:
    engine2 = taskflow.engines.load_from_detail(flow_detail_2,
                                                backend=backend)
    engine2.run()

Let me know if this is OK, or is there a better approach to achieve this?

-Thanks




Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-01-26 Thread Joshua Harlow

Hi there,

Michał is correct, it should be saved.

Do u have a small example of what u are trying to do, because that will
help determine if what u are doing will be saved or whether it will not be.


Or even possibly explaining what is being done would be fine too (more
data/info for me to reason about what should be stored in your case).


Thanks,

Josh



Re: [openstack-dev] [TaskFlow] TaskFlow persistence

2016-01-26 Thread Michał Dulko
On 01/26/2016 10:23 AM, pn kk wrote:
> Hi,
>
> I use taskflow for job management and am now trying to persist the state
> of flows/tasks in MySQL to recover in case of process crashes.
>
> I could see the state and the task results stored in the database.
>
> Now I am looking for some way to store the input parameters of the tasks.
>
> Please share your inputs to achieve this.
>
> -Thanks
>
I played with that some time ago, and if I recall correctly, input
parameters should be available in the flow's storage, which means they
are also saved to the DB. Take a look at the resume_workflows method in my
old PoC [1] (hopefully TaskFlow hasn't changed much since then).

[1] https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py
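(To illustrate that point, a small self-contained sketch, not from the
thread: inputs injected through the engine's store end up in the flow's
storage, and when a persistence backend/logbook is supplied they are
persisted alongside the task results. The EchoTask name is made up for
illustration:)

import taskflow.engines
from taskflow import task
from taskflow.patterns import linear_flow as lf


class EchoTask(task.Task):
    def execute(self, tmp):
        print('got input: %s' % tmp)


flow = lf.Flow('store-example').add(EchoTask(name='echo'))

# 'store' injects the task inputs; pass book=/backend= as well to persist
# them to the database.
engine = taskflow.engines.load(flow, store={'tmp': 'test_data'})
engine.run()

# Both the injected inputs and any task results can be read back.
print(engine.storage.fetch_all())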



[openstack-dev] [TaskFlow] TaskFlow persistence

2016-01-26 Thread pn kk
Hi,

I use taskflow for job management and am now trying to persist the state of
flows/tasks in MySQL to recover in case of process crashes.

I could see the state and the task results stored in the database.

Now I am looking for some way to store the input parameters of the tasks.

Please share your inputs to achieve this.

-Thanks