cvsuser     05/03/14 09:35:20

  Modified:    App-Context/lib/App/Context Cluster.pm
  Added:       App-Context/lib/App/Context Server.pm
  Log:
  evolving
  
  Revision  Changes    Path
  1.2       +36 -179   p5ee/App-Context/lib/App/Context/Cluster.pm
  
  Index: Cluster.pm
  ===================================================================
  RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Cluster.pm,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Cluster.pm        28 Sep 2004 15:20:40 -0000      1.1
  +++ Cluster.pm        14 Mar 2005 17:35:20 -0000      1.2
  @@ -1,6 +1,6 @@
   
   #############################################################################
  -## $Id: Cluster.pm,v 1.1 2004/09/28 15:20:40 spadkins Exp $
  +## $Id: Cluster.pm,v 1.2 2005/03/14 17:35:20 spadkins Exp $
   #############################################################################
   
   package App::Context::Cluster;
  @@ -8,7 +8,7 @@
   use App;
   use App::Context;
   
  -@ISA = ( "App::Context" );
  +@ISA = ( "App::Context::Server" );
   
   use Sys::Hostname;
   use Socket;
  @@ -36,36 +36,24 @@
   
   =cut
   
  -sub _init {
  +sub _init2a {
       &App::sub_entry if ($App::trace);
       my ($self, $options) = @_;
  -    $options = {} if (!defined $options);
  -
  -    my $host = hostname;
  -    $self->{hostname} = $host;
  -    $host =~ s/\..*//;   # get rid of fully qualified domain name
  -    $self->{host} = $host;
  -
       $self->{controller_host} = $options->{controller_host};
       $self->{controller_port} = $options->{controller_port};
       $self->{is_controller}   = $options->{controller};
  -    $self->{verbose}         = $options->{verbose};
  -
       if ($self->{is_controller}) {
           $self->{port} = $options->{controller_port};
       }
       else {
           $self->{port} = $options->{node_port};
       }
  +    &App::sub_exit() if ($App::trace);
  +}
   
  -    my $listen_socket = IO::Socket::INET->new(
  -        Proto     => "tcp",
  -        LocalPort => $self->{port},
  -        Listen    => SOMAXCONN,
  -    ) || die "Unable to listen on $host:$self->{port} - $!";
  -
  -    $self->{listen_socket} = $listen_socket;
  -
  +sub _init2b {
  +    &App::sub_entry if ($App::trace);
  +    my ($self, $options) = @_;
       if ($self->{is_controller}) {
           $self->ctrl_init();
       }
  @@ -75,187 +63,56 @@
       &App::sub_exit() if ($App::trace);
   }
   
  -sub close_listen_socket {
  -    &App::sub_entry if ($App::trace);
  +sub dispatch_events_begin {
       my ($self) = @_;
  -    if ($self->{listen_socket}) {
  -        $self->{listen_socket}->close();
  -        delete $self->{listen_socket};
  -    }
  -    &App::sub_exit() if ($App::trace);
  -}
   
  -sub DESTROY {
  -    &App::sub_entry if ($App::trace);
  -    my ($self) = @_;
  -    $self->close_listen_socket();
  -    &App::sub_exit() if ($App::trace);
  -}
  +    my $verbose = $self->{verbose};
  +    my $role = $self->{is_controller} ? "Controller" : "Node";
   
  -sub dispatch_events {
  -    &App::sub_entry if ($App::trace);
  -    my ($self) = @_;
  +    print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if 
($verbose);
   
  -    my ($role, $port, $startup, $shutdown, $process_msg);
       if ($self->{is_controller}) {
  -        $role        = "Controller";
  -        $startup     = "ctrl_startup";
  -        $shutdown    = "ctrl_shutdown";
  -        $process_msg = "ctrl_process_msg";
  +        $self->ctrl_startup();
       }
       else {
  -        $role        = "Node";
  -        $startup     = "node_startup";
  -        $shutdown    = "node_shutdown";
  -        $process_msg = "node_process_msg";
  +        $self->node_startup();
       }
  -    my $verbose = $self->{verbose};
  +}
   
  -    print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if 
($verbose);
  +sub dispatch_events_end {
  +    my ($self) = @_;
   
  -    $self->$startup();
  +    my $verbose = $self->{verbose};
  +    my $role = $self->{is_controller} ? "Controller" : "Node";
   
  -    my $quit = 0;
  -    $SIG{USR1} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
  -    $SIG{USR2} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
  -    $SIG{HUP}  = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
  -    $SIG{INT}  = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit 
= 1; };
  -    $SIG{QUIT} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit 
= 1; };
  -    $SIG{TERM} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit 
= 1; };
  -
  -    my $listen_socket = $self->{listen_socket};
  -    my ($connection_socket, $msg);
  -    my ($event, @events);
  -    my ($time, $time_of_next_event, $sleep_interval);
  -    my ($pid, $exitval, $sig);
  -    while (!$quit) {
  -        eval {
  -            print "[$$] Checking for scheduled events.\n" if ($verbose >= 8);
  -            $time = time();
  -            $time_of_next_event = $self->get_current_events(\@events, $time);
  -            if ($#events > -1) {
  -                foreach $event (@events) {
  -                    $self->send_event($event);
  -                }
  -                $time = time();
  -            }
  -            while (($pid = waitpid(-1,WNOHANG)) > 0) {
  -                $exitval = $? >> 8;
  -                $sig     = $? & 255;
  -                print "[$$] Child $pid finished 
[exitval=$exitval,sig=$sig]\n";
  -                $self->cleanup_pid($pid, $exitval, $sig);
  -            }
  -            $time = time();
  -            if ($time_of_next_event > 0) {
  -                $sleep_interval = $time_of_next_event - $time;
  -                $sleep_interval = 0 if ($sleep_interval < 0);
  -            }
  -            else {
  -                $sleep_interval = 15*60;
  -            }
  -            # TODO: if (sleep_interval == 0), use select() to see if anyone 
is waiting, else ...
  -            print "[$$] Sleeping on socket: timeout($sleep_interval)\n" if 
($verbose >= 3);
  -            $listen_socket->timeout($sleep_interval);
  -            $connection_socket = $listen_socket->accept();
  -            if ($connection_socket) {
  -                $connection_socket->autoflush(1);
  -                $msg = <$connection_socket>;
  -                $msg =~ s/[\015\012]+$//;
  -                if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
  -                    $quit = 1;
  -                }
  -                else {
  -                    $self->$process_msg($connection_socket, $msg);
  -                }
  -                $connection_socket->close();
  -            }
  -        };
  -        if ($@) {
  -            print STDERR "[$$] $@" if ($verbose);
  -        }
  +    if ($self->{is_controller}) {
  +        $self->ctrl_shutdown();
       }
  -
  -    $self->close_listen_socket();
  -    $self->$shutdown();
  -    print "[$$] Stopping Cluster $role.\n" if ($verbose);
  -    $self->shutdown();
  -    &App::sub_exit() if ($App::trace);
  -}
  -
  -# TODO: Implement this as a fork() or a context-level message to a node to 
