cvsuser 06/02/28 19:32:49
Modified: App-Context/lib/App/Context Cluster.pm Server.pm
Log:
send_async_event()
Revision Changes Path
1.3 +2 -12 p5ee/App-Context/lib/App/Context/Cluster.pm
Index: Cluster.pm
===================================================================
RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Cluster.pm,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Cluster.pm 14 Mar 2005 17:35:20 -0000 1.2
+++ Cluster.pm 1 Mar 2006 03:32:49 -0000 1.3
@@ -6,7 +6,7 @@
package App::Context::Cluster;
use App;
-use App::Context;
+use App::Context::Server;
@ISA = ( "App::Context::Server" );
@@ -105,16 +105,6 @@
}
}
-sub process_msg {
- my ($self, $connection_socket, $msg) = @_;
- if ($self->{is_controller}) {
- $self->ctrl_process_msg($connection_socket, $msg);
- }
- else {
- $self->node_process_msg($connection_socket, $msg);
- }
-}
-
######################################################################
# Controller
######################################################################
1.2 +104 -30 p5ee/App-Context/lib/App/Context/Server.pm
Index: Server.pm
===================================================================
RCS file: /cvs/public/p5ee/App-Context/lib/App/Context/Server.pm,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Server.pm 14 Mar 2005 17:35:20 -0000 1.1
+++ Server.pm 1 Mar 2006 03:32:49 -0000 1.2
@@ -49,6 +49,9 @@
$host =~ s/\..*//; # get rid of fully qualified domain name
$self->{host} = $host;
+ $self->{pending_async_events} = [];
+ $self->{pending_callback_event} = {};
+
$self->{verbose} = $options->{verbose};
$self->_init2a($options);
@@ -136,25 +139,32 @@
my $listen_socket = $self->{listen_socket};
my ($connection_socket, $msg);
my ($event, @events);
- my ($time, $time_of_next_event, $sleep_interval);
+ my ($time, $time_of_next_event, $sleep_interval, $event_occurred);
my ($pid, $exitval, $sig);
while (!$quit) {
eval {
+ $event_occurred = 0;
+ if ($#{$self->{pending_async_events}} > -1) {
+ $self->dispatch_pending_async_events();
+ $event_occurred = 1;
+ }
+ while (($pid = waitpid(-1,WNOHANG)) > 0) {
+ $event_occurred = 1;
+ $exitval = $? >> 8;
+ $sig = $? & 255;
+ $self->log("Child $pid finished [exitval=$exitval,sig=$sig]\n");
+ $self->finish_pid($pid, $exitval, $sig);
+ }
$self->log("Checking for scheduled events.\n") if ($verbose >= 8);
$time = time();
$time_of_next_event = $self->get_current_events(\@events, $time);
if ($#events > -1) {
+ $event_occurred = 1;
foreach $event (@events) {
$self->send_event($event);
}
$time = time();
}
- while (($pid = waitpid(-1,WNOHANG)) > 0) {
- $exitval = $? >> 8;
- $sig = $? & 255;
- $self->log("Child $pid finished [exitval=$exitval,sig=$sig]\n");
- $self->cleanup_pid($pid, $exitval, $sig);
- }
$time = time();
if ($time_of_next_event > 0) {
$sleep_interval = $time_of_next_event - $time;
@@ -163,21 +173,23 @@
else {
$sleep_interval = $default_sleep_interval;
}
- # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
- $self->log("Sleeping on socket: timeout($sleep_interval)\n") if ($verbose >= 3);
- $listen_socket->timeout($sleep_interval);
- $connection_socket = $listen_socket->accept();
- if ($connection_socket) {
- $connection_socket->autoflush(1);
- $msg = <$connection_socket>;
- $msg =~ s/[\015\012]+$//;
- if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
- $quit = 1;
- }
- else {
- $self->process_msg($connection_socket, $msg);
+ if (!$event_occurred) {
+ # TODO: if (sleep_interval == 0), use select() to see if anyone is waiting, else ...
+ $self->log("Sleeping on socket: timeout($sleep_interval)\n") if ($verbose >= 3);
+ $listen_socket->timeout($sleep_interval);
+ $connection_socket = $listen_socket->accept();
+ if ($connection_socket) {
+ $connection_socket->autoflush(1);
+ $msg = <$connection_socket>;
+ $msg =~ s/[\015\012]+$//;
+ if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+ $quit = 1;
+ }
+ else {
+ $self->process_msg($connection_socket, $msg);
+ }
+ $connection_socket->close();
}
- $connection_socket->close();
}
};
if ($@) {
@@ -206,30 +218,92 @@
sub process_msg {
my ($self, $connection_socket, $msg) = @_;
$self->log("process_msg: [$msg]\n");
+ my $verbose = $self->{verbose};
+ my $processing_complete = $self->process_app_msg($connection_socket, $msg);
+ if (!$processing_complete) {
+ if ($msg =~ s/^GET//) {
+ $connection_socket->print("HTTP/1.0 200 OK\n");
+ $connection_socket->print("Content-type: text/plain\n");
+ $connection_socket->print("\n");
+ $connection_socket->print("$self->{host}:$self->{port} as a server\n");
+ $connection_socket->print($self->state());
+ }
+ else {
+ $connection_socket->print("unknown [$msg]\n");
+ }
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub process_app_msg {
+ &App::sub_entry if ($App::trace);
+ my ($self, $connection_socket, $msg) = @_;
+ my $processing_complete = 0;
+ &App::sub_exit($processing_complete) if ($App::trace);
+ return($processing_complete);
+}
+
+sub state {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $state = "";
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
}
# TODO: Implement this as a fork() or a context-level message to a node to fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
# Save the callback_event according to an event_tag.
# Then implement cleanup_pid to send the callback_event.
+
sub send_async_event {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- $self->send_event($event);
+ push(@{$self->{pending_async_events}}, [ $event, $callback_event ]);
+ &App::sub_exit() if ($App::trace);
+}
+
+sub dispatch_pending_async_events {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $pending_async_events = $self->{pending_async_events};
+ my ($async_event);
+ while ($#$pending_async_events > -1) {
+ $async_event = shift(@$pending_async_events);
+ $self->send_async_event_now(@$async_event);
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub send_async_event_now {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event, $callback_event) = @_;
+ my $pid = fork();
+ if (!$pid) { # running in child
+ $self->send_event($event);
+ exit(0);
+ }
if ($callback_event) {
- my $event_tag = "local-$$";
- if (! $callback_event->{args}) {
- $callback_event->{args} = [ $event_tag ];
- }
- $self->send_event($callback_event);
+ my $event_tag = "local-$pid";
+ $self->{pending_callback_event}{$event_tag} = $callback_event;
}
&App::sub_exit() if ($App::trace);
}
-sub cleanup_pid {
+sub finish_pid {
&App::sub_entry if ($App::trace);
my ($self, $pid, $exitval, $sig) = @_;
- # nothing to do
+
+ my $event_tag = "local-$pid";
+ my $callback_event = $self->{pending_callback_event}{$event_tag};
+ if ($callback_event) {
+ delete $self->{pending_callback_event}{$event_tag};
+ $callback_event->{args} = [] if (! $callback_event->{args});
+ my $message = (!$exitval || !$sig) ? "Error $exitval on $pid [sig=$sig]" : "";
+ push(@{$callback_event->{args}}, {location => $event_tag, returnval => $exitval, message => "Sig $sig"});
+ $self->send_event($callback_event);
+ }
+
&App::sub_exit() if ($App::trace);
}