cvsuser     06/02/28 19:32:49

  Modified:    App-Context/lib/App/Context Cluster.pm Server.pm
  Log:
  send_async_event()
  
  Revision  Changes    Path
  1.3       +2 -12     p5ee/App-Context/lib/App/Context/Cluster.pm
  
  Index: Cluster.pm
  ===================================================================
  RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Cluster.pm,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Cluster.pm        14 Mar 2005 17:35:20 -0000      1.2
  +++ Cluster.pm        1 Mar 2006 03:32:49 -0000       1.3
  @@ -6,7 +6,7 @@
   package App::Context::Cluster;
   
   use App;
  -use App::Context;
  +use App::Context::Server;
   
   @ISA = ( "App::Context::Server" );
   
  @@ -105,16 +105,6 @@
       }
   }
   
  -sub process_msg {
  -    my ($self, $connection_socket, $msg) = @_;
  -    if ($self->{is_controller}) {
  -        $self->ctrl_process_msg($connection_socket, $msg);
  -    }
  -    else {
  -        $self->node_process_msg($connection_socket, $msg);
  -    }
  -}
  -
   ######################################################################
   # Controller
   ######################################################################
  
  
  
  1.2       +104 -30   p5ee/App-Context/lib/App/Context/Server.pm
  
  Index: Server.pm
  ===================================================================
  RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Server.pm,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Server.pm 14 Mar 2005 17:35:20 -0000      1.1
  +++ Server.pm 1 Mar 2006 03:32:49 -0000       1.2
  @@ -49,6 +49,9 @@
       $host =~ s/\..*//;   # get rid of fully qualified domain name
       $self->{host} = $host;
   
  +    $self->{pending_async_events} = [];
  +    $self->{pending_callback_event} = {};
  +
       $self->{verbose} = $options->{verbose};
   
       $self->_init2a($options);
  @@ -136,25 +139,32 @@
       my $listen_socket = $self->{listen_socket};
       my ($connection_socket, $msg);
       my ($event, @events);
  -    my ($time, $time_of_next_event, $sleep_interval);
  +    my ($time, $time_of_next_event, $sleep_interval, $event_occurred);
       my ($pid, $exitval, $sig);
       while (!$quit) {
           eval {
  +            $event_occurred = 0;
  +            if ($#{$self->{pending_async_events}} > -1) {
  +                $self->dispatch_pending_async_events();
  +                $event_occurred = 1;
  +            }
  +            while (($pid = waitpid(-1,WNOHANG)) > 0) {
  +                $event_occurred = 1;
  +                $exitval = $? >> 8;
  +                $sig     = $? & 255;
  +                $self->log("Child $pid finished 
[exitval=$exitval,sig=$sig]\n");
  +                $self->finish_pid($pid, $exitval, $sig);
  +            }
               $self->log("Checking for scheduled events.\n") if ($verbose >= 
8);
               $time = time();
               $time_of_next_event = $self->get_current_events([EMAIL 
PROTECTED], $time);
               if ($#events > -1) {
  +                $event_occurred = 1;
                   foreach $event (@events) {
                       $self->send_event($event);
                   }
                   $time = time();
               }
  -            while (($pid = waitpid(-1,WNOHANG)) > 0) {
  -                $exitval = $? >> 8;
  -                $sig     = $? & 255;
  -                $self->log("Child $pid finished 
[exitval=$exitval,sig=$sig]\n");
  -                $self->cleanup_pid($pid, $exitval, $sig);
  -            }
               $time = time();
               if ($time_of_next_event > 0) {
                   $sleep_interval = $time_of_next_event - $time;
  @@ -163,21 +173,23 @@
               else {
                   $sleep_interval = $default_sleep_interval;
               }
  -            # TODO: if (sleep_interval == 0), use select() to see if anyone 
is waiting, else ...
  -            $self->log("Sleeping on socket: timeout($sleep_interval)\n") if 
($verbose >= 3);
  -            $listen_socket->timeout($sleep_interval);
  -            $connection_socket = $listen_socket->accept();
  -            if ($connection_socket) {
  -                $connection_socket->autoflush(1);
  -                $msg = <$connection_socket>;
  -                $msg =~ s/[\015\012]+$//;
  -                if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
  -                    $quit = 1;
  -                }
  -                else {
  -                    $self->process_msg($connection_socket, $msg);
  +            if (!$event_occurred) {
  +                # TODO: if (sleep_interval == 0), use select() to see if 
anyone is waiting, else ...
  +                $self->log("Sleeping on socket: timeout($sleep_interval)\n") 
if ($verbose >= 3);
  +                $listen_socket->timeout($sleep_interval);
  +                $connection_socket = $listen_socket->accept();
  +                if ($connection_socket) {
  +                    $connection_socket->autoflush(1);
  +                    $msg = <$connection_socket>;
  +                    $msg =~ s/[\015\012]+$//;
  +                    if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
  +                        $quit = 1;
  +                    }
  +                    else {
  +                        $self->process_msg($connection_socket, $msg);
  +                    }
  +                    $connection_socket->close();
                   }
  -                $connection_socket->close();
               }
           };
           if ($@) {
  @@ -206,30 +218,92 @@
   sub process_msg {
       my ($self, $connection_socket, $msg) = @_;
       $self->log("process_msg: [$msg]\n");
  +    my $verbose = $self->{verbose};
  +    my $processing_complete = $self->process_app_msg($connection_socket, 
$msg);
  +    if (!$processing_complete) {
  +        if ($msg =~ s/^GET//) {
  +            $connection_socket->print("HTTP/1.0 200 OK\n");
  +            $connection_socket->print("Content-type: text/plain\n");
  +            $connection_socket->print("\n");
  +            $connection_socket->print("$self->{host}:$self->{port} as a 
server\n");
  +            $connection_socket->print($self->state());
  +        }
  +        else {
  +            $connection_socket->print("unknown [$msg]\n");
  +        }
  +    }
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub process_app_msg {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $connection_socket, $msg) = @_;
  +    my $processing_complete = 0;
  +    &App::sub_exit($processing_complete) if ($App::trace);
  +    return($processing_complete);
  +}
  +
  +sub state {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my $state = "";
  +    &App::sub_exit($state) if ($App::trace);
  +    return($state);
   }
   
   # TODO: Implement this as a fork() or a context-level message to a node to 
fork().
   #       i.e. messages such as "EVENT:" and "EVENT-OK:"
   #       Save the callback_event according to an event_tag.
   #       Then implement cleanup_pid to send the callback_event.
  +
   sub send_async_event {
       &App::sub_entry if ($App::trace);
       my ($self, $event, $callback_event) = @_;
  -    $self->send_event($event);
  +    push(@{$self->{pending_async_events}}, [ $event, $callback_event ]);
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub dispatch_pending_async_events {
  +    &App::sub_entry if ($App::trace);
  +    my ($self) = @_;
  +    my $pending_async_events = $self->{pending_async_events};
  +    my ($async_event);
  +    while ($#$pending_async_events > -1) {
  +        $async_event = shift(@$pending_async_events);
  +        $self->send_async_event_now(@$async_event);
  +    }
  +    &App::sub_exit() if ($App::trace);
  +}
  +
  +sub send_async_event_now {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $event, $callback_event) = @_;
  +    my $pid = fork();
  +    if (!$pid) {   # running in child
  +        $self->send_event($event);
  +        exit(0);
  +    }
       if ($callback_event) {
  -        my $event_tag = "local-$$";
  -        if (! $callback_event->{args}) {
  -            $callback_event->{args} = [ $event_tag ];
  -        }
  -        $self->send_event($callback_event);
  +        my $event_tag = "local-$pid";
  +        $self->{pending_callback_event}{$event_tag} = $callback_event;
       }
       &App::sub_exit() if ($App::trace);
   }
   
  -sub cleanup_pid {
  +sub finish_pid {
       &App::sub_entry if ($App::trace);
       my ($self, $pid, $exitval, $sig) = @_;
  -    # nothing to do
  +
  +    my $event_tag = "local-$pid";
  +    my $callback_event = $self->{pending_callback_event}{$event_tag};
  +    if ($callback_event) {
  +        delete $self->{pending_callback_event}{$event_tag};
  +        $callback_event->{args} = [] if (! $callback_event->{args});
  +        my $message = (!$exitval || !$sig) ? "Error $exitval on $pid 
[sig=$sig]" : "";
  +        push(@{$callback_event->{args}}, {location => $event_tag, returnval 
=> $exitval, message => "Sig $sig"});
  +        $self->send_event($callback_event);
  +    }
  +
       &App::sub_exit() if ($App::trace);
   }
   
  
  
  

Reply via email to