fork().
  -#       i.e. messages such as "EVENT:" and "EVENT-OK:"
  -#       Save the callback_event according to an event_tag.
  -#       Then implement cleanup_pid to send the callback_event.
  -sub send_async_event {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $event, $callback_event) = @_;
  -    $self->send_event($event);
  -    if ($callback_event) {
  -        my $event_tag = "local-$$";
  -        if (! $callback_event->{args}) {
  -            $callback_event->{args} = [ $event_tag ];
  -        }
  -        $self->send_event($callback_event);
  +    else {
  +        $self->node_shutdown();
       }
  -    &App::sub_exit() if ($App::trace);
  -}
   
  -sub cleanup_pid {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $pid, $exitval, $sig) = @_;
  -    # nothing to do
  -    &App::sub_exit() if ($App::trace);
  +    print "[$$] Stopping Cluster $role\n" if ($verbose);
   }
   
  -sub send_async_message {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $host, $port, $message) = @_;
  -    my $pid = fork();
  -    if (!$pid) {   # running in child
  -        $self->send_message($host, $port, $message);
  -        exit(0);
  +sub process_msg {
  +    my ($self, $connection_socket, $msg) = @_;
  +    if ($self->{is_controller}) {
  +        $self->ctrl_process_msg($connection_socket, $msg);
       }
  -    &App::sub_exit() if ($App::trace);
  -}
  -
  -sub send_message {
  -    &App::sub_entry if ($App::trace);
  -    my ($self, $host, $port, $message) = @_;
  -    my $verbose = $self->{verbose};
  -    print STDERR "[$$] send_message($host, $port, $message)\n" if ($verbose 
>= 2);
  -
  -    if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
  -        $host = $1;
  -        $port = $2;
  +    else {
  +        $self->node_process_msg($connection_socket, $msg);
       }
  +}
   
  -    my $sock = IO::Socket::INET->new(
  -        PeerAddr => $host,
  -        PeerPort => $port,
  -        Proto    => "tcp",
  -        Type     => SOCK_STREAM,
  -    );
  -
  -    my ($response);
  -    if ($sock) {
  -        eval {
  -            $sock->autoflush(1);
  -            print $sock $message, "\n";
  -            $response = <$sock>;
  -            $response =~ s/[\r\n]+$//;
  -            close($sock);
  -        };
  -        if ($@) {
  -            $response = "SEND ERROR: $@";
  -        }
  +sub process_msg {
  +    my ($self, $connection_socket, $msg) = @_;
  +    if ($self->{is_controller}) {
  +        $self->ctrl_process_msg($connection_socket, $msg);
       }
       else {
  -        $response = "CONNECT ERROR: $!";
  +        $self->node_process_msg($connection_socket, $msg);
       }
  -
  -    print "[$$] send_message($host, $port, ...) => [$response]\n" if 
($verbose >= 2);
  -    &App::sub_exit($response) if ($App::trace);
  -    return($response);
   }
   
   ######################################################################
  
  
  
  1.1                  p5ee/App-Context/lib/App/Context/Server.pm
  
  Index: Server.pm
  ===================================================================
  
  #############################################################################
  ## $Id: Server.pm,v 1.1 2005/03/14 17:35:20 spadkins Exp $
  #############################################################################
  
  package App::Context::Server;
  
  use App;
  use App::Context;
  
  @ISA = ( "App::Context" );
  
  use Sys::Hostname;
  use Socket;
  use IO::Socket;
  use IO::Socket::INET;
  use POSIX ":sys_wait_h";
  
  use strict;
  
  =head1 NAME
  
  App::Context::Server - a runtime environment with a single master server and 
its subprocesses
  
  =head1 SYNOPSIS
  
     # ... official way to get a Context object ...
     use App;
     $context = App->context();
     $config = $context->config();   # get the configuration
     $context->dispatch_events();    # dispatch events
  
     # ... alternative way (used internally) ...
     use App::Context::Server;
     $context = App::Context::Server->new();
  
  =cut
  
  # Initialize the server context and open its listening socket.
  #
  #   $self    - the context object being initialized
  #   $options - hash ref of options (optional); "verbose" is read here,
  #              the rest is passed on to _init2a() and _init2b()
  #
  # Steps, in order:
  #   1. turn on autoflush for STDOUT and send STDERR to the same place
  #   2. record the full host name ({hostname}) and its short form ({host})
  #   3. call _init2a(), which must set {port}
  #   4. open a TCP listening socket on {port} and store it in {listen_socket}
  #   5. call _init2b() for any work that needs the socket to exist
  #
  # Dies if STDERR cannot be redirected or the socket cannot be opened.
  sub _init {
      &App::sub_entry if ($App::trace);
      my ($self, $options) = @_;
      $options = {} if (!defined $options);

      $| = 1;  # autoflush STDOUT (not sure this is required)
      open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT - $!";

      my $host = hostname;
      $self->{hostname} = $host;
      $host =~ s/\..*//;   # get rid of fully qualified domain name
      $self->{host} = $host;

      $self->{verbose} = $options->{verbose};

      $self->_init2a($options);

      my $listen_socket = IO::Socket::INET->new(
          Proto     => "tcp",
          LocalPort => $self->{port},
          Listen    => SOMAXCONN,
      );
      if (!$listen_socket) {
          # IO::Socket::INET reports constructor failures in $@; $! alone may
          # be stale or unrelated, so prefer $@ and fall back to $!.
          my $error = $@ || $!;
          die "Unable to listen on $host:$self->{port} - $error";
      }

      $self->{listen_socket} = $listen_socket;

      $self->_init2b($options);

      &App::sub_exit() if ($App::trace);
  }
  
  # First initialization hook, called by _init() before the listening socket
  # is created.  Chooses the TCP port to listen on and stores it in {port}:
  # the "port" option wins, then "controller_port", then 8080.  An option
  # with a false value (undef, 0, "") counts as not given.
  sub _init2a {
      &App::sub_entry if ($App::trace);
      my ($self, $options) = @_;

      my $port = $options->{port};
      $port = $options->{controller_port} if (!$port);
      $port = 8080 if (!$port);
      $self->{port} = $port;

      &App::sub_exit() if ($App::trace);
  }
  
  # Second initialization hook, called by _init() after the listening socket
  # has been stored in {listen_socket}.  This class has nothing to do here;
  # presumably subclasses override it (the hook exists only to be replaced).
  sub _init2b {
      if ($App::trace) {
          &App::sub_entry;
      }
      my ($self, $options) = @_;
      if ($App::trace) {
          &App::sub_exit();
      }
  }
  
  # Close the listening socket, if there is one, and forget it.
  # Safe to call repeatedly: once {listen_socket} is gone (or holds a false
  # value) the call does nothing.
  sub close_listen_socket {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;

      my $listen_socket = $self->{listen_socket};
      if ($listen_socket) {
          $listen_socket->close();
          delete $self->{listen_socket};
      }

      &App::sub_exit() if ($App::trace);
  }
  
  # Destructor: make sure the listening socket is closed when the context
  # object goes away.
  sub DESTROY {
      # A destructor can run at almost any point (including while an
      # exception is propagating or the process is exiting), so keep whatever
      # the cleanup does from leaking into the caller's error/status variables.
      local ($@, $!, $?);
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      $self->close_listen_socket();
      &App::sub_exit() if ($App::trace);
  }
  
  # Main loop of the server.
  #
  # 1. Calls dispatch_events_begin().
  # 2. Instantiates every service named in the "init_objects" option.
  # 3. Installs signal handlers: INT, QUIT and TERM request shutdown;
  #    USR1, USR2 and HUP are only logged (when verbose).
  # 4. Loops until asked to quit.  Each pass sends the events that are due,
  #    reaps finished child processes, then waits on the listening socket
  #    until the next event is due (15 minutes when nothing is scheduled).
  #    A connection delivers one line: "EXIT..." or "QUIT..." (any case)
  #    stops the loop, anything else is handed to process_msg().
  #    An exception inside a pass is logged (when verbose) and the loop goes on.
  # 5. Closes the listening socket, calls dispatch_events_end() and shutdown().
  sub dispatch_events {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;

      $self->dispatch_events_begin();

      my $verbose = $self->{verbose};

      # "init_objects" is a list of names separated by commas and/or
      # semicolons.  "Type.name" selects the service type explicitly; a bare
      # name is instantiated as a SessionObject.
      my $options = $self->{options};
      my $objects = $options->{init_objects};
      my ($service_type, $name, $service);
      foreach my $object (split(/ *[;,]+ */, $objects)) {
          if ($object) {
              if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
                  $service_type = $1;
                  $name = $2;
              }
              else {
                  $service_type = "SessionObject";
                  $name = $object;
              }
              $service = $self->service($service_type, $name);  # instantiate it. that's all.
              $self->log("$service_type $name instantiated [$service]\n");
          }
      }

      my $quit = 0;
      $SIG{USR1} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
      $SIG{USR2} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
      $SIG{HUP}  = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
      $SIG{INT}  = sub { $self->log("Caught Signal: @_ (quitting)\n") if ($verbose); $quit = 1; };
      $SIG{QUIT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if ($verbose); $quit = 1; };
      $SIG{TERM} = sub { $self->log("Caught Signal: @_ (quitting)\n") if ($verbose); $quit = 1; };

      my $default_sleep_interval = 15*60;

      my $listen_socket = $self->{listen_socket};
      my ($connection_socket, $msg);
      my ($event, @events);
      my ($time, $time_of_next_event, $sleep_interval);
      my ($pid, $exitval, $sig);
      while (!$quit) {
          eval {
              $self->log("Checking for scheduled events.\n") if ($verbose >= 8);
              $time = time();
              # @events is passed by reference so the callee can fill it in;
              # the return value is used below as the time of the next event
              # (a value <= 0 is treated as "nothing scheduled").
              # NOTE(review): assumes get_current_events() resets @events on
              # every call - confirm, otherwise events would be re-sent.
              $time_of_next_event = $self->get_current_events(\@events, $time);
              if ($#events > -1) {
                  foreach $event (@events) {
                      $self->send_event($event);
                  }
                  $time = time();
              }
              # reap every child that has already exited, without blocking
              while (($pid = waitpid(-1,WNOHANG)) > 0) {
                  $exitval = $? >> 8;
                  $sig     = $? & 255;
                  $self->log("Child $pid finished [exitval=$exitval,sig=$sig]\n");
                  $self->cleanup_pid($pid, $exitval, $sig);
              }
              $time = time();
              if ($time_of_next_event > 0) {
                  $sleep_interval = $time_of_next_event - $time;
                  $sleep_interval = 0 if ($sleep_interval < 0);
              }
              else {
                  $sleep_interval = $default_sleep_interval;
              }
              # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
              $self->log("Sleeping on socket: timeout($sleep_interval)\n") if ($verbose >= 3);
              $listen_socket->timeout($sleep_interval);
              $connection_socket = $listen_socket->accept();
              if ($connection_socket) {
                  $connection_socket->autoflush(1);
                  $msg = <$connection_socket>;
                  $msg =~ s/[\015\012]+$//;
                  if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
                      $quit = 1;
                  }
                  else {
                      $self->process_msg($connection_socket, $msg);
                  }
                  $connection_socket->close();
              }
          };
          if ($@) {
              $self->log($@) if ($verbose);
          }
      }

      $self->close_listen_socket();
      $self->dispatch_events_end();
      $self->shutdown();
      &App::sub_exit() if ($App::trace);
  }
  
  # Hook called by dispatch_events() before it enters the main loop.
  # Here it only logs a start-up line, and only when {verbose} is true.
  sub dispatch_events_begin {
      my ($self) = @_;
      if ($self->{verbose}) {
          $self->log("Starting Server on $self->{host}:$self->{port}\n");
      }
  }
  
  # Hook called by dispatch_events() after the main loop has ended and the
  # listening socket has been closed.  Here it only logs a shutdown line,
  # and only when {verbose} is true.
  sub dispatch_events_end {
      my ($self) = @_;
      if ($self->{verbose}) {
          $self->log("Stopping Server.\n");
      }
  }
  
  # Handle one message line received by dispatch_events().
  #
  #   $connection_socket - the accepted client connection (unused here)
  #   $msg               - the line that was read, without its line ending
  #
  # This class only logs the message.
  sub process_msg {
      my ($self, $connection_socket, $msg) = @_;
      my $log_line = "process_msg: [$msg]\n";
      $self->log($log_line);
  }
  
  # TODO: Implement this as a fork() or a context-level message to a node to fork().
  #       i.e. messages such as "EVENT:" and "EVENT-OK:"
  #       Save the callback_event according to an event_tag.
  #       Then implement cleanup_pid to send the callback_event.
  # Send $event and then, if one was given, $callback_event.
  #
  #   $event          - the event to send
  #   $callback_event - optional follow-up event (hash ref); when it has no
  #                     {args} yet it is given a single argument, the tag
  #                     "local-<pid of this process>"
  #
  # Both events go through send_event() one after the other, so nothing here
  # is asynchronous yet.
  sub send_async_event {
      &App::sub_entry if ($App::trace);
      my ($self, $event, $callback_event) = @_;

      $self->send_event($event);

      if ($callback_event) {
          $callback_event->{args} ||= [ "local-$$" ];
          $self->send_event($callback_event);
      }

      &App::sub_exit() if ($App::trace);
  }
  
  # Hook called by dispatch_events() for every child process it has reaped.
  #
  #   $pid     - process id of the finished child
  #   $exitval - its exit value (wait status shifted right by 8 bits)
  #   $sig     - low 8 bits of its wait status
  #
  # This class has nothing to clean up.
  sub cleanup_pid {
      if ($App::trace) {
          &App::sub_entry;
      }
      my ($self, $pid, $exitval, $sig) = @_;
      # nothing to do
      if ($App::trace) {
          &App::sub_exit();
      }
  }
  
  # Send a message to another server without waiting for the reply.
  #
  #   $host, $port, $message - passed unchanged to send_message()
  #
  # A child process is forked to do the send; the parent returns at once and
  # the reply is discarded.  The child is expected to be reaped later by the
  # waitpid() loop in dispatch_events().  If the fork fails the message is
  # not sent and the failure is logged.
  sub send_async_message {
      &App::sub_entry if ($App::trace);
      my ($self, $host, $port, $message) = @_;
      my $pid = fork();
      if (!defined $pid) {
          # fork() returns undef on failure.  This must not be mistaken for
          # the child's 0: the parent would send the message itself and then
          # exit, taking the whole server down.
          $self->log("send_async_message: fork failed - $!\n");
      }
      elsif ($pid == 0) {   # running in child
          $self->send_message($host, $port, $message);
          exit(0);
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Send one line of text to a server over TCP and return its one-line reply.
  #
  #   $host    - peer host; may be given as "host:port" when $port is false
  #   $port    - peer TCP port
  #   $message - text to send (a newline is appended)
  #
  # Returns the first line read back from the peer with trailing CR/LF
  # characters removed.  On failure the return value is a string starting
  # with "CONNECT ERROR: " (could not connect) or "SEND ERROR: " (an
  # exception was raised while talking to the peer).
  # The call and its result are logged when {verbose} is 2 or more.
  sub send_message {
      &App::sub_entry if ($App::trace);
      my ($self, $host, $port, $message) = @_;
      my $verbose = $self->{verbose};
      if ($verbose >= 2) {
          $self->log("send_message($host, $port, $message)\n");
      }

      # accept "host:port" packed into $host when no separate port was given
      if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
          ($host, $port) = ($1, $2);
      }

      my $sock = IO::Socket::INET->new(
          PeerAddr => $host,
          PeerPort => $port,
          Proto    => "tcp",
          Type     => SOCK_STREAM,
      );

      my $response;
      if (!$sock) {
          $response = "CONNECT ERROR: $!";
      }
      else {
          eval {
              $sock->autoflush(1);
              print $sock $message, "\n";
              $response = <$sock>;
              $response =~ s/[\r\n]+$//;
              close($sock);
          };
          $response = "SEND ERROR: $@" if ($@);
      }

      if ($verbose >= 2) {
          $self->log("send_message($host, $port, ...) => [$response]\n");
      }
      &App::sub_exit($response) if ($App::trace);
      return($response);
  }
  
  1;
  
  
  
  

Reply via email to