I'm just starting with POE. What I'm attempting to build is a virtual computer
for a large optimization problem.

The basic concept: a main Perl script starts up on a master computer and drives
a very large optimization, with each step of the optimization running ~200
jobs. The only thing that varies from one step of the optimization to the next
is a set of coefficients.

The data for each job is somewhat large (1 to 3 megabytes), so I don't want to
retransmit the data to the network peer every time. That means I need a little
negotiation between the peer and the master computer to find out whether the
set of coefficients is the only thing that needs to be retransmitted.
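One way that check could look is the following minimal sketch. It assumes the master identifies each job's data by an MD5 digest (Digest::MD5 ships with Perl) and that master and peers agree on three hypothetical line messages: "DATA <digest>" from the master, answered by "HAVE <digest>" or "NEED <digest>" from the peer. The message names and the in-memory cache are illustrations, not an existing protocol.

==================================================
use strict;
use warnings;
use Digest::MD5 qw(md5_hex);

# Peer side: job data already received, keyed by MD5 digest.
# (Kept in memory here; a real peer might cache it on disk.)
my %data_cache;

# Master side: announce a job's data by its digest only.
# md5_hex() needs bytes, so encode any character data first.
sub data_offer {
    my ($job_data) = @_;
    return 'DATA ' . md5_hex($job_data);
}

# Peer side: reply HAVE if the data is cached, NEED if it must be sent.
sub answer_offer {
    my ($line) = @_;
    my ($digest) = $line =~ /^DATA ([0-9a-f]{32})$/
        or return 'ERROR bad offer';
    return exists $data_cache{$digest} ? "HAVE $digest" : "NEED $digest";
}

# Peer side: cache data that arrives after a NEED, checking its digest.
sub store_data {
    my ($digest, $job_data) = @_;
    return 0 unless md5_hex($job_data) eq $digest;
    $data_cache{$digest} = $job_data;
    return 1;
}
==================================================

With something like this, the master sends the full 1 to 3 MB only after a NEED reply; after HAVE it sends just the coefficients for the current step.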

I started with a simple POE script for the master server, and plain vanilla
Perl for the peers that will perform the compute tasks. Plain vanilla Perl on
the peer side *greatly* simplifies how this will eventually be used.
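Because the peers stay plain vanilla Perl, a peer can be a short blocking client loop. Here is a minimal sketch using only core modules, assuming line-oriented messages like the ones the master sends later in this post ("JOB : are you willing and ready?", "RUN date"). The "READY" and "DONE" replies and the run_job() helper are hypothetical placeholders.

==================================================
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical job runner: a real peer would load its cached data and
# the current coefficients here instead of echoing the request.
sub run_job {
    my ($request) = @_;
    return "ran $request";
}

# Decide how to answer one line from the master. Returns the reply,
# or undef when no reply is needed (e.g. the greeting).
sub peer_reply {
    my ($line) = @_;
    return 'READY' if $line =~ /^JOB\b/;
    return 'DONE ' . run_job($1) if $line =~ /^RUN (.+)$/;
    return;
}

# Connect to the master and answer requests until it closes the socket.
sub run_peer {
    my ($host, $port) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
    ) or die "cannot connect to $host:$port: $@";
    $sock->autoflush(1);
    while (my $line = <$sock>) {
        $line =~ s/\x0D?\x0A\z//;    # strip LF or CRLF line ending
        my $reply = peer_reply($line);
        print {$sock} "$reply\n" if defined $reply;
    }
    close $sock;
    return;
}
==================================================

A compute node would call run_peer() with the host and port the master's POE server listens on (both placeholders here). Keeping the reply logic in peer_reply() lets it be tested without a network connection.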

I began using this child_start:

===================================================
sub child_start {
  my ($heap, $socket, $addr) = @_[HEAP, ARG0, ARG1];

  # Line-oriented I/O on the accepted socket; each complete line
  # from the peer arrives as a 'child_input' event.
  $heap->{readwrite} = POE::Wheel::ReadWrite->new
    ( Handle => $socket,
      Driver => POE::Driver::SysRW->new(),
      Filter => POE::Filter::Line->new(),
      InputEvent   => 'child_input',
      ErrorEvent   => 'child_error',
    );
  $heap->{peername} = join ':', @_[ARG1, ARG2];
  # Remember each peer's state and heap so master_process can reach it.
  $child_state{$heap->{peername}} = CLIENT_INITIALIZE ;
  $child_heap{$heap->{peername}} = $heap ;
  $heap->{readwrite}->put( "Hello, client!, your IP addr is $addr" );
  print "CHILD: Connected to $heap->{peername}.\n";
}
====================================================

This is all well and good: input from the peer arrives in the "child_input"
handler as expected.

Next, I tested an alarm paradigm to let a master process run the optimization:

=========================================================
sub master_process {
  my $n = (keys %child_state) ;
  foreach my $c (keys %child_state) {
      $child_heap{$c}->{readwrite}->put( "You need to get a job" )
          if $child_state{$c} == CLIENT_IDLE ;
  }
  print STDERR "There appear to be $n children connected and ready\n" ;
  print "tick at ", time(), "\n" ;
  $_[HEAP]->{next_alarm_time} = int(time())+2 ;
  $_[KERNEL]->alarm(tick => $_[HEAP]->{next_alarm_time}) ;
}
=========================================================

This worked as expected too.  The message "You need to get a job" is showing 
up at each of the peers.

Next I wanted to see whether I could start a negotiation with the peers. You'll
see several lines commented out from when I was experimenting with different
approaches.

=========================================================

sub master_process {
  my $n = (keys %child_state) ;

  foreach my $c (keys %child_state) {
      if($child_state{$c} == CLIENT_IDLE) {
          my $resp ;
          #$child_heap{$c}->{readwrite}->event(InputEvent => undef) ;
          #$child_heap{$c}->{readwrite}->pause_input() ;

          print STDERR "Sending JOB request to child $c\n" ;
          $child_heap{$c}->{readwrite}->put("JOB : are you willing and ready?");

          my $bytes_in_buf =
              $child_heap{$c}->{readwrite}->get_driver_out_octets() ;

          while($bytes_in_buf > 0) {
              print STDERR "$bytes_in_buf bytes ready to flush to client\n" ;
              $child_heap{$c}->{readwrite}->flush();
              $bytes_in_buf =
                  $child_heap{$c}->{readwrite}->get_driver_out_octets() ;
          }

          my $fh = $child_heap{$c}->{readwrite}->get_input_handle() ;

          $resp = <$fh>;
          print "child response to JOB is $resp\n" ;
          $child_heap{$c}->{readwrite}->put("RUN date");
          $child_state{$c} = CLIENT_RUNNING_JOB ;
          #$child_heap{$c}->{readwrite}->event(InputEvent => \&child_input) ;
          $child_heap{$c}->{readwrite}->resume_input() ;
      }
  }

  #print STDERR "There appear to be $n children connected and ready\n" ;
  #print STDERR "tick at ", time(), "\n" ;

  $_[HEAP]->{next_alarm_time} = int(time())+2 ;
  $_[KERNEL]->alarm(tick => $_[HEAP]->{next_alarm_time}) ;
}

================================================
The first time I tried this I did not call flush(), and nothing showed up at
the peers. So I tried calling flush(), but it doesn't flush the output bytes
either.

It appears that for flushing to work, the POE kernel needs to have control
(i.e., master_process has to return before the queued output is written). That
would defeat my goal of making the job negotiation an atomic operation. I could
get around this by saving state information for the job negotiation, but I
would prefer not to open that can of worms if I don't have to.
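For comparison, the "saving state" alternative may be smaller than it looks. The sketch below is plain Perl with no POE dependency, so it runs on its own. It splits the negotiation into a send step (called from the tick handler) and a receive step (called from child_input), and records where each peer is in between. CLIENT_IDLE and CLIENT_RUNNING_JOB mirror the constants in the post; CLIENT_AWAITING_READY, the numeric values, the %child_put coderefs, and the "READY"/"DONE" replies are assumptions for illustration.

==================================================
use strict;
use warnings;

# CLIENT_IDLE and CLIENT_RUNNING_JOB mirror the post's constants;
# CLIENT_AWAITING_READY is a hypothetical state for "JOB request
# sent, reply not yet received". The values are arbitrary.
use constant {
    CLIENT_IDLE           => 1,
    CLIENT_AWAITING_READY => 2,
    CLIENT_RUNNING_JOB    => 3,
};

my %child_state;   # peer name => state
my %child_put;     # peer name => coderef that queues one line for that peer

# Send step: start a negotiation with every idle peer, then return
# at once so nothing blocks waiting for a reply.
sub start_negotiations {
    foreach my $c (keys %child_state) {
        next unless $child_state{$c} == CLIENT_IDLE;
        $child_put{$c}->("JOB : are you willing and ready?");
        $child_state{$c} = CLIENT_AWAITING_READY;
    }
}

# Receive step: handle one line from a peer according to its state,
# instead of doing a blocking read right after the JOB request.
sub handle_child_line {
    my ($c, $line) = @_;
    my $state = $child_state{$c};
    return unless defined $state;            # unknown peer: ignore
    if ($state == CLIENT_AWAITING_READY) {
        if ($line eq 'READY') {
            $child_put{$c}->("RUN date");
            $child_state{$c} = CLIENT_RUNNING_JOB;
        }
        else {
            $child_state{$c} = CLIENT_IDLE;   # declined; ask again next tick
        }
    }
    elsif ($state == CLIENT_RUNNING_JOB && $line =~ /^DONE\b/) {
        $child_state{$c} = CLIENT_IDLE;       # result in; peer is free again
    }
}
==================================================

In the POE server, start_negotiations() would be called from master_process before it re-arms the alarm, handle_child_line($heap->{peername}, $_[ARG0]) from child_input, and each %child_put entry could be set in child_start to sub { $heap->{readwrite}->put(@_) }. Since neither step blocks, the kernel regains control between them, which is what the flush() experiment appeared to need.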

The documentation says flush() is experimental, so consider this feedback if
you see nothing else wrong in my testing so far.

Any and all comments or suggestions are welcome.

Cheers,

Jeff Welty
Scientist
Weyerhaeuser Company

