Author: spadkins
Date: Tue May 2 06:53:20 2006
New Revision: 6005
Modified:
p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/Server.pm
Log:
test whether these are ready for production
Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm Tue May 2
06:53:20 2006
@@ -37,15 +37,16 @@
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
die "Controller must have a port defined (\$context->{options}{port})" if
(!$self->{port});
- $self->{num_node_events} = 0;
- $self->{max_node_events} =
$self->{options}{"app.context.cluster_controller.max_node_events"} || 10;
+ $self->{num_async_events} = 0;
+ $self->{max_async_events_per_node} =
$self->{options}{"app.context.max_async_events_per_node"} || 10;
+ $self->{max_async_events} = 0; # start with 0 because there are no nodes
up
&App::sub_exit() if ($App::trace);
}
sub _init2b {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- # nothing yet
+ $self->startup_nodes($options) if ($options->{startup});
&App::sub_exit() if ($App::trace);
}
@@ -61,7 +62,6 @@
# nothing special yet
}
-# CONTROLLER ONLY
sub send_async_event_now {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
@@ -77,13 +77,14 @@
if ($event->{args}) {
$args = $self->{rpc_serializer}->serialize($event->{args});
}
- my $response = $self->send_message($node_host, $node_port,
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args");
- $self->{num_node_events}++;
- if ($callback_event) {
- if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
- my $remote_event_token = $1;
- $self->{pending_callback_event}{$remote_event_token} =
$callback_event;
- }
+ my $response = $self->send_message($node_host, $node_port,
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1,
undef, 1);
+ if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
+ my $runtime_event_token = $1;
+ $self->{num_async_events}++;
+ $self->{running_async_event}{$runtime_event_token} = [ $event,
$callback_event ];
+ }
+ elsif ($response =~ /ERROR/) {
+ $self->set_node_down("$node_host:$node_port");
}
}
else {
@@ -92,11 +93,31 @@
&App::sub_exit() if ($App::trace);
}
+# $runtime_event_tokens take the following forms:
+# $runtime_event_token = $pid; --
App::Context::Server::send_async_event_now() and ::finish_pid()
+# $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token
on the node
+sub _abort_running_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $runtime_event_token, $event, $callback_event) = @_;
+ if ($runtime_event_token =~ /^[0-9]+$/) {
+ kill(9, $runtime_event_token);
+ }
+ elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
+ my $node_host = $1;
+ my $node_port = $2;
+ $self->send_async_message($node_host, $node_port,
"ABORT-ASYNC-EVENT:$runtime_event_token");
+ }
+ else {
+ $self->log("Unable to abort running async event [$runtime_event_token]
(controller)\n");
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
sub assign_event_destination {
&App::sub_entry if ($App::trace);
my ($self, $event) = @_;
my $assigned = 0;
- if ($self->{num_node_events} < $self->{max_node_events}) {
+ if ($self->{num_async_events} < $self->{max_async_events}) {
$event->{destination} = $self->{host};
$assigned = $self->assign_event_destination_by_round_robin($event);
}
@@ -125,27 +146,20 @@
sub process_msg {
&App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
+ my ($self, $msg) = @_;
my $verbose = $self->{verbose};
$self->log({level=>3},"process_msg: [$msg]\n");
- my $processing_complete = $self->process_custom_msg($connection_socket,
$msg);
- if (!$processing_complete) {
+ my $return_value = $self->process_custom_msg($msg);
+ if (!$return_value) {
if ($msg =~ /^NODE-UP:(.*)/) {
- my $resp = $self->set_node_up($1) || "ok";
- $connection_socket->print("$resp\n");
+ $return_value = $self->set_node_up($1);
}
elsif ($msg =~ /^NODE-DOWN:(.*)/) {
$self->set_node_down($1);
- $connection_socket->print("ok\n");
- }
- elsif ($msg =~ s/^GET//) {
- $connection_socket->print("HTTP/1.0 200 OK\n");
- $connection_socket->print("Content-type: text/plain\n");
- $connection_socket->print("\n");
- $connection_socket->print($self->state());
+ $return_value = "OK";
}
elsif ($msg =~ /^ASYNC-EVENT-RESULTS:([^:]+):(.*)$/) {
- my $remote_event_token = $1;
+ my $runtime_event_token = $1;
my $results = $2;
if ($results ne "") {
$results = $self->{rpc_serializer}->deserialize($results);
@@ -153,31 +167,41 @@
$results = $results->[0];
}
}
- $connection_socket->print("OK\n");
- $self->{num_node_events}--;
- my $callback_event =
$self->{pending_callback_event}{$remote_event_token};
- if ($callback_event) {
- delete $self->{pending_callback_event}{$remote_event_token};
- $callback_event->{args} = [] if (! $callback_event->{args});
- push(@{$callback_event->{args}},
- {event_token => $callback_event->{event_token}, returnval
=> $results, errnum => 0, errmsg => ""});
- $self->send_event($callback_event);
+ my $async_event =
$self->{running_async_event}{$runtime_event_token};
+
+ if ($async_event) {
+ $self->{num_async_events}--;
+ delete $self->{running_async_event}{$runtime_event_token};
+
+ my ($event, $callback_event) = @$async_event;
+ if ($callback_event) {
+ $callback_event->{args} = [] if (!
$callback_event->{args});
+ push(@{$callback_event->{args}},
+ {event_token => $callback_event->{event_token},
returnval => $results, errnum => 0, errmsg => ""});
+ $self->send_event($callback_event);
+ }
}
+ else {
+ $self->log("WARNING: Unexpected Async Event Results:
[$msg]\n");
+ }
+ $return_value = "OK";
}
else {
- $connection_socket->print("unknown [$msg]\n");
+ $self->log("ERROR: unknown [$msg]\n");
+ $return_value = "unknown [$msg]";
}
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
# Can be overridden to provide customized processing.
sub process_custom_msg {
&App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
- my $processing_complete = 0;
- &App::sub_exit($processing_complete) if ($App::trace);
- return($processing_complete);
+ my ($self, $msg) = @_;
+ my $return_value = "";
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
sub state {
@@ -185,7 +209,7 @@
my ($self) = @_;
my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
- my $state = "Cluster Controller:
$self->{host}:$self->{port}\n[$datetime]\n";
+ my $state = "Cluster Controller: $self->{host}:$self->{port}
procs[$self->{num_procs}/$self->{max_procs}:max]
async_events[$self->{num_async_events}/$self->{max_async_events}:max/$self->{max_async_events_per_node}:per]\n[$datetime]\n";
$state .= "\n";
$state .= $self->_state();
@@ -199,6 +223,13 @@
my $state = "";
+ my (@nodes);
+ @nodes = @{$self->{nodes}} if ($self->{nodes});
+ $state .= "Nodes: up [@nodes] last dispatched
[$self->{last_node_idx}]\n";
+ foreach my $node (sort keys %{$self->{node}}) {
+ $state .= sprintf(" %-16s %4s\n", $node, $self->{node}{$node}{up} ?
"UP" : "down");
+ }
+
$state .= $self->SUPER::_state();
&App::sub_exit($state) if ($App::trace);
@@ -208,6 +239,9 @@
sub set_node_down {
&App::sub_entry if ($App::trace);
my ($self, $node) = @_;
+ my $runtime_event_token_prefix = $node;
+ $runtime_event_token_prefix =~ s/:/-/;
+ $self->reset_running_async_events($runtime_event_token_prefix);
$self->{node}{$node}{up} = 0;
$self->set_nodes();
&App::sub_exit() if ($App::trace);
@@ -239,6 +273,102 @@
}
}
$self->{nodes} = \@nodes;
+ $self->{max_async_events} = $self->{max_async_events_per_node} * ($#nodes
+ 1);
+ my $main_service = $self->{main_service};
+ if ($main_service && $main_service->can("capacity_change")) {
+ $main_service->capacity_change($self->{max_async_events}, \@nodes,
$self->{node});
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ $self->shutdown_nodes();
+ $self->write_node_file();
+ $self->SUPER::shutdown();
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_nodes {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ foreach my $node (@{$self->{nodes}}) {
+ $self->send_message($node, undef, "QUIT", 0, undef, 1);
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub startup_nodes {
+ &App::sub_entry if ($App::trace);
+ my ($self, $options) = @_;
+
+ my $startup = $options->{startup};
+
+ if ($startup eq "1") {
+ $self->read_node_file();
+ }
+ else {
+ foreach my $node (split(/,/,$startup)) {
+ $self->{node}{$node} = {};
+ }
+ }
+
+ my ($msg, $host, $port, $cmd);
+ my $cmd_fmt = $self->{options}{"app.context.node_start_cmd"} || "ssh %s
'mvnode --port=%s > /dev/null 2>&1 &'";
+ foreach my $node (keys %{$self->{node}}) {
+ $msg = $self->send_message($node, undef, "CONTROLLER-UP:", 0, undef,
1);
+ if ($msg =~ /ERROR:/) {
+ if ($node =~ /^([^:]+):([0-9]+)$/) {
+ $host = $1;
+ $port = $2;
+ $cmd = sprintf($cmd_fmt, $host, $port);
+ $self->log("Starting Node [$node]: [$cmd]\n");
+ system("nohup $cmd < /dev/null > /dev/null 2>&1 &");
+ }
+ }
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub write_node_file {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ my $prefix = $self->{options}{prefix};
+ my $node_file =
"$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
+ if (open(FILE, "> $node_file")) {
+ foreach my $node (@{$self->{nodes}}) {
+ print App::Context::ClusterController::FILE "$node\n";
+ }
+ close(App::Context::ClusterController::FILE);
+ }
+ else {
+ $self->log("WARNING: Can't write node file [$node_file]: $!\n");
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub read_node_file {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ my $prefix = $self->{options}{prefix};
+ my $node_file =
"$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
+ my ($node);
+ if (open(FILE, "< $node_file")) {
+ while (<App::Context::ClusterController::FILE>) {
+ chomp;
+ if (/^[^:]+:[0-9]+$/) {
+ $node = $_;
+ # just take note of its existence. we don't know yet if it is
up.
+ $self->{node}{$node} = {} if (!defined $self->{node}{$node});
+ }
+ }
+ close(App::Context::ClusterController::FILE);
+ }
+ else {
+ # This is not really a problem.
+ # $self->log("WARNING: Can't read node file [$node_file]: $!\n");
+ }
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm Tue May 2
06:53:20 2006
@@ -76,24 +76,22 @@
my $controller_port = $self->{controller_port};
my $node_host = $self->{host};
my $node_port = $self->{port};
- $self->send_async_message($controller_host, $controller_port,
"NODE-DOWN:$node_host:$node_port");
+ # We need to close the listen socket before we do a synchronous connection
to the controller
+ # in order to avoid deadlock.
+ $self->close_listen_socket();
+ # This message needs to be synchronous, otherwise the parent will kill the
subprocess during shutdown.
+ $self->send_message($controller_host, $controller_port,
"NODE-DOWN:$node_host:$node_port");
&App::sub_exit() if ($App::trace);
}
sub process_msg {
&App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
+ my ($self, $msg) = @_;
my $verbose = $self->{verbose};
$self->log({level=>3},"process_msg: [$msg]\n");
- my $processing_complete = $self->process_custom_msg($connection_socket,
$msg);
- if (!$processing_complete) {
- if ($msg =~ s/^GET//) {
- $connection_socket->print("HTTP/1.0 200 OK\n");
- $connection_socket->print("Content-type: text/plain\n");
- $connection_socket->print("\n");
- $connection_socket->print($self->state());
- }
- elsif ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
+ my $return_value = $self->process_custom_msg($msg);
+ if (!$return_value) {
+ if ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
my %event = (
service_type => $1,
name => $2,
@@ -103,22 +101,39 @@
$event{args} = $self->{rpc_serializer}->deserialize($args) if
($args ne "");
my $event_token = $self->send_async_event({method =>
"process_async_event", args => [\%event],});
$event{event_token} = $event_token;
- $connection_socket->print("ASYNC-EVENT-TOKEN:$event_token\n");
+ $return_value = "ASYNC-EVENT-TOKEN:$event_token\n";
+ }
+ elsif ($msg =~ /^CONTROLLER-UP:/) {
+ my $controller_host = $self->{controller_host};
+ my $controller_port = $self->{controller_port};
+ my $node_host = $self->{host};
+ my $node_port = $self->{port};
+ $self->send_async_event({
+ method => "send_async_message",
+ args => [ $controller_host, $controller_port,
"NODE-UP:$node_host:$node_port" ],
+ });
+ $return_value = "OK";
+ }
+ elsif ($msg =~ /^ABORT-ASYNC-EVENT:(.*)/) {
+ my $event_token = $1;
+ $self->abort_async_event($event_token);
+ $return_value = "OK";
}
else {
- $connection_socket->print("unknown [$msg]\n");
+ $return_value = "ERROR: unknown [$msg]";
}
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
# Can be overridden to provide customized processing.
sub process_custom_msg {
&App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
- my $processing_complete = 0;
- &App::sub_exit($processing_complete) if ($App::trace);
- return($processing_complete);
+ my ($self, $msg) = @_;
+ my $return_value = "";
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
sub state {
@@ -126,7 +141,7 @@
my ($self) = @_;
my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
- my $state = "Cluster Node: $self->{host}:$self->{port}\n[$datetime]\n";
+ my $state = "Cluster Node: $self->{host}:$self->{port}
procs[$self->{num_procs}/$self->{max_procs}:max]
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
$state .= "\n";
$state .= $self->_state();
@@ -149,12 +164,26 @@
sub process_async_event {
&App::sub_entry if ($App::trace);
my ($self, $event) = @_;
- my $results = $self->send_event($event);
+ my ($results);
+ eval {
+ $results = $self->send_event($event);
+ };
+ if ($@) {
+ $results = $@;
+ }
my $results_txt = $self->{rpc_serializer}->serialize($results);
my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
$self->send_message($self->{controller_host}, $self->{controller_port},
$msg);
- &App::sub_exit(undef) if ($App::trace);
- return(undef);
+ &App::sub_exit() if ($App::trace);
+}
+
+sub finish_killed_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event) = @_;
+ my $results_txt = $self->{rpc_serializer}->serialize("Subrequest Killed");
+ my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
+ $self->send_async_message($self->{controller_host},
$self->{controller_port}, $msg);
+ &App::sub_exit() if ($App::trace);
}
1;
Modified: p5ee/trunk/App-Context/lib/App/Context/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Server.pm Tue May 2 06:53:20 2006
@@ -16,6 +16,8 @@
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
+use Date::Format;
+use Date::Parse;
use strict;
@@ -42,6 +44,10 @@
my ($self, $options) = @_;
$options = {} if (!defined $options);
+ $self->SUPER::_init($options);
+
+ App->mkdir($options->{prefix}, "data", "app", "Context");
+
$| = 1; # autoflush STDOUT (not sure this is required)
open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";
@@ -49,9 +55,15 @@
$self->{hostname} = $host;
$host =~ s/\..*//; # get rid of fully qualified domain name
$self->{host} = $host;
+ $self->{port} = $options->{port} || 8080;
+ $self->{num_procs} = 0;
+ $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
+ $self->{max_async_events} =
$self->{options}{"app.context.max_async_events"}
+ if (defined $self->{options}{"app.context.max_async_events"});
+ $self->{async_event_count} = 0;
$self->{pending_async_events} = [];
- $self->{pending_callback_event} = {};
+ $self->{running_async_event} = {};
$self->{verbose} = $options->{verbose};
@@ -59,11 +71,34 @@
my $listen_socket = IO::Socket::INET->new(
Proto => "tcp",
+ # LocalAddr => $self->{host}, # allow both the "hostname" and
"localhost" to be used
LocalPort => $self->{port},
Listen => SOMAXCONN,
- ) || die "Unable to listen on $host:$self->{port} - $!";
+ ReuseAddr => 1,
+ ) || die "Unable to listen on $self->{host}:$self->{port} - $!";
$self->{listen_socket} = $listen_socket;
+ my $listen_fd = fileno($listen_socket);
+ my $listen_vec;
+ vec($listen_vec, $listen_fd, 1) = 1;
+ $self->{listen_vec} = $listen_vec;
+
+ $self->{rpc_serializer} = $self->serializer("server_rpc", class =>
"App::Serializer::Perl", indent => 0);
+
+ if ($self->{options}{log_rotate}) {
+ my $rotate_sec = $self->{options}{log_rotate};
+ $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31);
+ my $time = time();
+ my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time)); # I
need a base which is midnight local time
+ my $next_rotate_time = ((int(($time -
$base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
+ $self->schedule_event(
+ tag => "context-log-rotation",
+ method => "log_file_open",
+ args => [0], # don't overwrite
+ time => $next_rotate_time,
+ interval => $rotate_sec, # and every X seconds hereafter
+ );
+ }
$self->_init2b($options);
@@ -73,7 +108,6 @@
sub _init2a {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- $self->{port} = $options->{port} || $options->{controller_port} || 8080;
&App::sub_exit() if ($App::trace);
}
@@ -87,8 +121,41 @@
&App::sub_entry if ($App::trace);
my ($self) = @_;
if ($self->{listen_socket}) {
- $self->{listen_socket}->close();
+ my $listen_socket = $self->{listen_socket};
+ my $listen_fd = fileno($listen_socket);
+ $self->log({level=>4},"Closed listen socket($listen_fd)\n");
+ $listen_socket->close();
+ $listen_socket = undef;
delete $self->{listen_socket};
+ delete $self->{listen_vec};
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_unshareable_resources {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ $self->close_listen_socket();
+ $self->SUPER::shutdown_unshareable_resources();
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ $self->close_listen_socket();
+ $self->shutdown_child_processes();
+ $self->SUPER::shutdown();
+ &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_child_processes {
+ &App::sub_entry if ($App::trace);
+ my $self = shift;
+ if ($self->{proc}) {
+ foreach my $pid (keys %{$self->{proc}}) {
+ kill(15, $pid);
+ }
}
&App::sub_exit() if ($App::trace);
}
@@ -102,9 +169,9 @@
sub dispatch_events {
&App::sub_entry if ($App::trace);
- my ($self) = @_;
+ my ($self, $max_events_occurred) = @_;
- my ($role, $port, $startup, $shutdown, $process_msg);
+ my ($role, $port, $startup, $shutdown);
$self->dispatch_events_begin();
my $verbose = $self->{verbose};
@@ -123,82 +190,165 @@
$name = $object;
}
$service = $self->service($service_type, $name); # instantiate
it. that's all.
- $self->log("$service_type $name instantiated [$service]\n");
+ $self->log({level=>3},"$service_type $name instantiated
[$service]\n");
+ $self->{main_service} = $service if (!$self->{main_service});
}
}
my $quit = 0;
- $SIG{USR1} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
- $SIG{USR2} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
- $SIG{HUP} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
- $SIG{INT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
- $SIG{QUIT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
- $SIG{TERM} = sub { $self->log("Caught Signal: @_ (quitting)\n") if
($verbose); $quit = 1; };
- $SIG{CHLD} = "DEFAULT";
+
+ $SIG{HUP} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };
# SIG 1
+ $SIG{INT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $quit = 1; }; # SIG 2
+ $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $quit = 1; }; # SIG 3
+ $SIG{USR1} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };
# SIG 10
+ $SIG{USR2} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };
# SIG 12
+ $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $quit = 1; }; # SIG 15
+ $SIG{CHLD} = "DEFAULT";
# SIG 17
my $default_sleep_interval = 15*60;
my $listen_socket = $self->{listen_socket};
- my ($connection_socket, $msg);
+ my $listen_vec = $self->{listen_vec};
+ my $listen_fd = fileno($listen_socket);
+ my ($connection_socket, $connection_fd, $msg, $accept_worthwhile);
my ($event, @events);
- my ($time, $time_of_next_event, $sleep_interval, $event_occurred);
+ my $event_loop_extensions = $self->{event_loop_extensions};
+ my ($extension, $obj, $method, $args, $extension_idx,
$extension_events_occurred);
+ my $last_extension_idx = -1;
+ my ($time, $time_of_next_event, $sleep_interval);
+ my $total_events_occurred = 0;
+ my ($events_occurred);
my ($pid, $exitval, $sig);
+ my ($await_return_value, $server_close, $return_value);
while (!$quit) {
eval {
- $event_occurred = 0;
+ $events_occurred = 0;
if ($#{$self->{pending_async_events}} > -1) {
- $self->dispatch_pending_async_events();
- $event_occurred = 1;
+ $events_occurred += $self->dispatch_pending_async_events();
}
- while (($pid = waitpid(-1,WNOHANG)) > 0) {
- $event_occurred = 1;
- $exitval = $? >> 8;
- $sig = $? & 255;
- $self->log("Child $pid finished
[exitval=$exitval,sig=$sig]\n");
- $self->finish_pid($pid, $exitval, $sig);
- }
- $self->log("Checking for scheduled events.\n") if ($verbose >= 8);
+ $events_occurred += $self->dispatch_finished_processes();
+
+ # Scheduled events: Every time through the loop, we check to see
+ # if it is time for a scheduled event to occur. If so, we send
+ # each of those events out.
+ $self->log({level=>4},"Checking for scheduled events.\n");
$time = time();
$time_of_next_event = $self->get_current_events(\@events,
$time);
if ($#events > -1) {
- $event_occurred = 1;
foreach $event (@events) {
$self->send_event($event);
+ $events_occurred++;
}
$time = time();
}
- $time = time();
- if ($time_of_next_event > 0) {
- $sleep_interval = $time_of_next_event - $time;
- $sleep_interval = 0 if ($sleep_interval < 0);
- }
- else {
- $sleep_interval = $default_sleep_interval;
+
+ # Registered Extensions to the Event Loop: These are lower
priority.
+ # We only allow the extensions to be run in any given iteration
through
+ # the event loop if we have no other core event that has occurred.
+ # Even then, we only allow one extension (that returns true) to run
+ # in each iteration, and we check them in round-robin fashion so
that
+ # one extension does not get more attention than the others.
+ if (!$events_occurred) {
+ $extension_idx = $last_extension_idx; # start with last
executed extension
+ for (my $i = 0; $i <= $#$event_loop_extensions; $i++) {
+ $extension_idx ++; # increment it in round-robin fashion
+ $extension_idx = 0 if ($extension_idx >
$#$event_loop_extensions);
+ $extension = $event_loop_extensions->[$extension_idx];
+ ($obj, $method, $args) = @$extension;
+ $extension_events_occurred = $obj->$method(@$args); #
execute extension and ...
+ if ($extension_events_occurred) { # check return value
for true
+ $last_extension_idx = $extension_idx;
+ $events_occurred += $extension_events_occurred;
+ last;
+ }
+ }
}
- if (!$event_occurred) {
+
+ if (!$events_occurred) {
+ # Sleep Interval: Based on when the next event is scheduled
and the current
+ # time, we determine the sleep interval.
+ $time = time();
+ if ($time_of_next_event > 0) {
+ $sleep_interval = $time_of_next_event - $time;
+ $sleep_interval = 0 if ($sleep_interval < 0);
+ }
+ else {
+ $sleep_interval = $default_sleep_interval;
+ }
+
# TODO: if (sleep_interval == 0), use select() to see if
anyone is waiting, else ...
- $self->log("Sleeping on socket: timeout($sleep_interval)\n")
if ($verbose >= 3);
- $listen_socket->timeout($sleep_interval);
- $SIG{CHLD} = sub { $self->log("Caught Signal: @_\n") if
($verbose); };
- $connection_socket = $listen_socket->accept();
- $SIG{CHLD} = "DEFAULT";
- if ($connection_socket) {
- $connection_socket->autoflush(1);
- $msg = <$connection_socket>;
- $msg =~ s/[\015\012]+$//;
- if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
- $quit = 1;
+ $self->log({level=>4},"Listening on socket($listen_fd):
timeout($sleep_interval)\n");
+ $accept_worthwhile = 1;
+ # NOTE: to understand why I do this section of code, read the
3rd paragraph under the
+ # accept() method of IO::Socket (i.e. "man IO::Socket") or
read it here.
+ # http://perldoc.perl.org/IO/Socket.html
+ if ($sleep_interval == 0) {
+ if (select($listen_vec, undef, $listen_vec, 0) == 0) { #
nothing happening on the socket
+ $accept_worthwhile = 0; # don't bother to call
accept() on it
}
- else {
- $self->process_msg($connection_socket, $msg);
+ }
+ # Here is the truth table for $await_return_value,
$server_close
+ # $await_return_value $server_close = client
+ server
+ # ------------------- -------------
---------------------- ---------------------
+ # 0 0 write/close
read/close
+ # 0 1 write/read/close
read/close
+ # 1 0
write/read/write/close read/write/read/close
+ # 1 1 write/read/close
read/write/close
+ # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
+ if ($accept_worthwhile) {
+ $listen_socket->timeout($sleep_interval);
+ #$SIG{CHLD} = sub { $self->log({level=>4},"Caught Signal:
@_\n"); };
+ $SIG{CHLD} = sub { }; # the point is to interrupt the
accept() system call, not to do anything.
+ $connection_socket = $listen_socket->accept();
+ $SIG{CHLD} = "DEFAULT";
+ if ($connection_socket) {
+ $connection_fd = fileno($connection_socket);
+ $msg = $connection_socket->getline();
+ $self->log({level=>4},"Message on
socket($connection_fd) [$msg]\n");
+ if ($msg) {
+ $await_return_value = ($msg =~ s/^RV-//);
+ $server_close = ($msg =~ s/^SC-//);
+ $msg =~ s/[\015\012]+$//;
+ if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+ $quit = 1;
+ }
+ elsif ($msg =~ s/^GET//) {
+ $await_return_value = 1;
+ my $content = $self->state();
+ my $content_length = length($content);
+ $return_value = <<EOF;
+HTTP/1.1 200 OK
+Content-type: text/plain
+Content-length: $content_length
+Connection: close
+
+$content
+EOF
+ }
+ else {
+ $return_value = $self->process_msg($msg);
+ $return_value .= "\n" if ($return_value !~
/\n$/);
+ }
+ if ($await_return_value) {
+ $self->log({level=>4},"Returned on
socket($connection_fd) [$msg]\n") if ($msg !~ /^GET/);
+ $connection_socket->autoflush(1);
+ $connection_socket->print($return_value);
+ $connection_socket->getline() if
(!$server_close);
+ }
+ $connection_socket->close();
+ }
+ else {
+ $connection_socket->close();
+ }
}
- $connection_socket->close();
}
}
};
if ($@) {
- $self->log($@) if ($verbose);
+ $self->log($@);
}
+ $total_events_occurred += $events_occurred;
+ $quit = 1 if ($max_events_occurred && $total_events_occurred >=
$max_events_occurred);
}
$self->close_listen_socket();
@@ -207,159 +357,486 @@
&App::sub_exit() if ($App::trace);
}
+sub dispatch_network_events {
+ &App::sub_entry if ($App::trace);
+ my ($self, $sleep_interval) = @_;
+
+ $sleep_interval ||= 0;
+ my $verbose = $self->{verbose};
+ my $events_occurred = 0;
+
+ my ($connection_socket, $msg, $accept_worthwhile);
+ $self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+ my $listen_socket = $self->{listen_socket};
+ my $listen_vec = $self->{listen_vec};
+
+ $accept_worthwhile = 1;
+ if ($sleep_interval == 0) {
+ # NOTE: to understand why I do this section of code, read the 3rd paragraph under the
+ # accept() method of IO::Socket (i.e. "man IO::Socket") or read it here.
+ # http://perldoc.perl.org/IO/Socket.html
+ if (select($listen_vec, undef, $listen_vec, 0) == 0) { # nothing
happening on the socket
+ $accept_worthwhile = 0; # don't bother to call accept() on it
+ }
+ }
+
+ if ($accept_worthwhile) {
+ $listen_socket->timeout($sleep_interval);
+ $SIG{CHLD} = sub { };
+ $connection_socket = $listen_socket->accept();
+ $SIG{CHLD} = "DEFAULT";
+ if ($connection_socket) {
+ $connection_socket->autoflush(1);
+ $msg = <$connection_socket>;
+ $msg =~ s/[\015\012]+$//;
+ if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+ # $quit = 1;
+ }
+ else {
+ $self->process_msg($msg);
+ }
+ $connection_socket->close();
+ $events_occurred ++;
+ }
+ }
+
+ &App::sub_exit($events_occurred) if ($App::trace);
+ return($events_occurred);
+}
+
+sub dispatch_finished_processes {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my ($pid, $exitval, $sig);
+ my $events_occurred = 0;
+ while (($pid = waitpid(-1,WNOHANG)) > 0) {
+ $events_occurred ++;
+ $exitval = $? >> 8;
+ $sig = $? & 255;
+ $self->log({level=>4},"Child $pid finished
[exitval=$exitval,sig=$sig]\n");
+ $self->finish_pid($pid, $exitval, $sig);
+ }
+ &App::sub_exit($events_occurred) if ($App::trace);
+ return($events_occurred);
+}
+
sub dispatch_events_begin {
+ &App::sub_entry if ($App::trace);
my ($self) = @_;
my $verbose = $self->{verbose};
- $self->log("Starting Server on $self->{host}:$self->{port}\n") if
($verbose);
+ $self->log({level=>2},"Starting Server on $self->{host}:$self->{port}\n");
+ &App::sub_exit() if ($App::trace);
}
sub dispatch_events_end {
my ($self) = @_;
my $verbose = $self->{verbose};
- $self->log("Stopping Server.\n") if ($verbose);
+ $self->log({level=>2},"Stopping Server.\n");
}
sub process_msg {
- my ($self, $connection_socket, $msg) = @_;
- $self->log("process_msg: [$msg]\n");
+ my ($self, $msg) = @_;
+ $self->log({level=>3},"process_msg: [$msg]\n");
my $verbose = $self->{verbose};
- my $processing_complete = $self->process_app_msg($connection_socket, $msg);
- if (!$processing_complete) {
- if ($msg =~ s/^GET//) {
- $connection_socket->print("HTTP/1.0 200 OK\n");
- $connection_socket->print("Content-type: text/plain\n");
- $connection_socket->print("\n");
- $connection_socket->print("$self->{host}:$self->{port} as a
server\n");
- $connection_socket->print($self->state());
- }
- else {
- $connection_socket->print("unknown [$msg]\n");
- }
+ my $return_value = $self->process_custom_msg($msg);
+ if (!$return_value) {
+ $return_value = "unknown [$msg]\n";
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
-sub process_app_msg {
+sub process_custom_msg {
&App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
- my $processing_complete = 0;
- &App::sub_exit($processing_complete) if ($App::trace);
- return($processing_complete);
+ my ($self, $msg) = @_;
+ my $return_value = "";
+ &App::sub_exit($return_value) if ($App::trace);
+ return($return_value);
}
sub state {
&App::sub_entry if ($App::trace);
my ($self) = @_;
+
+ my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+ my $state = "Server: $self->{host}:$self->{port}
procs[$self->{num_procs}/$self->{max_procs}:max]
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
+ $state .= "\n";
+ $state .= $self->_state();
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
+}
+
+sub _state {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
my $state = "";
+
+ my $options = $self->{options};
+ my $objects = $options->{init_objects};
+ my ($service_type, $name, $service);
+ foreach my $object (split(/ *[;,]+ */, $objects)) {
+ if ($object) {
+ if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
+ $service_type = $1;
+ $name = $2;
+ }
+ else {
+ $service_type = "SessionObject";
+ $name = $object;
+ }
+ $service = $self->service($service_type, $name); # instantiate
it. that's all.
+ if ($service->can("state")) {
+ $state .= "\n";
+ $state .= $service->state();
+ }
+ }
+ }
+
+ my $main_service = $self->{main_service};
+
+ $state .= "\n";
+ $state .= "Running Async Events:\n";
+ my ($async_event, $event, $callback_event, @args, $args_str, $event_token,
$runtime_event_token, $str);
+ foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) {
+ $async_event = $self->{running_async_event}{$runtime_event_token};
+ ($event, $callback_event) = @$async_event;
+ $str = "";
+ if ($main_service && $main_service->can("format_async_event")) {
+ $str = $main_service->format_async_event($event, $callback_event,
$runtime_event_token);
+ }
+ if ($str) {
+ $state .= " ";
+ $state .= $main_service->format_async_event($event,
$callback_event, $runtime_event_token);
+ $state .= "\n";
+ }
+ else {
+ @args = ();
+ @args = @{$event->{args}} if ($event->{args});
+ $args_str = join(",",@args);
+ $state .= sprintf(" %-20s %-20s %-24s", $event->{event_token},
$runtime_event_token, "$event->{name}.$event->{method}($args_str)");
+ if ($callback_event) {
+ @args = ();
+ @args = @{$callback_event->{args}} if
($callback_event->{args});
+ $args_str = join(",",@args);
+ $state .=
"$callback_event->{name}.$callback_event->{method}($args_str)";
+ }
+ $state .= "\n";
+ }
+ }
+
+ $state .= "\n";
+ $state .= "Pending Async Events: count [$self->{async_event_count}]\n";
+ foreach $async_event (@{$self->{pending_async_events}}) {
+ ($event, $callback_event) = @$async_event;
+ $str = "";
+ if ($main_service && $main_service->can("format_async_event")) {
+ $str = $main_service->format_async_event($event, $callback_event);
+ }
+ if ($str) {
+ $state .= " ";
+ $state .= $main_service->format_async_event($event,
$callback_event);
+ $state .= "\n";
+ }
+ else {
+ @args = ();
+ @args = @{$event->{args}} if ($event->{args});
+ $args_str = join(",",@args);
+ $state .= sprintf(" %-20s %-40s", $event->{event_token},
"$event->{name}.$event->{method}($args_str)");
+ if ($callback_event) {
+ @args = ();
+ @args = @{$callback_event->{args}} if
($callback_event->{args});
+ $args_str = join(",",@args);
+ $state .= " =>
$callback_event->{name}.$callback_event->{method}($args_str)";
+ }
+ $state .= "\n";
+ }
+ }
+
+ $state .= "\n";
+
+ $state .= $self->SUPER::_state();
+
&App::sub_exit($state) if ($App::trace);
return($state);
}
# TODO: Implement this as a fork() or a context-level message to a node to fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
-# Save the callback_event according to an event_tag.
+# Save the callback_event according to an event_token.
# Then implement cleanup_pid to send the callback_event.
sub send_async_event {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
+ my $event_token = $self->new_event_token();
+ $event->{event_token} = $event_token;
+ $callback_event->{event_token} = $event_token if ($callback_event);
push(@{$self->{pending_async_events}}, [ $event, $callback_event ]);
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit($event_token) if ($App::trace);
+ return($event_token);
+}
+
+sub new_event_token {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ $self->{async_event_count} ++;
+ my $event_token = "$self->{host}-$self->{port}-$self->{async_event_count}";
+ &App::sub_exit($event_token) if ($App::trace);
+ return($event_token);
}
sub dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ($self) = @_;
my $pending_async_events = $self->{pending_async_events};
- my ($async_event);
+ my ($async_event, $assigned);
+ my $events_occurred = 0;
while ($#$pending_async_events > -1) {
- $async_event = shift(@$pending_async_events);
- $self->send_async_event_now(@$async_event);
+ $assigned =
$self->assign_event_destination($pending_async_events->[0][0]);
+ if ($assigned) {
+ $async_event = shift(@$pending_async_events);
+ $self->send_async_event_now(@$async_event);
+ $events_occurred ++;
+ }
+ else {
+ last;
+ }
}
- &App::sub_exit() if ($App::trace);
+ &App::sub_exit($events_occurred) if ($App::trace);
+ return($events_occurred);
+}
+
+sub assign_event_destination {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event) = @_;
+ my $assigned = 0;
+ if ($self->{num_procs} < $self->{max_procs} &&
+ (!defined $self->{max_async_events} || $self->{num_async_events} <
$self->{max_async_events})) {
+ $event->{destination} = $self->{host};
+ $assigned = 1;
+ }
+ &App::sub_exit($assigned) if ($App::trace);
+ return($assigned);
}
sub send_async_event_now {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- my $pid = fork();
+ my $pid = $self->fork();
if (!$pid) { # running in child
- $self->send_event($event);
- exit(0);
- }
- if ($callback_event) {
- my $event_tag = "local-$pid";
- $self->{pending_callback_event}{$event_tag} = $callback_event;
+ my $exitval = 0;
+ my (@results);
+ eval {
+ @results = $self->send_event($event);
+ };
+ if ($@) {
+ @results = ($@);
+ }
+ if ($#results > -1 && defined $results[0] && $results[0] ne "") {
+ my $textfile = $self->{options}{prefix} . "/data/app/Context/$$";
+ if (open(FILE, "> $textfile")) {
+ print App::Context::Server::FILE @results;
+ close(App::Context::Server::FILE);
+ }
+ else {
+ $exitval = 1;
+ }
+ }
+ $self->shutdown();
+ $self->exit($exitval);
}
+ $self->{num_async_events}++;
+ my $runtime_event_token = $pid;
+ $self->{running_async_event}{$runtime_event_token} = [ $event,
$callback_event ];
+ &App::sub_exit() if ($App::trace);
+}
+
+=head2 wait_for_event()
+
+ * Signature: $self->wait_for_event($event_token)
+ * Param: $event_token string
+ * Return: void
+ * Throws: App::Exception
+ * Since: 0.01
+
+ Sample Usage:
+
+ $self->wait_for_event($event_token);
+
+The wait_for_event() method is called when an asynchronous event has been
+sent and no more processing can be completed before it is done.
+
+=cut
+
+sub wait_for_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event_token) = @_;
&App::sub_exit() if ($App::trace);
}
+sub fork {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $pid = $self->SUPER::fork();
+ if ($pid) { # the parent process has a new child process
+ $self->{num_procs}++;
+ $self->{proc}{$pid} = {};
+ }
+ else { # the new child process has no sub-processes
+ $self->{num_procs} = 0;
+ $self->{proc} = {};
+ $SIG{INT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->exit(102); }; # SIG 2
+ $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->exit(103); }; # SIG 3
+ $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->exit(115); }; # SIG 15
+ }
+ &App::sub_exit($pid) if ($App::trace);
+ return($pid);
+}
+
sub finish_pid {
&App::sub_entry if ($App::trace);
my ($self, $pid, $exitval, $sig) = @_;
- my $event_tag = "local-$pid";
- my $callback_event = $self->{pending_callback_event}{$event_tag};
- if ($callback_event) {
- delete $self->{pending_callback_event}{$event_tag};
- $callback_event->{args} = [] if (! $callback_event->{args});
- my $message = (!$exitval || !$sig) ? "Error $exitval on $pid
[sig=$sig]" : "";
- push(@{$callback_event->{args}}, {location => $event_tag, returnval =>
$exitval, message => "Sig $sig"});
- $self->send_event($callback_event);
+ $self->{num_procs}--;
+ delete $self->{proc}{$pid};
+
+ my $runtime_event_token = $pid;
+ my $async_event = $self->{running_async_event}{$runtime_event_token};
+ if ($async_event) {
+ my ($event, $callback_event) = @$async_event;
+ my $returnval = "";
+ my $returnvalfile = $self->{options}{prefix} .
"/data/app/Context/$pid";
+ if (open(FILE, $returnvalfile)) {
+ if ($callback_event) {
+ $returnval = join("",<App::Context::Server::FILE>);
+ }
+ close(App::Context::Server::FILE);
+ unlink($returnvalfile);
+ }
+
+ $self->{num_async_events}--;
+ delete $self->{running_async_event}{$runtime_event_token};
+
+ if ($callback_event) {
+ $callback_event->{args} = [] if (! $callback_event->{args});
+ my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid
[sig=$sig]" : "";
+ push(@{$callback_event->{args}},
+ {event_token => $callback_event->{event_token}, returnval =>
$returnval, errnum => $exitval, errmsg => $errmsg});
+ $self->send_event($callback_event);
+ }
+ elsif ($sig == 9) { # killed without a chance to finish its work
+ $self->finish_killed_async_event($event);
+ }
}
&App::sub_exit() if ($App::trace);
}
-sub send_async_message {
+sub finish_killed_async_event {
&App::sub_entry if ($App::trace);
- my ($self, $host, $port, $message) = @_;
- my $pid = fork();
- if (!$pid) { # running in child
- $self->send_message($host, $port, $message);
- exit(0);
- }
+ my ($self, $event) = @_;
&App::sub_exit() if ($App::trace);
}
-sub send_message {
+sub find_runtime_event_token {
&App::sub_entry if ($App::trace);
- my ($self, $host, $port, $message) = @_;
- my $verbose = $self->{verbose};
- $self->log("send_message($host, $port, $message)\n") if ($verbose >= 2);
+ my ($self, $event_token) = @_;
+ my $running_async_event = $self->{running_async_event};
+ my ($runtime_event_token_found, $async_event);
+ foreach my $runtime_event_token (keys %$running_async_event) {
+ $async_event = $running_async_event->{$runtime_event_token};
+ if ($async_event->[0]{event_token} eq $event_token) {
+ $runtime_event_token_found = $runtime_event_token;
+ last;
+ }
+ }
+ &App::sub_exit($runtime_event_token_found) if ($App::trace);
+ return($runtime_event_token_found);
+}
+
+sub reset_running_async_events {
+ &App::sub_entry if ($App::trace);
+ my ($self, $runtime_event_token_prefix) = @_;
+ $runtime_event_token_prefix =~ s/:/-/; # in case they send
"localhost:8080" instead of "localhost-8080"
+ my $running_async_event = $self->{running_async_event};
+ my ($runtime_event_token, $async_event);
+ foreach $runtime_event_token (keys %$running_async_event) {
+ $async_event = $running_async_event->{$runtime_event_token};
+ if ($async_event && $runtime_event_token =~
/^$runtime_event_token_prefix\b/) {
+ $self->reset_running_async_event($runtime_event_token);
+ }
+ }
+ &App::sub_exit() if ($App::trace);
+}
- if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
- $host = $1;
- $port = $2;
- }
-
- my $sock = IO::Socket::INET->new(
- PeerAddr => $host,
- PeerPort => $port,
- Proto => "tcp",
- Type => SOCK_STREAM,
- );
+sub reset_running_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $runtime_event_token) = @_;
+ my $async_event = $self->abort_running_async_event($runtime_event_token);
+ if ($async_event) {
+ my $pending_async_events = $self->{pending_async_events};
+ unshift(@$pending_async_events, $async_event);
+ }
+ &App::sub_exit($async_event) if ($App::trace);
+ return($async_event);
+}
- my ($response);
- if ($sock) {
- eval {
- $sock->autoflush(1);
- print $sock $message, "\n";
- $response = <$sock>;
- $response =~ s/[\r\n]+$//;
- close($sock);
- };
- if ($@) {
- $response = "SEND ERROR: $@";
+sub abort_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event_token) = @_;
+ my $pending_async_events = $self->{pending_async_events};
+ my ($async_event);
+ my $aborted = 0;
+ # first look for it in the pending list
+ for (my $i = 0; $i <= $#$pending_async_events; $i++) {
+ $async_event = $pending_async_events->[$i];
+ if ($async_event->[0]{event_token} eq $event_token) {
+ splice(@$pending_async_events, $i, 1);
+ $aborted = 1;
+ last;
}
}
- else {
- $response = "CONNECT ERROR: $!";
+ # then look for it in the running list
+ if (!$aborted) {
+ my $runtime_event_token =
$self->find_runtime_event_token($event_token);
+ if ($runtime_event_token) {
+ $async_event =
$self->abort_running_async_event($runtime_event_token);
+ }
}
+ &App::sub_exit($async_event) if ($App::trace);
+ return($async_event);
+}
- $self->log("send_message($host, $port, ...) => [$response]\n") if
($verbose >= 2);
- &App::sub_exit($response) if ($App::trace);
- return($response);
+sub abort_running_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $runtime_event_token) = @_;
+ my $running_async_event = $self->{running_async_event};
+ my $pending_async_events = $self->{pending_async_events};
+ my $async_event = $running_async_event->{$runtime_event_token};
+ if ($async_event) {
+ $self->{num_async_events}--;
+ delete $self->{running_async_event}{$runtime_event_token};
+ unshift(@$pending_async_events, $async_event);
+ $self->_abort_running_async_event($runtime_event_token, @$async_event);
+ }
+ &App::sub_exit($async_event) if ($App::trace);
+ return($async_event);
+}
+
+# $runtime_event_tokens take the following forms:
+# $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
+sub _abort_running_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $runtime_event_token, $event, $callback_event) = @_;
+ if ($runtime_event_token =~ /^[0-9]+$/) {
+ kill(15, $runtime_event_token);
+ }
+ else {
+ $self->log("Unable to abort running async event
[$runtime_event_token]\n");
+ }
+ &App::sub_exit() if ($App::trace);
}
1;