John,

I have put together a first-cut implementation of MapReduce using
Rufus.
It would be great if you could suggest any improvements to it.

Thanks.

With regards,
Harshal
---8<---
require 'rubygems'
require 'openwfe/def'
require 'openwfe/workitem'
require 'openwfe/engine/engine'

require 'openwfe/expressions/raw_prog'
require 'openwfe/participants/soapparticipants'
require 'pp'

# Declares the two scheduling hooks. Both bodies are empty, so each hook
# returns nil when called through this module.
module SchedulingPolicy
  # Hook taking the task list and the worker list; empty here (returns nil).
  def create_execution_plan(tasks, workers); end

  # Hook taking the task list and a single worker; empty here (returns nil).
  def create_next_execution_job(tasks, worker); end
end

# Scheduling policy that spreads the task list evenly over the workers:
# one consecutive chunk of tasks per worker.
module FlatSchedulingPolicy

  # NOTE(review): `extend` adds SchedulingPolicy's methods to this module
  # object itself, not to classes that include FlatSchedulingPolicy;
  # `include` may have been intended -- confirm before changing.
  extend SchedulingPolicy

  # Splits +array+ into +pieces+ consecutive chunks whose sizes differ by
  # at most one element; the leading chunks receive the extra elements.
  # Uses no instance state, so it can be used as a standalone function.
  #
  #   chunk_array([1, 2, 3, 4, 5], 2)  # => [[1, 2, 3], [4, 5]]
  #   chunk_array([1], 3)              # => [[1], [], []]
  #
  # Returns an array of +pieces+ arrays. When +pieces+ is less than 1
  # there is nothing to distribute to, so an empty array is returned
  # (previously 0 raised ZeroDivisionError).
  def chunk_array(array, pieces=2)
    return [] if pieces < 1

    base, extra = array.length.divmod(pieces)
    start = 0
    (0...pieces).map do |i|
      # The first +extra+ chunks are one element longer than the rest.
      size = i < extra ? base + 1 : base
      chunk = array[start, size] || []
      start += size
      chunk
    end
  end

  # Builds the plan: +tasks+ split into as many chunks as there are
  # +workers+. Returns the array of chunks.
  def create_execution_plan(tasks, workers)
    chunk_array(tasks, workers.size)
  end

  # Not supported by the flat policy; returns nil.
  #
  # The original body was a commented-out
  #   raise UnsupportedOperationException.new(...)
  # whose comment had apparently been split over two lines, leaving a live
  # reference to an undefined constant (NameError on every call).
  def create_next_execution_job(tasks, worker)
    nil
  end

end


# Plain holder with two read/write attributes: +worker+ and +tasks+.
# Both are nil until assigned.
class ExecutionJob
  attr_accessor :worker, :tasks
end

# Plain holder with a single read/write attribute, +executions+,
# which is nil until assigned.
class ExecutionPlan
  attr_reader :executions
  attr_writer :executions
end

# Pairs a collection of workers with a collection of tasks and lets
# callers append to either one.
class Schedule

  attr_accessor :workers, :tasks

  # Stores the given +workers+ and +tasks+ collections as-is (no copy).
  def initialize(workers, tasks)
    @workers = workers
    @tasks = tasks
  end

  # Appends +task+ to the task collection; returns self for chaining.
  def add_task(task)
    tasks << task
    self
  end

  # Appends +worker+ to the worker collection; returns self for chaining.
  def add_worker(worker)
    workers << worker
    self
  end

end


# Empty marker module: it defines no methods or constants.
module Mechanism; end

# Marker module for scheduling mechanisms. It extends itself with
# Mechanism (which affects this module object only, not its includers)
# and defines nothing else.
module SchedulingMechanism
  extend(Mechanism)
end

# Process definition for the map/reduce flow: every entry of the
# execution plan is handed to the :alice participant in its own
# concurrent branch, then the workitem goes to the :summarize participant.
class MapReduceSchedulingMechanism < OpenWFE::ProcessDefinition

  include SchedulingMechanism

  # The plan travels in the workitem field 'plan', which is the field
  # Scheduler#apply sets on the launch item. The original text used three
  # different names for it ('execution_plan', 'executions_plan', 'plan').
  param :field => 'plan'

  sequence do
    # The "${...}" values must each sit on a single line: the original
    # literals were split after the '$', which put a newline inside them.
    concurrent_iterator :on_value => '${f:plan}',
                        :to_variable => 'execution' do
      # Param names match what MapReduceScheduler#map reads
      # (params['tasks'] and params['worker']).
      # NOTE(review): Scheduler::Policy#apply yields plain arrays of tasks,
      # so whether 'execution.tasks' / 'execution.worker' resolve to
      # anything depends on the plan entries -- confirm.
      participant :alice,
                  :tasks => '${execution.tasks}',
                  :worker => '${execution.worker}'
    end
    participant :summarize
  end

  def initialize
    super
  end

end

#
# launching the process (done by Scheduler#apply)


