cvsuser     04/09/28 08:20:40

  Added:       App-Context/lib/App/Context Cluster.pm
  Log:
  new
  
  Revision  Changes    Path
  1.1                  p5ee/App-Context/lib/App/Context/Cluster.pm
  
  Index: Cluster.pm
  ===================================================================
  
  #############################################################################
  ## $Id: Cluster.pm,v 1.1 2004/09/28 15:20:40 spadkins Exp $
  #############################################################################
  
  package App::Context::Cluster;
  
  use App;
  use App::Context;
  
  @ISA = ( "App::Context" );
  
  use Sys::Hostname;
  use Socket;
  use IO::Socket;
  use IO::Socket::INET;
  use POSIX ":sys_wait_h";
  
  use strict;
  
  =head1 NAME
  
  App::Context::Cluster - a runtime environment with a Cluster Controller and many Cluster Nodes
  
  =head1 SYNOPSIS
  
     # ... official way to get a Context object ...
     use App;
     $context = App->context();
     $config = $context->config();   # get the configuration
     $context->dispatch_events();    # dispatch events
  
     # ... alternative way (used internally) ...
     use App::Context::Cluster;
     $context = App::Context::Cluster->new();
  
  =cut
  
  # Initialize the cluster context: record the host identity and the role
  # (controller or node), open the TCP listen socket, then run the
  # role-specific init hook (ctrl_init or node_init).
  #
  # Options read from $options (hash ref; undef is treated as empty):
  #   controller_host, controller_port - address of the Cluster Controller
  #   controller                       - true if this process is the controller
  #   node_port                        - listen port when acting as a node
  #   verbose                          - verbosity level for diagnostics
  #
  # Dies if the listen socket cannot be created.
  sub _init {
      &App::sub_entry if ($App::trace);
      my ($self, $options) = @_;
      $options = {} if (!defined $options);

      my $host = hostname;
      $self->{hostname} = $host;
      $host =~ s/\..*//;   # get rid of fully qualified domain name
      $self->{host} = $host;

      $self->{controller_host} = $options->{controller_host};
      $self->{controller_port} = $options->{controller_port};
      $self->{is_controller}   = $options->{controller};
      $self->{verbose}         = $options->{verbose};

      # The controller listens on the controller port, a node on its own port.
      $self->{port} = $self->{is_controller}
          ? $options->{controller_port}
          : $options->{node_port};

      # ReuseAddr lets a restarted process re-bind its port right away instead
      # of failing with "Address already in use" while old connections are
      # still in TIME_WAIT.
      my $listen_socket = IO::Socket::INET->new(
          Proto     => "tcp",
          LocalPort => $self->{port},
          Listen    => SOMAXCONN,
          ReuseAddr => 1,
      ) || die "Unable to listen on $host:$self->{port} - $!";

      $self->{listen_socket} = $listen_socket;

      if ($self->{is_controller}) {
          $self->ctrl_init();
      }
      else {
          $self->node_init();
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Close the listening socket, if one is currently held, and drop the
  # reference to it.  Calling this again afterwards is a no-op.
  sub close_listen_socket {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      my $listener = $self->{listen_socket};
      if ($listener) {
          $listener->close();
          delete $self->{listen_socket};
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Destructor: make sure the listening socket is released when the
  # context object goes away.
  sub DESTROY {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      $self->close_listen_socket();
      &App::sub_exit() if ($App::trace);
  }
  
  # Main event loop for a cluster process (controller or node).
  #
  # Runs the role-specific startup hook, then loops until asked to quit:
  #   1. fetch the events that are due (get_current_events) and send them,
  #   2. reap finished child processes and report them to cleanup_pid,
  #   3. wait on the listen socket (with a timeout derived from the time of
  #      the next scheduled event, or 15 minutes if none is scheduled) and
  #      hand one line of any incoming connection to the role-specific
  #      message handler.
  # The loop ends on SIGINT/SIGQUIT/SIGTERM or when a peer sends a line
  # starting with EXIT or QUIT.  Afterwards the listen socket is closed and
  # the role-specific shutdown hook and $self->shutdown() are called.
  #
  # NOTE(review): any peer that can connect can stop the process by sending
  # EXIT/QUIT; there is no authentication in this block.
  sub dispatch_events {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;

      my ($role, $startup, $shutdown, $process_msg);
      if ($self->{is_controller}) {
          $role        = "Controller";
          $startup     = "ctrl_startup";
          $shutdown    = "ctrl_shutdown";
          $process_msg = "ctrl_process_msg";
      }
      else {
          $role        = "Node";
          $startup     = "node_startup";
          $shutdown    = "node_shutdown";
          $process_msg = "node_process_msg";
      }
      my $verbose = $self->{verbose};

      print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if ($verbose);

      $self->$startup();

      # USR1/USR2/HUP are only logged; INT/QUIT/TERM also end the loop.
      my $quit = 0;
      $SIG{USR1} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
      $SIG{USR2} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
      $SIG{HUP}  = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
      $SIG{INT}  = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit = 1; };
      $SIG{QUIT} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit = 1; };
      $SIG{TERM} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit = 1; };

      my $listen_socket = $self->{listen_socket};
      my ($connection_socket, $msg);
      my ($event, @events);
      my ($time, $time_of_next_event, $sleep_interval);
      my ($pid, $exitval, $sig);
      while (!$quit) {
          # Each iteration is wrapped in eval so that one failing event or
          # message handler does not take the whole process down.
          eval {
              print "[$$] Checking for scheduled events.\n" if ($verbose >= 8);
              $time = time();
              $time_of_next_event = $self->get_current_events(\@events, $time);
              if (@events) {
                  foreach $event (@events) {
                      $self->send_event($event);
                  }
                  $time = time();
              }

              # Reap every child that has already exited, without blocking.
              while (($pid = waitpid(-1,WNOHANG)) > 0) {
                  $exitval = $? >> 8;
                  $sig     = $? & 127;   # low 7 bits = signal; bit 7 is the core-dump flag
                  print "[$$] Child $pid finished [exitval=$exitval,sig=$sig]\n";
                  $self->cleanup_pid($pid, $exitval, $sig);
              }

              $time = time();
              if ($time_of_next_event > 0) {
                  $sleep_interval = $time_of_next_event - $time;
                  $sleep_interval = 0 if ($sleep_interval < 0);
              }
              else {
                  $sleep_interval = 15*60;
              }
              # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
              print "[$$] Sleeping on socket: timeout($sleep_interval)\n" if ($verbose >= 3);
              $listen_socket->timeout($sleep_interval);
              $connection_socket = $listen_socket->accept();
              if ($connection_socket) {
                  $connection_socket->autoflush(1);
                  $msg = <$connection_socket>;
                  # A peer that disconnects without sending a line yields
                  # undef; there is nothing to process in that case.
                  if (defined $msg) {
                      $msg =~ s/[\015\012]+$//;
                      if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
                          $quit = 1;
                      }
                      else {
                          $self->$process_msg($connection_socket, $msg);
                      }
                  }
                  $connection_socket->close();
              }
          };
          if ($@) {
              print STDERR "[$$] $@" if ($verbose);
          }
      }

      $self->close_listen_socket();
      $self->$shutdown();
      print "[$$] Stopping Cluster $role.\n" if ($verbose);
      $self->shutdown();
      &App::sub_exit() if ($App::trace);
  }
  
  # TODO: Implement this as a fork() or a context-level message to a node to fork().
  #       i.e. messages such as "EVENT:" and "EVENT-OK:"
  #       Save the callback_event according to an event_tag.
  #       Then implement cleanup_pid to send the callback_event.
  # Send $event and then, if a $callback_event was supplied, send that too.
  # Nothing is asynchronous yet: both events are sent in-line via send_event.
  # A callback event that has no args is given a single argument, the event
  # tag "local-<pid>".
  sub send_async_event {
      &App::sub_entry if ($App::trace);
      my ($self, $event, $callback_event) = @_;
      $self->send_event($event);
      if ($callback_event) {
          $callback_event->{args} = [ "local-$$" ] unless ($callback_event->{args});
          $self->send_event($callback_event);
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Hook called by dispatch_events for every reaped child process, with the
  # child's pid, exit value and signal.  This implementation does nothing.
  sub cleanup_pid {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my ($pid, $exitval, $sig) = @_;
      # intentionally empty
      &App::sub_exit() if ($App::trace);
  }
  
  # Send $message to $host:$port without blocking the caller: a child
  # process is forked, it performs the blocking send_message() and exits.
  # The parent returns immediately; the child is reaped later by the
  # waitpid loop in dispatch_events.
  #
  # If fork() fails the message is NOT sent; the failure is reported on
  # STDERR and the caller continues.
  sub send_async_message {
      &App::sub_entry if ($App::trace);
      my ($self, $host, $port, $message) = @_;
      my $pid = fork();
      if (!defined $pid) {
          # fork() returns undef on failure.  This must be told apart from
          # the child's 0, otherwise the parent would act as the child and
          # exit, taking the whole server down.
          print STDERR "[$$] send_async_message($host, $port, ...): fork failed - $!\n";
      }
      elsif ($pid == 0) {   # running in child
          $self->send_message($host, $port, $message);
          exit(0);
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Connect to $host:$port over TCP, send $message followed by a newline,
  # read one line of reply and return it with trailing CR/LF removed.
  #
  # If $port is false and $host has the form "name:port", the port is taken
  # from $host.
  #
  # Returns:
  #   the reply line                 on success
  #   undef                          if the peer closed without replying
  #   "CONNECT ERROR: <reason>"      if the connection could not be made
  #   "SEND ERROR: <reason>"         if writing or reading raised an error
  sub send_message {
      &App::sub_entry if ($App::trace);
      my ($self, $host, $port, $message) = @_;
      my $verbose = $self->{verbose};
      print STDERR "[$$] send_message($host, $port, $message)\n" if ($verbose >= 2);

      if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
          $host = $1;
          $port = $2;
      }

      my $sock = IO::Socket::INET->new(
          PeerAddr => $host,
          PeerPort => $port,
          Proto    => "tcp",
          Type     => SOCK_STREAM,
      );

      my ($response);
      if ($sock) {
          eval {
              $sock->autoflush(1);
              # print() reports failure through its return value, so turn
              # that into an exception the surrounding eval can see.
              $sock->print($message, "\n") || die "write failed: $!";
              $response = <$sock>;
              $response =~ s/[\r\n]+$// if (defined $response);
          };
          if ($@) {
              $response = "SEND ERROR: $@";
          }
          # Close on every path, not only after a successful exchange.
          close($sock);
      }
      else {
          $response = "CONNECT ERROR: $!";
      }

      print "[$$] send_message($host, $port, ...) => [$response]\n" if ($verbose >= 2);
      &App::sub_exit($response) if ($App::trace);
      return($response);
  }
  
  ######################################################################
  # Controller
  ######################################################################
  
  # Controller-side initialization hook, called from _init once the listen
  # socket exists.  This implementation does nothing.
  sub ctrl_init {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      &App::sub_exit() if ($App::trace);
  }
  
  # Controller-side startup hook, called by dispatch_events before it
  # enters its loop.  This implementation does nothing.
  sub ctrl_startup {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      &App::sub_exit() if ($App::trace);
  }
  
  # Controller-side shutdown hook, called by dispatch_events after its loop
  # has ended and the listen socket is closed.  This implementation does
  # nothing.
  sub ctrl_shutdown {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      &App::sub_exit() if ($App::trace);
  }
  
  # Handle one message line received by the controller.
  #
  # The message is first offered to ctrl_process_app_msg; if that returns
  # true nothing more is done.  Otherwise the built-in messages are handled:
  #   NODE-UP:<node>    mark the node up; replies with the result of
  #                     ctrl_set_node_up ("ok" or "new")
  #   NODE-DOWN:<node>  mark the node down; replies "ok"
  #   GET...            reply with a plain-text HTTP response describing the
  #                     controller and its state (ctrl_state)
  #   anything else     replies "unknown [<msg>]"
  # Replies are written to $connection_socket; the caller closes it.
  sub ctrl_process_msg {
      &App::sub_entry if ($App::trace);
      my ($self, $connection_socket, $msg) = @_;
      my $verbose = $self->{verbose};
      print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
      my $processing_complete = $self->ctrl_process_app_msg($connection_socket, $msg);
      if (!$processing_complete) {
          if ($msg =~ /^NODE-UP:(.*)/) {
              my $node = $1;   # copy right away, before any other regex can reset $1
              my $resp = $self->ctrl_set_node_up($node) || "ok";
              $connection_socket->print("$resp\n");
          }
          elsif ($msg =~ /^NODE-DOWN:(.*)/) {
              my $node = $1;
              $self->ctrl_set_node_down($node);
              $connection_socket->print("ok\n");
          }
          elsif ($msg =~ /^GET/) {
              $connection_socket->print("HTTP/1.0 200 OK\n");
              $connection_socket->print("Content-type: text/plain\n");
              $connection_socket->print("\n");
              $connection_socket->print("$self->{host}:$self->{port} acting as cluster controller\n");
              $connection_socket->print($self->ctrl_state());
          }
          else {
              $connection_socket->print("unknown [$msg]\n");
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Application-level message hook for the controller, consulted by
  # ctrl_process_msg before its built-in handling.  A true return value
  # means "fully handled"; this implementation handles nothing and
  # returns 0.
  sub ctrl_process_app_msg {
      &App::sub_entry if ($App::trace);
      my ($self, $connection_socket, $msg) = @_;
      my $handled = 0;
      &App::sub_exit($handled) if ($App::trace);
      return $handled;
  }
  
  # Return a human-readable dump of the controller's state: the per-node
  # table ({node}), the list of nodes that are up ({nodes}) and the
  # scheduled events ({scheduled_events}), formatted by Data::Dumper as an
  # assignment to $state.
  sub ctrl_state {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      # Data::Dumper is not loaded anywhere in this file, so load it here
      # rather than rely on another module having done so.
      require Data::Dumper;
      my %state = (
          "node" => $self->{node},
          "nodes" => $self->{nodes},
          "scheduled_events" => $self->{scheduled_events},
      );
      my $d = Data::Dumper->new([ \%state ], [ "state" ]);
      $d->Indent(1);
      # Sorted keys give the same text for the same state on every call;
      # guarded because very old Data::Dumper releases lack Sortkeys.
      $d->Sortkeys(1) if ($d->can("Sortkeys"));
      my $state = $d->Dump();
      &App::sub_exit($state) if ($App::trace);
      return($state);
  }
  
  # Record that $node is down and rebuild the list of nodes that are up.
  sub ctrl_set_node_down {
      &App::sub_entry if ($App::trace);
      my $self = shift;
      my $node = shift;
      $self->{node}{$node}{up} = 0;
      $self->ctrl_set_nodes();
      &App::sub_exit() if ($App::trace);
  }
  
  # Record that $node is up.
  # Returns "ok" if the node was already marked up, or "new" if this call
  # marked it up (in which case the list of up nodes is rebuilt).
  sub ctrl_set_node_up {
      &App::sub_entry if ($App::trace);
      my ($self, $node) = @_;
      my $retval = "ok";
      if (!$self->{node}{$node}{up}) {
          $self->{node}{$node}{up} = 1;
          $self->ctrl_set_nodes();
          $retval = "new";
      }
      &App::sub_exit($retval) if ($App::trace);
      return $retval;
  }
  
  # Rebuild $self->{nodes}: a reference to the sorted list of node names
  # whose entry in $self->{node} has a true {up} flag.  With no nodes known
  # the list is empty.
  sub ctrl_set_nodes {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      my $node_info = $self->{node} || {};
      my @nodes = grep { $node_info->{$_}{up} } sort keys %$node_info;
      $self->{nodes} = \@nodes;
      &App::sub_exit() if ($App::trace);
  }
  
  ######################################################################
  # Node
  ######################################################################
  
  # Node-side initialization hook, called from _init once the listen socket
  # exists.  This implementation does nothing.
  sub node_init {
      &App::sub_entry if ($App::trace);
      my $self = $_[0];
      &App::sub_exit() if ($App::trace);
  }
  
  # Node-side startup hook: schedule a recurring heartbeat that announces
  # this node to the controller.  The event calls send_async_message with
  # "NODE-UP:<host>:<port>", first immediately and then every
  # node_heartbeat seconds (default 60).
  #
  # NOTE(review): the heartbeat interval is read from
  # $self->{context}{options}; nothing in this file populates
  # $self->{context}, so confirm this is the intended location.
  sub node_startup {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      my $heartbeat_secs = $self->{context}{options}{node_heartbeat} || 60;
      my $announcement   = "NODE-UP:$self->{host}:$self->{port}";
      $self->schedule_event(
          method   => "send_async_message",
          args     => [ $self->{controller_host}, $self->{controller_port}, $announcement ],
          time     => time(),           # first run: right away
          interval => $heartbeat_secs,  # then repeat at this period
      );
      &App::sub_exit() if ($App::trace);
  }
  
  # Node-side shutdown hook: tell the controller this node is going away by
  # sending "NODE-DOWN:<host>:<port>" through send_async_message.
  sub node_shutdown {
      &App::sub_entry if ($App::trace);
      my ($self) = @_;
      my $farewell = "NODE-DOWN:$self->{host}:$self->{port}";
      $self->send_async_message($self->{controller_host}, $self->{controller_port}, $farewell);
      &App::sub_exit() if ($App::trace);
  }
  
  # Handle one message line received by a node.
  #
  # The message is first offered to node_process_app_msg; if that returns
  # true nothing more is done.  Otherwise:
  #   GET...         reply with a plain-text HTTP response identifying the node
  #   anything else  replies "unknown [<msg>]"
  # Replies are written to $connection_socket; the caller closes it.
  sub node_process_msg {
      &App::sub_entry if ($App::trace);
      my ($self, $connection_socket, $msg) = @_;
      my $verbose = $self->{verbose};
      print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
      my $processing_complete = $self->node_process_app_msg($connection_socket, $msg);
      if (!$processing_complete) {
          if ($msg =~ /^GET/) {
              $connection_socket->print("HTTP/1.0 200 OK\n");
              $connection_socket->print("Content-type: text/plain\n");
              $connection_socket->print("\n");
              $connection_socket->print("$self->{host}:$self->{port} acting as cluster node\n");
              #$connection_socket->print($self->ctrl_state());
          }
          else {
              $connection_socket->print("unknown [$msg]\n");
          }
      }
      &App::sub_exit() if ($App::trace);
  }
  
  # Application-level message hook for a node, consulted by
  # node_process_msg before its built-in handling.  A true return value
  # means "fully handled"; this implementation handles nothing and
  # returns 0.
  sub node_process_app_msg {
      &App::sub_entry if ($App::trace);
      my ($self, $connection_socket, $msg) = @_;
      my $handled = 0;
      &App::sub_exit($handled) if ($App::trace);
      return $handled;
  }
  
  1;
  
  
  
  

Reply via email to