John,
I have come up with a first-cut implementation of MapReduce using
Rufus.
It would be great if you could suggest any improvements to it.
Thanks.
With regards,
Harshal
---8<---
require 'rubygems'
require 'openwfe/def'
require 'openwfe/workitem'
require 'openwfe/engine/engine'
require 'openwfe/expressions/raw_prog'
require 'openwfe/participants/soapparticipants'
require 'pp'
# Interface shared by scheduling policies. Both methods are empty
# placeholders in this module and return nil.
module SchedulingPolicy
  # Hook for producing a plan from +tasks+ and +workers+; empty here.
  def create_execution_plan(tasks, workers)
  end

  # Hook for producing the next job for a single +worker+; empty here.
  def create_next_execution_job(tasks, worker)
  end
end

# Policy that splits the task list into one contiguous chunk per worker.
#
# NOTE(review): `extend` adds SchedulingPolicy's methods to this module
# object itself, not to classes that include FlatSchedulingPolicy. `include`
# may have been intended -- confirm before changing, since it alters the
# module's singleton interface.
module FlatSchedulingPolicy
  extend SchedulingPolicy

  # Splits +array+ into exactly +pieces+ contiguous chunks whose sizes differ
  # by at most one; when the length is not evenly divisible, the leading
  # chunks receive the extra elements. Chunks may be empty when there are
  # fewer elements than pieces.
  #
  # array  - the Array to split (not modified).
  # pieces - number of chunks to produce; must be at least 1 (default: 2).
  #
  # Returns an Array of +pieces+ Arrays.
  # Raises ArgumentError when +pieces+ is smaller than 1, instead of the
  # ZeroDivisionError the division would otherwise produce for zero.
  def chunk_array(array, pieces = 2)
    raise ArgumentError, "pieces must be at least 1 (got #{pieces.inspect})" if pieces < 1

    base_size, extras = array.length.divmod(pieces)
    offset = 0
    (1..pieces).map do |index|
      # The first `extras` chunks absorb the remainder, one element each.
      size = index <= extras ? base_size + 1 : base_size
      chunk = array[offset, size] || []
      offset += size
      chunk
    end
  end

  # Builds the plan by giving each worker one chunk of +tasks+.
  #
  # tasks   - Array of tasks to distribute.
  # workers - collection of workers; only its size is used.
  #
  # Returns an Array with one Array of tasks per worker.
  def create_execution_plan(tasks, workers)
    chunk_array(tasks, workers.size)
  end

  # Incremental scheduling is not provided by this policy; the method does
  # nothing and returns nil.
  #
  # The raise below was commented out by the author. Mail line-wrapping had
  # split it so that the constructor call became live code referencing an
  # undefined constant. If the raise is ever re-enabled, use
  # NotImplementedError: UnsupportedOperationException is not defined here.
  def create_next_execution_job(tasks, worker)
    # raise UnsupportedOperationException.new("#{}:create_next_execution_job")
  end
end
# Plain value holder with two read/write attributes: a worker and the tasks
# associated with it. Nothing in the visible code instantiates it.
class ExecutionJob
  attr_accessor :worker, :tasks
end
# Plain value holder exposing a single read/write attribute, +executions+.
# Nothing in the visible code instantiates it.
class ExecutionPlan
  attr_accessor(:executions)
end
# Holds the workers and the tasks that a scheduler operates on.
class Schedule
  attr_accessor :workers, :tasks

  # workers - collection of workers (must respond to <<).
  # tasks   - collection of tasks (must respond to <<).
  def initialize(workers, tasks)
    self.workers, self.tasks = workers, tasks
  end

  # Appends +task+ to the task collection.
  #
  # Returns self so calls can be chained.
  def add_task(task)
    tasks << task
    self
  end

  # Appends +worker+ to the worker collection.
  #
  # Returns self so calls can be chained.
  def add_worker(worker)
    workers << worker
    self
  end
end
# Empty marker module; defines no methods or constants.
module Mechanism; end

# Marker module mixed into scheduling mechanisms. It extends itself with
# Mechanism, which affects this module object only, not classes that include
# SchedulingMechanism.
module SchedulingMechanism
  extend(Mechanism)
end
# OpenWFE process definition for the map/reduce flow: iterate concurrently
# over the execution plan, handing each entry to the :alice participant, then
# hand the workitem to the :summarize participant.
#
# The plan is read from the 'plan' workitem field, which is the field that
# Scheduler#apply sets on the launch item. The original text used
# 'execution_plan' and 'executions_plan' here, neither of which is ever set.
#
# NOTE(review): each iterated entry is referenced as `execution.tasks` and
# `execution.worker`, while the default policy produces plain arrays of
# tasks -- confirm what shape the plan entries are expected to have.
# NOTE(review): :on_value substitutes the field into a string; if the engine
# needs the actual list object, :on_field may be the right attribute --
# verify against the OpenWFEru iterator documentation.
class MapReduceSchedulingMechanism < OpenWFE::ProcessDefinition
  include SchedulingMechanism

  # presumably declares 'plan' as a field the launch item must provide --
  # confirm against the OpenWFEru `parameter` expression documentation.
  param :field => 'plan'

  sequence do
    # The dollar notation must stay on one line: a newline between "$" and
    # "{" (introduced by mail wrapping) prevents the substitution.
    concurrent_iterator :on_value => "${f:plan}", :to_variable => 'execution' do
      participant :alice, :task => "${execution.tasks}", :target => "${execution.worker}"
    end
    participant :summarize
  end
