Hi all,
I'd just like to let everyone know about a new feature in taskflow that I think
will be beneficial to various projects (reducing the duplication of similar code
across projects that accomplish the same feature set). The new feature is the
ability to run tasks on remote workers (task transitions and state
persistence are still handled by an 'orchestrating' engine). This means that the
engine no longer has to run tasks locally or in threads (or greenthreads) but
can instead run tasks on remote machines (anything that can connect to a MQ via
kombu; TBD when this becomes oslo.messaging).
Here is a simple example that shows how this works, for folks who have some
time to try it out.
---------
Pre-setup: git clone the taskflow repo and install it (in a venv or elsewhere),
install a mq server (rabbitmq for example).
---------
Let's now create two basic tasks (one that says hello and one that says goodbye).
class HelloWorldTask(task.Task):
    default_provides = "hi_happened"

    def execute(self):
        LOG.info('hello world')
        return time.time()

class GoodbyeWorldTask(task.Task):
    default_provides = "goodbye_happened"

    def execute(self, hi_happened):
        LOG.info('goodbye world (hi said on %s)', hi_happened)
        return time.time()
* Notice how the GoodbyeWorldTask requires an input 'hi_happened' (which is
produced by the HelloWorldTask).
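* Conceptually, the engine stores each task's return value under its
default_provides name and hands it to later tasks whose execute() arguments
match that name. A minimal pure-Python sketch of that data flow (illustrative
only; the names here are made up and this is not taskflow's actual
implementation):

```python
import inspect

def run_tasks_in_order(tasks):
    # Simplified stand-in for engine storage: results are keyed by the
    # symbolic name each task provides.
    storage = {}
    for provides, func in tasks:
        # Look up each argument of the task by name in storage (this
        # mirrors how 'hi_happened' flows from hello to goodbye).
        wanted = inspect.signature(func).parameters
        kwargs = {name: storage[name] for name in wanted}
        storage[provides] = func(**kwargs)
    return storage

def hello():
    return "hi @ t0"

def goodbye(hi_happened):
    return "bye (hi said on %s)" % hi_happened

results = run_tasks_in_order([("hi_happened", hello),
                              ("goodbye_happened", goodbye)])
print(results["goodbye_happened"])  # -> bye (hi said on hi @ t0)
```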
Now let's create a workflow that combines these two.
f = linear_flow.Flow("hi-bye")
f.add(HelloWorldTask())
f.add(GoodbyeWorldTask())
Notice here that we have specified a linear runtime order (that is, hello will
be said before goodbye). This ordering is also inherent in the dependencies:
the goodbye task requires 'hi_happened' to run, and the only way to satisfy
that dependency is to run HelloWorldTask before GoodbyeWorldTask.
* If you are wondering what the heck this is (or why it is useful to have
these little task and flow classes) check out
https://wiki.openstack.org/wiki/TaskFlow#Structure
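* The "inherent in the dependency ordering" point above can be sketched as a
simple topological sort over provides/requires pairs (illustrative only; this
is not taskflow's internal algorithm):

```python
def dependency_order(tasks):
    # tasks: (name, provides, requires) tuples. Repeatedly run any task
    # whose requirements are already satisfied -- a simple topological
    # sort over the provides/requires relationships.
    satisfied, order = set(), []
    remaining = list(tasks)
    while remaining:
        ready = [t for t in remaining if set(t[2]) <= satisfied]
        if not ready:
            raise ValueError("unsatisfiable dependencies: %s" % remaining)
        for name, provides, _requires in ready:
            order.append(name)
            satisfied.add(provides)
        remaining = [t for t in remaining if t[0] not in order]
    return order

# Even if goodbye is listed first, hello must run before it.
print(dependency_order([
    ("goodbye", "goodbye_happened", ["hi_happened"]),
    ("hello", "hi_happened", []),
]))  # -> ['hello', 'goodbye']
```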
Now the fun begins!
We need a worker that accepts requests to run tasks, so let's create a small
function that does just that.
def run_worker():
    worker_conf = dict(MQ_CONF)
    worker_conf.update({
        # These are the available tasks that this worker has access to execute.
        'tasks': [
            HelloWorldTask,
            GoodbyeWorldTask,
        ],
    })
    # Start this up and stop it on ctrl-c.
    worker = remote_worker.Worker(**worker_conf)
    runner = threading.Thread(target=worker.run)
    runner.start()
    worker.wait()
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            LOG.info("Dying...")
            worker.stop()
            runner.join()
            break
And of course we need a function that will perform the orchestration of the
remote (or local) tasks. This function starts the whole execution by taking
the workflow defined above and combining it with an engine that will run the
individual tasks (and transfer data between those tasks as needed).
* For those still wondering what an engine is (or what it offers) check out
https://wiki.openstack.org/wiki/TaskFlow#Engines and
https://wiki.openstack.org/wiki/TaskFlow/Patterns_and_Engines/Persistence#Big_Picture
(which hopefully will make it easier to understand why the concept exists in
the first place).
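* The persistence piece can be thought of as a per-flow record of task states
and results, which is what lets an engine resume after a crash. A toy sketch
of that idea (illustrative only; the real persistence API and field names
differ):

```python
class FlowDetail(object):
    # Toy stand-in for taskflow's flow detail record: it tracks each
    # task's state and result so that a restarted engine could skip
    # tasks that already finished.
    def __init__(self):
        self.task_states = {}
        self.results = {}

    def save(self, task_name, state, result=None):
        self.task_states[task_name] = state
        if result is not None:
            self.results[task_name] = result

fd = FlowDetail()
fd.save("HelloWorldTask", "RUNNING")
fd.save("HelloWorldTask", "SUCCESS", result=1234.5)
print(fd.task_states["HelloWorldTask"])  # -> SUCCESS
```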
def run_engine():
    # Make some remote tasks happen.
    f = lf.Flow("test")
    f.add(HelloWorldTask())
    f.add(GoodbyeWorldTask())
    # Create an in-memory storage area where intermediate results will be
    # saved (you can change this to a persistent backend if desired).
    backend = impl_memory.MemoryBackend({})
    _logbook, flowdetail = pu.temporary_flow_detail(backend)
    engine_conf = dict(MQ_CONF)
    engine_conf.update({
        # This identifies which workers are accessible via which queues;
        # this will be made better soon with review
        # https://review.openstack.org/#/c/75094/ or similar.
        'workers_info': {
            'work': [
                HelloWorldTask().name,
                GoodbyeWorldTask().name,
            ],
        }
    })
    LOG.info("Running workflow %s", f)
    # Now run the engine.
    e = engine.WorkerBasedActionEngine(f, flowdetail, backend, engine_conf)
    with logging_listener.LoggingListener(e, level=logging.INFO):
        e.run()
    # See the results received.
    print("Final results: %s" % (e.storage.fetch_all()))
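The 'workers_info' mapping above is essentially a routing table: the engine
only publishes a task request onto a queue whose worker has advertised that
task's name. A rough sketch of that selection logic (pick_queue is a
hypothetical helper, not the actual engine code):

```python
def pick_queue(workers_info, task_name):
    # workers_info maps a queue/topic name to the list of task names the
    # worker consuming that queue can execute (mirroring engine_conf in
    # the example); return the first queue that can run the task.
    for queue, task_names in workers_info.items():
        if task_name in task_names:
            return queue
    raise RuntimeError("no worker found for %s" % task_name)

workers_info = {"work": ["HelloWorldTask", "GoodbyeWorldTask"]}
print(pick_queue(workers_info, "GoodbyeWorldTask"))  # -> work
```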
Now that we have these two functions we can actually start the worker and
the engine and watch the action happen. To avoid having to write out a
little more boilerplate (imports and such), the full code can be found at
http://paste.openstack.org/show/73071/.
To try it out, download the above paste to a file named 'test.py', modify the
MQ_SERVER global to point to your MQ host, and then run the following.
$ python test.py worker # starts a worker
$ python test.py # starts the main engine
You should start to see state transitions happening and the final result
being produced by the engine coordinating calls with the remote workers.
* For the engine the output should be similar to
http://paste.openstack.org/show/73072/, for the worker it should be similar to
http://paste.openstack.org/show/73073/
Hopefully this new model can be useful in the future to heat, glance, ... and
any others that would like to take advantage of said functionality (taskflow is
a library on pypi that anyone can and is encouraged to use).
Questions, concerns, and any other comments are welcome (here or in
#openstack-state-management).
Thanks!
-Josh
_______________________________________________
OpenStack-dev mailing list
[email protected]
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev