cvsuser 05/03/14 09:35:20
Modified: App-Context/lib/App/Context Cluster.pm
Added: App-Context/lib/App/Context Server.pm
Log:
evolving
Revision Changes Path
1.2 +36 -179 p5ee/App-Context/lib/App/Context/Cluster.pm
Index: Cluster.pm
===================================================================
RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Cluster.pm,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Cluster.pm 28 Sep 2004 15:20:40 -0000 1.1
+++ Cluster.pm 14 Mar 2005 17:35:20 -0000 1.2
@@ -1,6 +1,6 @@
#############################################################################
-## $Id: Cluster.pm,v 1.1 2004/09/28 15:20:40 spadkins Exp $
+## $Id: Cluster.pm,v 1.2 2005/03/14 17:35:20 spadkins Exp $
#############################################################################
package App::Context::Cluster;
@@ -8,7 +8,7 @@
use App;
use App::Context;
[EMAIL PROTECTED] = ( "App::Context" );
[EMAIL PROTECTED] = ( "App::Context::Server" );
use Sys::Hostname;
use Socket;
@@ -36,36 +36,24 @@
=cut
-sub _init {
+sub _init2a {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- $options = {} if (!defined $options);
-
- my $host = hostname;
- $self->{hostname} = $host;
- $host =~ s/\..*//; # get rid of fully qualified domain name
- $self->{host} = $host;
-
$self->{controller_host} = $options->{controller_host};
$self->{controller_port} = $options->{controller_port};
$self->{is_controller} = $options->{controller};
- $self->{verbose} = $options->{verbose};
-
if ($self->{is_controller}) {
$self->{port} = $options->{controller_port};
}
else {
$self->{port} = $options->{node_port};
}
+ &App::sub_exit() if ($App::trace);
+}
- my $listen_socket = IO::Socket::INET->new(
- Proto => "tcp",
- LocalPort => $self->{port},
- Listen => SOMAXCONN,
- ) || die "Unable to listen on $host:$self->{port} - $!";
-
- $self->{listen_socket} = $listen_socket;
-
+sub _init2b {
+ &App::sub_entry if ($App::trace);
+ my ($self, $options) = @_;
if ($self->{is_controller}) {
$self->ctrl_init();
}
@@ -75,187 +63,56 @@
&App::sub_exit() if ($App::trace);
}
-sub close_listen_socket {
- &App::sub_entry if ($App::trace);
+sub dispatch_events_begin {
my ($self) = @_;
- if ($self->{listen_socket}) {
- $self->{listen_socket}->close();
- delete $self->{listen_socket};
- }
- &App::sub_exit() if ($App::trace);
-}
-sub DESTROY {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
- $self->close_listen_socket();
- &App::sub_exit() if ($App::trace);
-}
+ my $verbose = $self->{verbose};
+ my $role = $self->{is_controller} ? "Controller" : "Node";
-sub dispatch_events {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
+ print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if
($verbose);
- my ($role, $port, $startup, $shutdown, $process_msg);
if ($self->{is_controller}) {
- $role = "Controller";
- $startup = "ctrl_startup";
- $shutdown = "ctrl_shutdown";
- $process_msg = "ctrl_process_msg";
+ $self->ctrl_startup();
}
else {
- $role = "Node";
- $startup = "node_startup";
- $shutdown = "node_shutdown";
- $process_msg = "node_process_msg";
+ $self->node_startup();
}
- my $verbose = $self->{verbose};
+}
- print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if
($verbose);
+sub dispatch_events_end {
+ my ($self) = @_;
- $self->$startup();
+ my $verbose = $self->{verbose};
+ my $role = $self->{is_controller} ? "Controller" : "Node";
- my $quit = 0;
- $SIG{USR1} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
- $SIG{USR2} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
- $SIG{HUP} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); };
- $SIG{INT} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit
= 1; };
- $SIG{QUIT} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit
= 1; };
- $SIG{TERM} = sub { print "[$$] Caught Signal: @_\n" if ($verbose); $quit
= 1; };
-
- my $listen_socket = $self->{listen_socket};
- my ($connection_socket, $msg);
- my ($event, @events);
- my ($time, $time_of_next_event, $sleep_interval);
- my ($pid, $exitval, $sig);
- while (!$quit) {
- eval {
- print "[$$] Checking for scheduled events.\n" if ($verbose >= 8);
- $time = time();
- $time_of_next_event = $self->get_current_events([EMAIL
PROTECTED], $time);
- if ($#events > -1) {
- foreach $event (@events) {
- $self->send_event($event);
- }
- $time = time();
- }
- while (($pid = waitpid(-1,WNOHANG)) > 0) {
- $exitval = $? >> 8;
- $sig = $? & 255;
- print "[$$] Child $pid finished
[exitval=$exitval,sig=$sig]\n";
- $self->cleanup_pid($pid, $exitval, $sig);
- }
- $time = time();
- if ($time_of_next_event > 0) {
- $sleep_interval = $time_of_next_event - $time;
- $sleep_interval = 0 if ($sleep_interval < 0);
- }
- else {
- $sleep_interval = 15*60;
- }
- # TODO: if (sleep_interval == 0), use select() to see if anyone
is waiting, else ...
- print "[$$] Sleeping on socket: timeout($sleep_interval)\n" if
($verbose >= 3);
- $listen_socket->timeout($sleep_interval);
- $connection_socket = $listen_socket->accept();
- if ($connection_socket) {
- $connection_socket->autoflush(1);
- $msg = <$connection_socket>;
- $msg =~ s/[\015\012]+$//;
- if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
- $quit = 1;
- }
- else {
- $self->$process_msg($connection_socket, $msg);
- }
- $connection_socket->close();
- }
- };
- if ($@) {
- print STDERR "[$$] $@" if ($verbose);
- }
+ if ($self->{is_controller}) {
+ $self->ctrl_shutdown();
}
-
- $self->close_listen_socket();
- $self->$shutdown();
- print "[$$] Stopping Cluster $role.\n" if ($verbose);
- $self->shutdown();
- &App::sub_exit() if ($App::trace);
-}
-
-# TODO: Implement this as a fork() or a context-level message to a node to
fork().
-# i.e. messages such as "EVENT:" and "EVENT-OK:"
-# Save the callback_event according to an event_tag.
-# Then implement cleanup_pid to send the callback_event.
-sub send_async_event {
- &App::sub_entry if ($App::trace);
- my ($self, $event, $callback_event) = @_;
- $self->send_event($event);
- if ($callback_event) {
- my $event_tag = "local-$$";
- if (! $callback_event->{args}) {
- $callback_event->{args} = [ $event_tag ];
- }
- $self->send_event($callback_event);
+ else {
+ $self->node_shutdown();
}
- &App::sub_exit() if ($App::trace);
-}
-sub cleanup_pid {
- &App::sub_entry if ($App::trace);
- my ($self, $pid, $exitval, $sig) = @_;
- # nothing to do
- &App::sub_exit() if ($App::trace);
+ print "[$$] Stopping Cluster $role\n" if ($verbose);
}
-sub send_async_message {
- &App::sub_entry if ($App::trace);
- my ($self, $host, $port, $message) = @_;
- my $pid = fork();
- if (!$pid) { # running in child
- $self->send_message($host, $port, $message);
- exit(0);
+sub process_msg {
+ my ($self, $connection_socket, $msg) = @_;
+ if ($self->{is_controller}) {
+ $self->ctrl_process_msg($connection_socket, $msg);
}
- &App::sub_exit() if ($App::trace);
-}
-
-sub send_message {
- &App::sub_entry if ($App::trace);
- my ($self, $host, $port, $message) = @_;
- my $verbose = $self->{verbose};
- print STDERR "[$$] send_message($host, $port, $message)\n" if ($verbose
>= 2);
-
- if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
- $host = $1;
- $port = $2;
+ else {
+ $self->node_process_msg($connection_socket, $msg);
}
+}
- my $sock = IO::Socket::INET->new(
- PeerAddr => $host,
- PeerPort => $port,
- Proto => "tcp",
- Type => SOCK_STREAM,
- );
-
- my ($response);
- if ($sock) {
- eval {
- $sock->autoflush(1);
- print $sock $message, "\n";
- $response = <$sock>;
- $response =~ s/[\r\n]+$//;
- close($sock);
- };
- if ($@) {
- $response = "SEND ERROR: $@";
- }
+sub process_msg {
+ my ($self, $connection_socket, $msg) = @_;
+ if ($self->{is_controller}) {
+ $self->ctrl_process_msg($connection_socket, $msg);
}
else {
- $response = "CONNECT ERROR: $!";
+ $self->node_process_msg($connection_socket, $msg);
}
-
- print "[$$] send_message($host, $port, ...) => [$response]\n" if
($verbose >= 2);
- &App::sub_exit($response) if ($App::trace);
- return($response);
}
######################################################################
1.1 p5ee/App-Context/lib/App/Context/Server.pm
Index: Server.pm
===================================================================
#############################################################################
## $Id: Server.pm,v 1.1 2005/03/14 17:35:20 spadkins Exp $
#############################################################################
package App::Context::Server;
use App;
use App::Context;
@ISA = ( "App::Context" );
use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
use strict;
=head1 NAME
App::Context::Server - a runtime environment with a single master server and
its subprocesses
=head1 SYNOPSIS
# ... official way to get a Context object ...
use App;
$context = App->context();
$config = $context->config(); # get the configuration
$config->dispatch_events(); # dispatch events
# ... alternative way (used internally) ...
use App::Context::Server;
$context = App::Context::Server->new();
=cut
sub _init {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
$options = {} if (!defined $options);
$| = 1; # autoflush STDOUT (not sure this is required)
open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";
my $host = hostname;
$self->{hostname} = $host;
$host =~ s/\..*//; # get rid of fully qualified domain name
$self->{host} = $host;
$self->{verbose} = $options->{verbose};
$self->_init2a($options);
my $listen_socket = IO::Socket::INET->new(
Proto => "tcp",
LocalPort => $self->{port},
Listen => SOMAXCONN,
) || die "Unable to listen on $host:$self->{port} - $!";
$self->{listen_socket} = $listen_socket;
$self->_init2b($options);
&App::sub_exit() if ($App::trace);
}
sub _init2a {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
$self->{port} = $options->{port} || $options->{controller_port} || 8080;
&App::sub_exit() if ($App::trace);
}
sub _init2b {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
&App::sub_exit() if ($App::trace);
}
sub close_listen_socket {
&App::sub_entry if ($App::trace);
my ($self) = @_;
if ($self->{listen_socket}) {
$self->{listen_socket}->close();
delete $self->{listen_socket};
}
&App::sub_exit() if ($App::trace);
}
sub DESTROY {
&App::sub_entry if ($App::trace);
my ($self) = @_;
$self->close_listen_socket();
&App::sub_exit() if ($App::trace);
}
sub dispatch_events {
&App::sub_entry if ($App::trace);
my ($self) = @_;
my ($role, $port, $startup, $shutdown, $process_msg);
$self->dispatch_events_begin();
my $verbose = $self->{verbose};
my $options = $self->{options};
my $objects = $options->{init_objects};
my ($service_type, $name, $service);
foreach my $object (split(/ *[;,]+ */, $objects)) {
if ($object) {
if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
$service_type = $1;
$name = $2;
}
else {
$service_type = "SessionObject";
$name = $object;
}
$service = $self->service($service_type, $name); # instantiate
it. that's all.
$self->log("$service_type $name instantiated [$service]\n");
}
}
my $quit = 0;
$SIG{USR1} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
$SIG{USR2} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
$SIG{HUP} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
$SIG{INT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
$SIG{QUIT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
$SIG{TERM} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
my $default_sleep_interval = 15*60;
my $listen_socket = $self->{listen_socket};
my ($connection_socket, $msg);
my ($event, @events);
my ($time, $time_of_next_event, $sleep_interval);
my ($pid, $exitval, $sig);
while (!$quit) {
eval {
$self->log("Checking for scheduled events.\n") if ($verbose >= 8);
$time = time();
$time_of_next_event = $self->get_current_events([EMAIL
PROTECTED], $time);
if ($#events > -1) {
foreach $event (@events) {
$self->send_event($event);
}
$time = time();
}
while (($pid = waitpid(-1,WNOHANG)) > 0) {
$exitval = $? >> 8;
$sig = $? & 255;
$self->log("Child $pid finished
[exitval=$exitval,sig=$sig]\n");
$self->cleanup_pid($pid, $exitval, $sig);
}
$time = time();
if ($time_of_next_event > 0) {
$sleep_interval = $time_of_next_event - $time;
$sleep_interval = 0 if ($sleep_interval < 0);
}
else {
$sleep_interval = $default_sleep_interval;
}
# TODO: if (sleep_interval == 0), use select() to see if anyone
is waiting, else ...
$self->log("Sleeping on socket: timeout($sleep_interval)\n") if
($verbose >= 3);
$listen_socket->timeout($sleep_interval);
$connection_socket = $listen_socket->accept();
if ($connection_socket) {
$connection_socket->autoflush(1);
$msg = <$connection_socket>;
$msg =~ s/[\015\012]+$//;
if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
$quit = 1;
}
else {
$self->process_msg($connection_socket, $msg);
}
$connection_socket->close();
}
};
if ($@) {
$self->log($@) if ($verbose);
}
}
$self->close_listen_socket();
$self->dispatch_events_end();
$self->shutdown();
&App::sub_exit() if ($App::trace);
}
sub dispatch_events_begin {
my ($self) = @_;
my $verbose = $self->{verbose};
$self->log("Starting Server on $self->{host}:$self->{port}\n") if
($verbose);
}
sub dispatch_events_end {
my ($self) = @_;
my $verbose = $self->{verbose};
$self->log("Stopping Server.\n") if ($verbose);
}
sub process_msg {
my ($self, $connection_socket, $msg) = @_;
$self->log("process_msg: [$msg]\n");
}
# TODO: Implement this as a fork() or a context-level message to a node to
fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
# Save the callback_event according to an event_tag.
# Then implement cleanup_pid to send the callback_event.
sub send_async_event {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
$self->send_event($event);
if ($callback_event) {
my $event_tag = "local-$$";
if (! $callback_event->{args}) {
$callback_event->{args} = [ $event_tag ];
}
$self->send_event($callback_event);
}
&App::sub_exit() if ($App::trace);
}
sub cleanup_pid {
&App::sub_entry if ($App::trace);
my ($self, $pid, $exitval, $sig) = @_;
# nothing to do
&App::sub_exit() if ($App::trace);
}
sub send_async_message {
&App::sub_entry if ($App::trace);
my ($self, $host, $port, $message) = @_;
my $pid = fork();
if (!$pid) { # running in child
$self->send_message($host, $port, $message);
exit(0);
}
&App::sub_exit() if ($App::trace);
}
sub send_message {
&App::sub_entry if ($App::trace);
my ($self, $host, $port, $message) = @_;
my $verbose = $self->{verbose};
$self->log("send_message($host, $port, $message)\n") if ($verbose >= 2);
if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
$host = $1;
$port = $2;
}
my $sock = IO::Socket::INET->new(
PeerAddr => $host,
PeerPort => $port,
Proto => "tcp",
Type => SOCK_STREAM,
);
my ($response);
if ($sock) {
eval {
$sock->autoflush(1);
print $sock $message, "\n";
$response = <$sock>;
$response =~ s/[\r\n]+$//;
close($sock);
};
if ($@) {
$response = "SEND ERROR: $@";
}
}
else {
$response = "CONNECT ERROR: $!";
}
$self->log("send_message($host, $port, ...) => [$response]\n") if
($verbose >= 2);
&App::sub_exit($response) if ($App::trace);
return($response);
}
1;