I'm just starting with POE, what I'm attempting to build is a virtual computer for a large optimization problem.
The basic concept is a main perl script starts up on a master computer, which is going to compute a very large optimization, and each step of the optimization running ~200 or so jobs. The only thing that will vary from one step of the optimization to the next is a set of coefficients. The data for each job is somewhat large (1 to 3 megabytes), so I don't want to retransmit The data to the network peer everytime, which means I need to have a little negotiation between the peer and the master computer to see if the only thing needed to retransmit is the set of coefficients. I started with a simple POE script for the master server, and just plain vanilla perl for the peers that will perform the compute tasks. Plain vanilla perl on the peer side *greatly* simplifies how this will eventually be used. I began using this child_start: =================================================== sub child_start { my ($heap, $socket, $addr) = @_[HEAP, ARG0, ARG1]; $heap->{readwrite} = new POE::Wheel::ReadWrite ( Handle => $socket, Driver => new POE::Driver::SysRW (), Filter => new POE::Filter::Line (), InputEvent => 'child_input', ErrorEvent => 'child_error', ); $heap->{peername} = join ':', @_[ARG1, ARG2]; $child_state{$heap->{peername}} = CLIENT_INITIALIZE ; $child_heap{$heap->{peername}} = $heap ; $heap->{readwrite}->put( "Hello, client!, your IP addr is $addr" ); print "CHILD: Connected to $heap->{peername}.\n"; } ==================================================== This is all well and good, works as expected in the "child_input" function. Next, I tested an alarm paradigm to let a master process run the optimization: ========================================================= sub master_process { my $n = (keys %child_state) ; foreach my $c (keys %child_state) { $child_heap{$c}->{readwrite}->put( "You need to get a job" ) if $child_state{$c} == CLIENT_IDLE ; } print STDERR "There appear to be $n children connected and ready\n" ; print "tick at ", time(), "\n" ; $_[HEAP]->{next_alarm_time} = int(time())+2 ; $_[KERNEL]->alarm(tick => $_[HEAP]->{next_alarm_time}) ; } ========================================================= This worked as expected too. The message "You need to get a job" is showing up at each of the peers. Now I wanted to see if I could start a negotiation with the peers, you'll see several lines commented out as I was playing with different approaches. ========================================================= sub master_process { my $n = (keys %child_state) ; foreach my $c (keys %child_state) { if($child_state{$c} == CLIENT_IDLE) { my $resp ; #$child_heap{$c}->{readwrite}->event(InputEvent => undef) ; #$child_heap{$c}->{readwrite}->pause_input() ; print STDERR "Sending JOB request to child $c\n" ; $child_heap{$c}->{readwrite}->put("JOB : are you willing and ready?"); my $bytes_in_buf = $child_heap{$c}->{readwrite}->get_driver_out_octets() ; while($bytes_in_buf > 0) { print STDERR "$bytes_in_buf bytes ready to flush to client\n" ; $child_heap{$c}->{readwrite}->flush(); $bytes_in_buf = $child_heap{$c}->{readwrite}->get_driver_out_octets() ; } my $fh = $child_heap{$c}->{readwrite}->get_input_handle() ; $resp = <$fh>; print "child response to JOB is $resp\n" ; $child_heap{$c}->{readwrite}->put("RUN date"); $child_state{$c} = CLIENT_RUNNING_JOB ; #$child_heap{$c}->{readwrite}->event(InputEvent => \&child_input) ; $child_heap{$c}->{readwrite}->resume_input() ; } } #print STDERR "There appear to be $n children connected and ready\n" ; #print STDERR "tick at ", time(), "\n" ; $_[HEAP]->{next_alarm_time} = int(time())+2 ; $_[KERNEL]->alarm(tick => $_[HEAP]->{next_alarm_time}) ; } ================================================ First time I tried this, I did not use the call to flush(), and nothing was showing up at the peers, so I did try using the flush(), but it doesn't flush the output bytes. It appears that for flushing to work, the POE kernel needs to have control. That Would defeat my goal of making the job negotation an atomic operation. I could get around this by saving state information for the job negotiation, but I would prefer not to open that can of worms if I don't have to. The documentation says flush is experimental, so consider this feedback if you see nothing else wrong in my testing thus far. Any and all comments or suggestions are welcome. Cheers, Jeff Welty Scientist Weyerhaeuser Company