# Base scheduler: turns a Schedule into an execution plan (through a
# policy object or a caller-supplied block) and launches the configured
# process definition (the "mechanism") on an OpenWFE engine.
class Scheduler

  attr_accessor :schedule
  attr_accessor :policy
  attr_accessor :mechanism
  attr_accessor :engine

  # Default policy, used by #apply when no policy has been set.
  class Policy
    include FlatSchedulingPolicy

    # Splits +tasks+ into one chunk per worker; returns the chunks.
    def create_execution_plan(tasks, workers)
      chunk_array(tasks, workers.size)
    end

    # Not supported; returns nil.
    #
    # The original body was a commented-out
    #   raise UnsupportedOperationException.new(...)
    # whose comment had apparently been split over two lines, leaving a
    # live reference to an undefined constant (NameError on every call).
    def create_next_execution_job(tasks, worker)
      nil
    end

    # Builds the execution plan for +schedule+ from its tasks and workers.
    def apply(schedule)
      create_execution_plan(schedule.tasks, schedule.workers)
    end

  end

  # Stores the optional +mechanism+ and +policy+ and creates a fresh
  # OpenWFE engine.
  def initialize(mechanism = nil, policy = nil)
    self.policy = policy
    self.mechanism = mechanism
    self.engine = OpenWFE::Engine.new
  end

  # True only when schedule, policy, mechanism and engine are all set.
  def precondition()
    [schedule, policy, mechanism, engine].all? { |part| !part.nil? }
  end

  # Hook evaluated at the end of #apply; empty here, so it returns nil.
  def postcondition()

  end

  # Launches the mechanism for +schedule+ and blocks until the process
  # instance has finished.
  #
  # +mechanism+ and +policy+, when given, replace the stored ones; a
  # missing policy falls back to this class's Policy. If a block is given
  # it receives the schedule and its result is used as the execution plan
  # instead of asking the policy.
  #
  # Returns nil without launching anything when #precondition fails;
  # otherwise returns the value of #postcondition.
  def apply(schedule, mechanism = nil, policy = nil, &block)

    self.schedule = schedule
    self.mechanism = mechanism unless mechanism.nil?
    self.policy = policy unless policy.nil?

    self.policy = self.class::Policy.new if self.policy.nil?

    return unless precondition

    execution_plan =
      if block_given?
        yield self.schedule
      else
        self.policy.apply(self.schedule)
      end

    li = OpenWFE::LaunchItem.new(self.mechanism)
    li.set_field('plan', execution_plan)

    # Kept on one line: the original literal was split in two, which
    # embedded a newline in the comment text.
    li.initial_comment =
      'please give your impressions about http://ruby-lang.org'

    fei = self.engine.launch(li)

    puts 'To wait...'

    self.engine.wait_for fei

    puts 'Done!'

    postcondition

  end

end

# Scheduler that runs MapReduceSchedulingMechanism: the :alice participant
# plays the "map" step (one call per workitem it receives) and the
# :summarize participant plays the "reduce" step and prints a summary.
class MapReduceScheduler < Scheduler

  # Values returned by #map, in the order the :alice participant
  # appended them.
  attr_reader   :results

  class Mechanism < MapReduceSchedulingMechanism

    def initialize
      super
    end

  end

  # Creates the scheduler with MapReduceSchedulingMechanism as mechanism
  # and registers the :summarize and :alice participants on the engine.
  def initialize(policy = nil)

    super(MapReduceSchedulingMechanism, policy)

    # Only a reader is declared for results, so the instance variable is
    # assigned directly (`self.results = []` raised NoMethodError). It is
    # set before the participants that use it are registered.
    @results = []

    self.engine.register_participant :summarize do |workitem|
      self.reduce(self.results)
      puts
      puts "summary of process #{workitem.fei.workflow_instance_id}"
      workitem.attributes.each do |k, v|
          next unless k.match ".*_comment$"
          puts " - #{k} : '#{v}'"
      end
    end

    self.engine.register_participant :alice do |workitem|
      self.results << self.map(workitem)
    end
  end

  # "Map" step for one workitem: tags it with a comment, sleeps for a
  # random time below 3 seconds twice, and prints progress lines built
  # from the 'worker' and 'tasks' participant params.
  #
  # NOTE(review): the method returns whatever the last `puts` returns
  # (nil), so #results only ever collects nils -- confirm what the map
  # step is supposed to return.
  def map(workitem)
      puts 'alice got a workitem...'

      begin
        workitem.alice_comment = 'this thing looks interesting'
        sleep((rand*100)%3)
        # Progress lines are single-line literals again; the originals were
        # split in the middle, which embedded a newline after the worker.
        puts " #{workitem.params['worker']}: #{workitem.params['tasks']}: Sending task\n"
        sleep((rand*100)%3)
        puts " #{workitem.params['worker']}: #{workitem.params['tasks']}: Done!\n"
      rescue StandardError => error
        # `rescue Error` named an undefined constant, which would have
        # raised NameError instead of handling the failure.
        puts "Error occured"
        puts " #{error.class}: #{error.message}"
      end
  end

  # "Reduce" step: currently the identity, returns +results+ unchanged.
  def reduce(results)
    results
  end

end

--->8---

On Mar 7, 2:05 pm, "John Mettraux" <[EMAIL PROTECTED]> wrote:
> On Fri, Mar 7, 2008 at 6:03 PM, harshal <[EMAIL PROTECTED]> wrote:
>
> >  Can we have code as followed?
>
> >  ---8<---
> >  class Base < OpenWFE::ProcessDefinition
> >  end
>
> >  class Derived < Base
> >     sequence do
> >        ...
> >     end
> >  end
> >  --->8---
>
> Hello,
>
> I haven't tried, but I guess that yes.
>
> Cheers,
>
> --
> John Mettraux   -///-  http://jmettraux.openwfe.org
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"OpenWFEru dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at 
http://groups.google.com/group/openwferu-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to