end
# Ties together a schedule (workers + tasks), a policy that turns the
# schedule into an execution plan, and a mechanism (an OpenWFE process
# definition) that is launched on an OpenWFE engine with that plan.
class Scheduler
  attr_accessor :schedule, :policy, :mechanism, :engine

  # Default policy, used when the caller supplies none. Chunking and
  # create_execution_plan come from FlatSchedulingPolicy.
  class Policy
    include FlatSchedulingPolicy

    # Incremental scheduling is not provided; does nothing and returns nil.
    # The raise was commented out by the author. Mail line-wrapping had
    # turned its second half into live code referencing an undefined
    # constant.
    def create_next_execution_job(tasks, worker)
      # raise UnsupportedOperationException.new("#{}:create_next_execution_job")
    end

    # Builds the execution plan for +schedule+ from its tasks and workers.
    def apply(schedule)
      create_execution_plan(schedule.tasks, schedule.workers)
    end
  end

  # mechanism - process definition to launch (may be supplied later to #apply).
  # policy    - object responding to #apply(schedule) (may be supplied later).
  #
  # Creates a new OpenWFE engine for this scheduler.
  def initialize(mechanism = nil, policy = nil)
    self.policy = policy
    self.mechanism = mechanism
    self.engine = OpenWFE::Engine.new
  end

  # Returns true only when schedule, policy, mechanism and engine are all set.
  def precondition
    required = [schedule, policy, mechanism, engine]
    !required.any? { |dependency| dependency.nil? }
  end

  # Hook evaluated after the process has finished; its value is what #apply
  # returns. Empty here, so it returns nil.
  def postcondition
  end

  # Plans and runs +schedule+.
  #
  # schedule  - the Schedule to execute.
  # mechanism - optional replacement for the stored mechanism.
  # policy    - optional replacement for the stored policy; when neither is
  #             set, a new instance of the nested Policy class is used.
  # block     - optional; when given it receives the schedule and its result
  #             is used as the execution plan instead of the policy's.
  #
  # Launches the mechanism with the plan stored in the 'plan' field, blocks
  # until the process instance terminates, and returns #postcondition.
  # Returns nil without launching anything when #precondition fails.
  def apply(schedule, mechanism = nil, policy = nil, &block)
    self.schedule = schedule
    self.mechanism = mechanism unless mechanism.nil?
    self.policy = policy unless policy.nil?
    self.policy ||= self.class::Policy.new
    return unless precondition

    execution_plan = block_given? ? yield(self.schedule) : self.policy.apply(self.schedule)

    launch_item = OpenWFE::LaunchItem.new(self.mechanism)
    launch_item.set_field('plan', execution_plan)
    # Single-line literal: mail wrapping had put a newline inside the string.
    launch_item.initial_comment = 'please give your impressions about http://ruby-lang.org'

    fei = engine.launch(launch_item)
    puts 'To wait...'
    engine.wait_for(fei)
    puts 'Done!'
    postcondition
  end
end
# Scheduler specialised for the map/reduce process definition: it registers
# the :alice participant (the "map" step, one call per workitem) and the
# :summarize participant (the "reduce" step plus a printed summary) on the
# engine created by Scheduler.
class MapReduceScheduler < Scheduler
  # Values returned by #map, in the order the :alice participant appended them.
  attr_reader :results

  # NOTE(review): not referenced in the visible code -- #initialize passes
  # MapReduceSchedulingMechanism itself to the superclass.
  class Mechanism < MapReduceSchedulingMechanism
  end

  # policy - optional policy forwarded to Scheduler#initialize.
  def initialize(policy = nil)
    super(MapReduceSchedulingMechanism, policy)
    # Assigned directly: only a reader is declared, so `self.results = []`
    # raised NoMethodError.
    @results = []

    engine.register_participant :summarize do |workitem|
      reduce(results)
      puts
      puts "summary of process #{workitem.fei.workflow_instance_id}"
      workitem.attributes.each do |key, value|
        next unless key.match(/_comment$/)
        puts " - #{key} : '#{value}'"
      end
    end

    engine.register_participant :alice do |workitem|
      results << map(workitem)
    end
  end

  # Map step for one workitem: records a comment on the workitem and prints
  # progress lines around two random pauses of up to three seconds each.
  #
  # The 'target' and 'task' params are the attribute names the process
  # definition passes to the :alice participant (the original text read
  # 'worker' and 'tasks', which that definition never sets).
  #
  # Any StandardError is reported on stdout and swallowed.
  #
  # NOTE(review): returns nil (the value of the last puts), so #results
  # collects nils -- decide what a map result should be.
  def map(workitem)
    puts 'alice got a workitem...'
    begin
      workitem.alice_comment = 'this thing looks interesting'
      label = " #{workitem.params['target']}: #{workitem.params['task']}"
      sleep((rand * 100) % 3)
      puts "#{label}: Sending task\n"
      sleep((rand * 100) % 3)
      puts "#{label}: Done!\n"
    rescue StandardError => e
      # `rescue Error` named an undefined constant, which would itself raise
      # NameError while handling the original failure.
      puts "Error occurred: #{e.message}"
    end
  end

  # Reduce step: returns +results+ unchanged.
  def reduce(results)
    results
  end
end
--->8---
On Mar 7, 2:05 pm, "John Mettraux" <[EMAIL PROTECTED]> wrote:
> On Fri, Mar 7, 2008 at 6:03 PM, harshal <[EMAIL PROTECTED]> wrote:
>
> > Can we have code as follows?
>
> > ---8<---
> > class Base < OpenWFE::ProcessDefinition
> > end
>
> > class Derived < Base
> > sequence do
> > ...
> > end
> > end
> > --->8---
>
> Hello,
>
> I haven't tried it, but I guess so.
>
> Cheers,
>
> --
> John Mettraux -///- http://jmettraux.openwfe.org
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"OpenWFEru dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at
http://groups.google.com/group/openwferu-dev?hl=en
-~----------~----~----~----~------~----~------~--~---