Interesting error. That could be a bug, and perhaps we should make upgrade more thread-safe (with a lock around the upgrade). Can u open a bug at bugs.launchpad.net/taskflow for that? Then we can try to add said lock, which should hopefully resolve what u are seeing (if it doesn't, then the bug lies somewhere else).
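
Until such a lock exists in the library, a caller-side guard along these lines might serve as a stopgap. This is only a sketch under stated assumptions: `upgrade_once` is a hypothetical helper (not part of taskflow), `backend` is whatever `backends.fetch(conf)` returned, and it assumes that eventlet's monkey patching (with thread patching enabled) makes `threading.Lock` cooperate with green threads.

```python
import contextlib
import threading

# Assumption: under eventlet monkey patching with thread patching enabled,
# this lock yields to other green threads instead of blocking the process.
_upgrade_lock = threading.Lock()
_upgraded = False


def upgrade_once(backend):
    """Run the schema upgrade at most once per process, one caller at a time.

    Returns True if this call performed the upgrade, False if it was skipped.
    """
    global _upgraded
    with _upgrade_lock:
        if _upgraded:
            return False
        # Same pattern as in the report: open a connection, upgrade, close it.
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()
        _upgraded = True
        return True
```

Each job would then call `upgrade_once(self.backend)` instead of upgrading directly, so concurrent jobs no longer run the upgrade at the same time.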

Thanks much!

-Josh

On 03/19/2016 08:45 AM, pnkk wrote:
Hi Joshua,

Thanks for all your inputs.
We are using this feature successfully, but on rare occasions I see an
issue related to concurrency.

To give you a brief overview, we use eventlet, and every job runs in a
separate green thread.

In the job execution part, we use taskflow functionality and persist all
the details of the job.

There we get the backend as shown below. We run the upgrade every time a
job is executed.

         backend_uri = cfg.CONF.db.sqlalchemy_url
         conf = {
             'connection': backend_uri,
         }
         self.backend = backends.fetch(conf)
         with contextlib.closing(self.backend.get_connection()) as conn:
             conn.upgrade()

Now, when two jobs start executing at the same time, I see the error below:
Failed upgrading database version
   DBAPIError: (exceptions.RuntimeError) reentrant call inside <_io.BufferedReader name=14>

We have applied eventlet's monkey patching. Isn't that supposed to take
care of these concurrency issues?

Below are the versions for related modules in use:
eventlet==0.17.4
taskflow==1.26.0
SQLAlchemy==1.0.9

Thanks,
Kanthi

On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlo...@fastmail.com> wrote:

    pn kk wrote:

        Hi Joshua,

        Yes, sure, I will do that once I get some time free from my work.

        One last query (hopefully :) ): can the factory method be an
        instance method of a class?


    Instance methods are particularly hard to use (since they require an
    instance of an object to be useful); so I think the check u have hit
    is making sure that if the flow-factory is called to recreate the
    flow that it can do so without having import issues. Currently I
    believe it doesn't handle instance methods (due to the complexity of
    needing an instance attached to that method).

    Perhaps what u want is to provide a function that can be found like:

    def make_flow_factory():
        return FlowFactory().flow_factory

    Or make flow_factory a class or static method, which should have the
    same/similar effect.
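
A small sketch of why the bound method is rejected while a plain function is not. The `is_reimportable` helper below is a hypothetical stand-in for that kind of check, not taskflow's actual implementation, and `FlowFactory` and `make_flow` are illustrative names only.

```python
import importlib
import json


def is_reimportable(obj):
    """Return True if importing obj's module and walking its qualified
    name leads back to the very same object."""
    try:
        target = importlib.import_module(obj.__module__)
        for part in obj.__qualname__.split("."):
            target = getattr(target, part)
    except (AttributeError, ImportError, TypeError, ValueError):
        return False
    return target is obj


class FlowFactory(object):
    def flow_factory(self):
        return "flow built by an instance method"


def make_flow():
    # Module-level wrapper: it creates the instance itself, so only the
    # function's own name has to be stored and looked up again later.
    return FlowFactory().flow_factory()


# A plain module-level function can be found again by name.
print(is_reimportable(json.dumps))                  # True
# A bound method cannot: the name lookup yields the plain function on the
# class, and no instance is attached to it.
print(is_reimportable(FlowFactory().flow_factory))  # False
```

The wrapper keeps the instance creation inside a callable that can be located by name, which is the property the check appears to be after.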

    Hope that makes sense (more queries are fine and welcome!)


        I tried giving it as "FlowFactory().flow_factory", where
        FlowFactory is my class name. It failed with the error below:
        ValueError: Flow factory <bound method FlowFactory.flow_factory of
        <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by
        name __builtin__.instance.flow_factory

        Thanks again
        -Kanthi


        On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow
        <harlo...@fastmail.com> wrote:

             pn kk wrote:

                 Hi,

                 Thanks for the responses. Here it is as a small example:

                 from taskflow import task
                 from taskflow.patterns import linear_flow as lf


                 def flow_factory(tmp):
                     return lf.Flow('resume from backend example').add(
                         TestTask(name='first', test=tmp),
                         InterruptTask(name='boom'),
                         TestTask(name='second', test="second task"))


                 class TestTask(task.Task):
                     # Required arguments come first; 'provides' keeps its default.
                     def __init__(self, name, test, provides=None, **kwargs):
                         self.test = test
                         super(TestTask, self).__init__(name, provides, **kwargs)

                     def execute(self, *args, **kwargs):
                         print('executing %s' % self)
                         return 'ok'


                 class InterruptTask(task.Task):
                     def execute(self, *args, **kwargs):
                         # DO NOT TRY THIS AT HOME
                         engine.suspend()

                 I was searching for a way to reload the flow after a
                 crash without passing the parameter "tmp" shown above.
                 It looks like "load_from_factory" provides this.


             Thanks for the example, ya, this is one such way to do this
             that u have found, there are a few other ways, but that is
             the main one that people should be using.



                 engine = taskflow.engines.load_from_factory(
                     flow_factory=flow_factory,
                     factory_kwargs={"tmp": "test_data"},
                     book=book, backend=backend)
                 engine.run()

                 Now it suspends after running the interrupt task. I can
                 then reload the flow from the saved factory method
                 without passing the parameter again:

                 for flow_detail_2 in book:
                     engine2 = taskflow.engines.load_from_detail(
                         flow_detail_2, backend=backend)

                 engine2.run()
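
A rough, taskflow-free sketch of the idea behind that reload, assuming (as the thread suggests) that the factory is recorded by its importable name together with its keyword arguments. The helpers `describe_factory` and `rebuild_from_description` are hypothetical, `datetime.timedelta` merely stands in for a flow factory, and taskflow's own storage format is not shown here.

```python
import importlib
import json
from datetime import timedelta


def describe_factory(factory, kwargs):
    """Serialize a factory as its dotted name plus its keyword arguments."""
    name = "%s.%s" % (factory.__module__, factory.__qualname__)
    return json.dumps({"name": name, "kwargs": kwargs})


def rebuild_from_description(blob):
    """Re-import the factory by name and call it with the saved kwargs."""
    record = json.loads(blob)
    module_name, _, attr = record["name"].rpartition(".")
    factory = getattr(importlib.import_module(module_name), attr)
    return factory(**record["kwargs"])


# Before the "crash": only a small JSON string has to be persisted.
saved = describe_factory(timedelta, {"seconds": 30})

# After the "crash": the object is rebuilt from the saved string alone,
# without the caller supplying the original arguments again.
restored = rebuild_from_description(saved)
```

This also illustrates why the saved arguments need to be serializable and why the factory has to be importable by name.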

                 Let me know if this is OK, or whether there is a better
                 approach to achieve this.


             There are a few other ways, but this one should be the
             currently recommended one.

             An idea, do u want to maybe update (submit a review to
             update) the docs, if u didn't find this very easy to figure
             out so that others can more easily figure it out. I'm sure
             that would be appreciated by all.


                 -Thanks


                 On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow
                 <harlo...@fastmail.com> wrote:

                      Hi there,

                      Michał is correct, it should be saved.

                      Do u have a small example of what u are trying to do,
                      because that will help determine if what u are doing
                      will be saved or whether it will not be.

                      Or even possibly explaining what is being done would
                      be fine too (more data/info for me to reason about
                      what should be stored in your case).

                      Thanks,

                      Josh


                      Michał Dulko wrote:

                          On 01/26/2016 10:23 AM, pn kk wrote:

                              Hi,

                              I use taskflow for job management and am now
                              trying to persist the state of flows/tasks in
                              MySQL so that we can recover in case of
                              process crashes.

                              I can see the state and the task results
                              stored in the database.

                              Now I am looking for a way to store the input
                              parameters of the tasks.

                              Please share your thoughts on how to achieve
                              this.

                              -Thanks

                          I played with that some time ago and, if I recall
                          correctly, input parameters should be available in
                          the flow's storage, which means they are also saved
                          to the DB. Take a look at the resume_workflows
                          method in my old PoC [1] (hopefully TaskFlow hasn't
                          changed much since then).

                          [1] https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py



        
                          __________________________________________________________________________
                          OpenStack Development Mailing List (not for usage questions)
                          Unsubscribe: openstack-dev-requ...@lists.openstack.org?subject:unsubscribe
                          http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev




        