cvsuser 04/09/28 08:20:40
Added: App-Context/lib/App/Context Cluster.pm
Log:
new
Revision Changes Path
1.1 p5ee/App-Context/lib/App/Context/Cluster.pm
Index: Cluster.pm
===================================================================
#############################################################################
## $Id: Cluster.pm,v 1.1 2004/09/28 15:20:40 spadkins Exp $
#############################################################################
package App::Context::Cluster;
use App;
use App::Context;
@ISA = ( "App::Context" );
use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
use strict;
=head1 NAME

App::Context::Cluster - a runtime environment with a Cluster Controller and many
Cluster Nodes

=head1 SYNOPSIS

    # ... official way to get a Context object ...
    use App;
    $context = App->context();
    $config = $context->config();    # get the configuration
    $context->dispatch_events();     # dispatch events

    # ... alternative way (used internally) ...
    use App::Context::Cluster;
    $context = App::Context::Cluster->new();

=cut

#############################################################################
# _init($options)
#############################################################################
# Initializes a cluster context.
#   * records the system host name ({hostname}) and the same name with
#     everything after the first dot removed ({host})
#   * copies controller_host, controller_port, controller and verbose from
#     the (optional) $options hash reference
#   * opens a listening TCP socket on controller_port (controller) or
#     node_port (node) and stores it in {listen_socket}
#   * calls ctrl_init() or node_init() depending on the role
# Dies if the listening socket cannot be created.
sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $options = {} unless (defined $options);

    my $full_hostname = hostname();
    my $host = $full_hostname;
    $host =~ s/\..*//;               # get rid of fully qualified domain name
    $self->{hostname} = $full_hostname;
    $self->{host}     = $host;

    $self->{controller_host} = $options->{controller_host};
    $self->{controller_port} = $options->{controller_port};
    $self->{is_controller}   = $options->{controller};
    $self->{verbose}         = $options->{verbose};

    # The role decides which configured port this process listens on.
    my $is_controller = $self->{is_controller};
    $self->{port} = $is_controller ? $options->{controller_port}
                                   : $options->{node_port};

    my $listen_socket = IO::Socket::INET->new(
        Proto     => "tcp",
        LocalPort => $self->{port},
        Listen    => SOMAXCONN,
    );
    if (!$listen_socket) {
        die "Unable to listen on $host:$self->{port} - $!";
    }
    $self->{listen_socket} = $listen_socket;

    if ($is_controller) {
        $self->ctrl_init();
    }
    else {
        $self->node_init();
    }
    &App::sub_exit() if ($App::trace);
}
# close_listen_socket()
# Closes the listening socket stored in {listen_socket} (if there is one)
# and removes it from the object.  Calling it again is a no-op.
sub close_listen_socket {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $listen_socket = $self->{listen_socket};
    if ($listen_socket) {
        $listen_socket->close();
        delete $self->{listen_socket};
    }
    &App::sub_exit() if ($App::trace);
}
# DESTROY()
# Destructor: makes sure the listening socket is closed when the object
# goes away.
sub DESTROY {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->close_listen_socket();
    &App::sub_exit() if ($App::trace);
}
#############################################################################
# dispatch_events()
#############################################################################
# Main loop shared by the Cluster Controller and the Cluster Nodes.
#
# After calling the role-specific startup method (ctrl_startup/node_startup)
# it repeats the following until an INT/QUIT/TERM signal arrives or a client
# sends a line starting with EXIT or QUIT:
#   1. fetch the events that are due via get_current_events() and pass each
#      of them to send_event()
#   2. reap finished child processes and report them to cleanup_pid()
#   3. wait on the listening socket until the next scheduled event is due
#      (15 minutes when nothing is scheduled); one line is read from an
#      accepted connection and handed to ctrl_process_msg/node_process_msg
# Exceptions raised inside one iteration are caught and, if verbose is set,
# printed to STDERR; the loop then continues.
# On exit the listening socket is closed, the role-specific shutdown method
# is called and finally shutdown().
sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    # Role-specific method names, invoked by name below.
    my ($role, $startup, $shutdown, $process_msg);
    if ($self->{is_controller}) {
        $role        = "Controller";
        $startup     = "ctrl_startup";
        $shutdown    = "ctrl_shutdown";
        $process_msg = "ctrl_process_msg";
    }
    else {
        $role        = "Node";
        $startup     = "node_startup";
        $shutdown    = "node_shutdown";
        $process_msg = "node_process_msg";
    }

    my $verbose = $self->{verbose};
    print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if ($verbose);
    $self->$startup();

    my $quit = 0;
    # USR1/USR2/HUP are only reported; INT/QUIT/TERM additionally end the loop.
    my $report_signal = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
    foreach my $signame (qw(USR1 USR2 HUP)) {
        $SIG{$signame} = $report_signal;
    }
    foreach my $signame (qw(INT QUIT TERM)) {
        $SIG{$signame} = sub { $report_signal->(@_); $quit = 1; };
    }

    my $listen_socket = $self->{listen_socket};
    my ($connection_socket, $msg);
    my ($event, @events);
    my ($time, $time_of_next_event, $sleep_interval);
    my ($pid, $exitval, $sig);

    while (!$quit) {
        eval {
            print "[$$] Checking for scheduled events.\n" if ($verbose >= 8);
            $time = time();
            # get_current_events() receives a reference to @events and returns
            # the time of the next scheduled event.
            $time_of_next_event = $self->get_current_events(\@events, $time);
            if ($#events > -1) {
                foreach $event (@events) {
                    $self->send_event($event);
                }
                $time = time();
            }

            # Reap every child that has already exited, without blocking.
            while (($pid = waitpid(-1, WNOHANG)) > 0) {
                $exitval = $? >> 8;
                # NOTE(review): the low 7 bits of $? are the signal number and
                # bit 128 is the core-dump flag, so $sig may include that flag.
                $sig = $? & 255;
                print "[$$] Child $pid finished [exitval=$exitval,sig=$sig]\n";
                $self->cleanup_pid($pid, $exitval, $sig);
            }

            $time = time();
            if ($time_of_next_event > 0) {
                $sleep_interval = $time_of_next_event - $time;
                $sleep_interval = 0 if ($sleep_interval < 0);
            }
            else {
                $sleep_interval = 15*60;
            }

            # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
            print "[$$] Sleeping on socket: timeout($sleep_interval)\n" if ($verbose >= 3);
            $listen_socket->timeout($sleep_interval);
            $connection_socket = $listen_socket->accept();
            if ($connection_socket) {
                $connection_socket->autoflush(1);
                $msg = <$connection_socket>;
                $msg =~ s/[\015\012]+$//;
                if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
                    $quit = 1;
                }
                else {
                    $self->$process_msg($connection_socket, $msg);
                }
                $connection_socket->close();
            }
        };
        if ($@) {
            print STDERR "[$$] $@" if ($verbose);
        }
    }

    $self->close_listen_socket();
    $self->$shutdown();
    print "[$$] Stopping Cluster $role.\n" if ($verbose);
    $self->shutdown();
    &App::sub_exit() if ($App::trace);
}
# TODO: Implement this as a fork() or a context-level message to a node to fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
# Save the callback_event according to an event_tag.
# Then implement cleanup_pid to send the callback_event.
#
# send_async_event($event, $callback_event)
# Current behaviour: sends $event with send_event() and then, if a
# $callback_event was given, sends that too.  A callback event without
# args gets a single argument, the tag "local-<pid>".
sub send_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;
    $self->send_event($event);
    if ($callback_event) {
        # Only supply the tag when the caller did not provide args.
        $callback_event->{args} ||= [ "local-$$" ];
        $self->send_event($callback_event);
    }
    &App::sub_exit() if ($App::trace);
}
# cleanup_pid($pid, $exitval, $sig)
# Called by dispatch_events() for every reaped child process.
# Does nothing here.
sub cleanup_pid {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my ($pid, $exitval, $sig) = @_;
    # nothing to do
    &App::sub_exit() if ($App::trace);
}
# send_async_message($host, $port, $message)
# Sends $message to $host:$port without blocking the caller: a child process
# is forked which calls send_message() and exits.
# If fork() fails, a warning is issued and the message is not sent.
sub send_async_message {
    &App::sub_entry if ($App::trace);
    my ($self, $host, $port, $message) = @_;
    my $pid = fork();
    if (!defined $pid) {
        # fork() returns undef on failure.  We are still the parent here, so
        # we must not fall into the child branch and exit.
        warn "[$$] send_async_message($host, $port, ...): fork failed - $!\n";
    }
    elsif ($pid == 0) { # running in child
        $self->send_message($host, $port, $message);
        exit(0);
    }
    &App::sub_exit() if ($App::trace);
}
# send_message($host, $port, $message)
# Opens a TCP connection to $host:$port, writes $message followed by a
# newline and reads one line back.
# $host may be given as "host:port" when $port is false.
# Returns the response line without trailing CR/LF, or a string starting
# with "CONNECT ERROR: " / "SEND ERROR: " when the exchange failed.
sub send_message {
    &App::sub_entry if ($App::trace);
    my ($self, $host, $port, $message) = @_;
    my $verbose = $self->{verbose};
    print STDERR "[$$] send_message($host, $port, $message)\n" if ($verbose >= 2);

    # Split a combined "host:port" only if no explicit port was supplied.
    if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
        ($host, $port) = ($1, $2);
    }

    my $response;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => "tcp",
        Type     => SOCK_STREAM,
    );
    if (!$sock) {
        $response = "CONNECT ERROR: $!";
    }
    else {
        eval {
            $sock->autoflush(1);
            print $sock $message, "\n";
            $response = <$sock>;
            $response =~ s/[\r\n]+$//;
            close($sock);
        };
        $response = "SEND ERROR: $@" if ($@);
    }

    print "[$$] send_message($host, $port, ...) => [$response]\n" if ($verbose >= 2);
    &App::sub_exit($response) if ($App::trace);
    return($response);
}
######################################################################
# Controller
######################################################################
# ctrl_init()
# Called from _init() when running as the controller.
# Empty in this class.
sub ctrl_init {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    &App::sub_exit() if ($App::trace);
}
# ctrl_startup()
# Called by dispatch_events() before the controller enters its main loop.
# Empty in this class.
sub ctrl_startup {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    &App::sub_exit() if ($App::trace);
}
# ctrl_shutdown()
# Called by dispatch_events() after the controller has left its main loop.
# Empty in this class.
sub ctrl_shutdown {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    &App::sub_exit() if ($App::trace);
}
# ctrl_process_msg($connection_socket, $msg)
# Handles one message line received by the controller and writes the reply
# to $connection_socket.
# The message is first offered to ctrl_process_app_msg(); only if that
# returns false are the built-in messages handled:
#   NODE-UP:<node>    marks the node up, replies with the result of
#                     ctrl_set_node_up() ("ok" if that is false)
#   NODE-DOWN:<node>  marks the node down, replies "ok"
#   GET...            replies with a minimal HTTP response containing a
#                     status line and the dump from ctrl_state()
#   anything else     replies "unknown [<msg>]"
sub ctrl_process_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $connection_socket, $msg) = @_;
    my $verbose = $self->{verbose};
    print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
    my $processing_complete = $self->ctrl_process_app_msg($connection_socket, $msg);
    if (!$processing_complete) {
        if ($msg =~ /^NODE-UP:(.*)/) {
            my $resp = $self->ctrl_set_node_up($1) || "ok";
            $connection_socket->print("$resp\n");
        }
        elsif ($msg =~ /^NODE-DOWN:(.*)/) {
            $self->ctrl_set_node_down($1);
            $connection_socket->print("ok\n");
        }
        elsif ($msg =~ /^GET/) {
            $connection_socket->print("HTTP/1.0 200 OK\n");
            $connection_socket->print("Content-type: text/plain\n");
            $connection_socket->print("\n");
            # Status text is a single line (no line break inside the literal).
            $connection_socket->print("$self->{host}:$self->{port} acting as cluster controller\n");
            $connection_socket->print($self->ctrl_state());
        }
        else {
            $connection_socket->print("unknown [$msg]\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}
# ctrl_process_app_msg($connection_socket, $msg)
# Offered every controller message before the built-in handling in
# ctrl_process_msg().  A true return value means the message was handled.
# This implementation handles nothing and always returns 0.
sub ctrl_process_app_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $connection_socket, $msg) = @_;
    my $handled = 0;
    &App::sub_exit($handled) if ($App::trace);
    return($handled);
}
# ctrl_state()
# Returns a Data::Dumper dump (variable name "state", Indent level 1) of the
# controller's {node}, {nodes} and {scheduled_events} members.
sub ctrl_state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    # This file has no "use Data::Dumper" of its own, so load it here
    # rather than relying on another module having done so.
    require Data::Dumper;
    my %state = (
        "node"             => $self->{node},
        "nodes"            => $self->{nodes},
        "scheduled_events" => $self->{scheduled_events},
    );
    my $d = Data::Dumper->new([ \%state ], [ "state" ]);
    $d->Indent(1);
    my $state = $d->Dump();
    &App::sub_exit($state) if ($App::trace);
    return($state);
}
# ctrl_set_node_down($node)
# Records $node as down ({node}{$node}{up} = 0) and rebuilds the list of
# active nodes with ctrl_set_nodes().
sub ctrl_set_node_down {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $node = shift;
    $self->{node}{$node}{up} = 0;
    $self->ctrl_set_nodes();
    &App::sub_exit() if ($App::trace);
}
# ctrl_set_node_up($node)
# Records $node as up.
# Returns "ok" if the node was already marked up, otherwise marks it,
# rebuilds the list of active nodes with ctrl_set_nodes() and returns "new".
sub ctrl_set_node_up {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;
    my $retval = "ok";
    unless ($self->{node}{$node}{up}) {
        $self->{node}{$node}{up} = 1;
        $self->ctrl_set_nodes();
        $retval = "new";
    }
    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}
# ctrl_set_nodes()
# Rebuilds {nodes}: a reference to the sorted list of all node names in
# {node} whose "up" flag is true.
sub ctrl_set_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my @nodes = grep { $self->{node}{$_}{up} } sort keys %{$self->{node}};
    $self->{nodes} = \@nodes;
    &App::sub_exit() if ($App::trace);
}
######################################################################
# Node
######################################################################
# node_init()
# Called from _init() when running as a node.
# Empty in this class.
sub node_init {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    &App::sub_exit() if ($App::trace);
}
# node_startup()
# Called by dispatch_events() before a node enters its main loop.
# Schedules a recurring event which reports "NODE-UP:<host>:<port>" to the
# controller through send_async_message(): first immediately, then every
# node_heartbeat seconds (default 60).
sub node_startup {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    # NOTE(review): the heartbeat is looked up under $self->{context}{options};
    # confirm that this is where the options are stored.
    my $node_heartbeat = $self->{context}{options}{node_heartbeat} || 60;

    my $up_message = "NODE-UP:$self->{host}:$self->{port}";
    my @destination = ($self->{controller_host}, $self->{controller_port});

    $self->schedule_event(
        method   => "send_async_message",
        args     => [ @destination, $up_message ],
        time     => time(),           # immediately ...
        interval => $node_heartbeat,  # and every X seconds hereafter
    );
    &App::sub_exit() if ($App::trace);
}
# node_shutdown()
# Called by dispatch_events() after a node has left its main loop.
# Reports "NODE-DOWN:<host>:<port>" to the controller through
# send_async_message().
sub node_shutdown {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $down_message = "NODE-DOWN:$self->{host}:$self->{port}";
    $self->send_async_message(
        $self->{controller_host},
        $self->{controller_port},
        $down_message,
    );
    &App::sub_exit() if ($App::trace);
}
# node_process_msg($connection_socket, $msg)
# Handles one message line received by a node and writes the reply to
# $connection_socket.
# The message is first offered to node_process_app_msg(); only if that
# returns false are the built-in messages handled:
#   GET...         replies with a minimal HTTP response containing a
#                  status line
#   anything else  replies "unknown [<msg>]"
sub node_process_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $connection_socket, $msg) = @_;
    my $verbose = $self->{verbose};
    print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
    my $processing_complete = $self->node_process_app_msg($connection_socket, $msg);
    if (!$processing_complete) {
        if ($msg =~ /^GET/) {
            $connection_socket->print("HTTP/1.0 200 OK\n");
            $connection_socket->print("Content-type: text/plain\n");
            $connection_socket->print("\n");
            # Status text is a single line (no line break inside the literal).
            $connection_socket->print("$self->{host}:$self->{port} acting as cluster node\n");
            #$connection_socket->print($self->ctrl_state());
        }
        else {
            $connection_socket->print("unknown [$msg]\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}
# node_process_app_msg($connection_socket, $msg)
# Offered every node message before the built-in handling in
# node_process_msg().  A true return value means the message was handled.
# This implementation handles nothing and always returns 0.
sub node_process_app_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $connection_socket, $msg) = @_;
    my $handled = 0;
    &App::sub_exit($handled) if ($App::trace);
    return($handled);
}
1;