Author: spadkins
Date: Tue May  2 06:53:20 2006
New Revision: 6005

Modified:
   p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
   p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
   p5ee/trunk/App-Context/lib/App/Context/Server.pm

Log:
test whether these are ready for production

Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm Tue May  2 
06:53:20 2006
@@ -37,15 +37,16 @@
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
     die "Controller must have a port defined (\$context->{options}{port})" if 
(!$self->{port});
-    $self->{num_node_events} = 0;
-    $self->{max_node_events} = 
$self->{options}{"app.context.cluster_controller.max_node_events"} || 10;
+    $self->{num_async_events} = 0;
+    $self->{max_async_events_per_node} = 
$self->{options}{"app.context.max_async_events_per_node"} || 10;
+    $self->{max_async_events} = 0;  # start with 0 because there are no nodes 
up
     &App::sub_exit() if ($App::trace);
 }
 
 sub _init2b {
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
-    # nothing yet
+    $self->startup_nodes($options) if ($options->{startup});
     &App::sub_exit() if ($App::trace);
 }
 
@@ -61,7 +62,6 @@
     # nothing special yet
 }
 
-# CONTROLLER ONLY
 sub send_async_event_now {
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
@@ -77,13 +77,14 @@
         if ($event->{args}) {
             $args = $self->{rpc_serializer}->serialize($event->{args});
         }
-        my $response = $self->send_message($node_host, $node_port, 
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args");
-        $self->{num_node_events}++;
-        if ($callback_event) {
-            if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
-                my $remote_event_token = $1;
-                $self->{pending_callback_event}{$remote_event_token} = 
$callback_event;
-            }
+        my $response = $self->send_message($node_host, $node_port, 
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1, 
undef, 1);
+        if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
+            my $runtime_event_token = $1;
+            $self->{num_async_events}++;
+            $self->{running_async_event}{$runtime_event_token} = [ $event, 
$callback_event ];
+        }
+        elsif ($response =~ /ERROR/) {
+            $self->set_node_down("$node_host:$node_port");
         }
     }
     else {
@@ -92,11 +93,31 @@
     &App::sub_exit() if ($App::trace);
 }
 
+# $runtime_event_tokens take the following forms:
+#    $runtime_event_token = $pid; -- 
App::Context::Server::send_async_event_now() and ::finish_pid()
+#    $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token 
on the node
+sub _abort_running_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $runtime_event_token, $event, $callback_event) = @_;
+    if ($runtime_event_token =~ /^[0-9]+$/) {
+        kill(9, $runtime_event_token);
+    }
+    elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
+        my $node_host = $1;
+        my $node_port = $2;
+        $self->send_async_message($node_host, $node_port, 
"ABORT-ASYNC-EVENT:$runtime_event_token");
+    }
+    else {
+        $self->log("Unable to abort running async event [$runtime_event_token] 
(controller)\n");
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
 sub assign_event_destination {
     &App::sub_entry if ($App::trace);
     my ($self, $event) = @_;
     my $assigned = 0;
-    if ($self->{num_node_events} < $self->{max_node_events}) {
+    if ($self->{num_async_events} < $self->{max_async_events}) {
         $event->{destination} = $self->{host};
         $assigned = $self->assign_event_destination_by_round_robin($event);
     }
@@ -125,27 +146,20 @@
 
 sub process_msg {
     &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
+    my ($self, $msg) = @_;
     my $verbose = $self->{verbose};
     $self->log({level=>3},"process_msg: [$msg]\n");
-    my $processing_complete = $self->process_custom_msg($connection_socket, 
$msg);
-    if (!$processing_complete) {
+    my $return_value = $self->process_custom_msg($msg);
+    if (!$return_value) {
         if ($msg =~ /^NODE-UP:(.*)/) {
-            my $resp = $self->set_node_up($1) || "ok";
-            $connection_socket->print("$resp\n");
+            $return_value = $self->set_node_up($1);
         }
         elsif ($msg =~ /^NODE-DOWN:(.*)/) {
             $self->set_node_down($1);
-            $connection_socket->print("ok\n");
-        }
-        elsif ($msg =~ s/^GET//) {
-            $connection_socket->print("HTTP/1.0 200 OK\n");
-            $connection_socket->print("Content-type: text/plain\n");
-            $connection_socket->print("\n");
-            $connection_socket->print($self->state());
+            $return_value = "OK";
         }
         elsif ($msg =~ /^ASYNC-EVENT-RESULTS:([^:]+):(.*)$/) {
-            my $remote_event_token = $1;
+            my $runtime_event_token = $1;
             my $results = $2;
             if ($results ne "") {
                 $results = $self->{rpc_serializer}->deserialize($results);
@@ -153,31 +167,41 @@
                     $results = $results->[0];
                 }
             }
-            $connection_socket->print("OK\n");
-            $self->{num_node_events}--;
-            my $callback_event = 
$self->{pending_callback_event}{$remote_event_token};
-            if ($callback_event) {
-                delete $self->{pending_callback_event}{$remote_event_token};
-                $callback_event->{args} = [] if (! $callback_event->{args});
-                push(@{$callback_event->{args}},
-                    {event_token => $callback_event->{event_token}, returnval 
=> $results, errnum => 0, errmsg => ""});
-                $self->send_event($callback_event);
+            my $async_event = 
$self->{running_async_event}{$runtime_event_token};
+
+            if ($async_event) {
+                $self->{num_async_events}--;
+                delete $self->{running_async_event}{$runtime_event_token};
+
+                my ($event, $callback_event) = @$async_event;
+                if ($callback_event) {
+                    $callback_event->{args} = [] if (! 
$callback_event->{args});
+                    push(@{$callback_event->{args}},
+                        {event_token => $callback_event->{event_token}, 
returnval => $results, errnum => 0, errmsg => ""});
+                    $self->send_event($callback_event);
+                }
             }
+            else {
+                $self->log("WARNING: Unexpected Async Event Results: 
[$msg]\n");
+            }
+            $return_value = "OK";
         }
         else {
-            $connection_socket->print("unknown [$msg]\n");
+            $self->log("ERROR: unknown [$msg]\n");
+            $return_value = "unknown [$msg]";
         }
     }
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
 # Can be overridden to provide customized processing.
 sub process_custom_msg {
     &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
-    my $processing_complete = 0;
-    &App::sub_exit($processing_complete) if ($App::trace);
-    return($processing_complete);
+    my ($self, $msg) = @_;
+    my $return_value = "";
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
 sub state {
@@ -185,7 +209,7 @@
     my ($self) = @_;
 
     my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
-    my $state = "Cluster Controller: 
$self->{host}:$self->{port}\n[$datetime]\n";
+    my $state = "Cluster Controller: $self->{host}:$self->{port}  
procs[$self->{num_procs}/$self->{max_procs}:max]  
async_events[$self->{num_async_events}/$self->{max_async_events}:max/$self->{max_async_events_per_node}:per]\n[$datetime]\n";
     $state .= "\n";
     $state .= $self->_state();
 
@@ -199,6 +223,13 @@
 
     my $state = "";
 
+    my (@nodes);
+    @nodes = @{$self->{nodes}} if ($self->{nodes});
+    $state .= "Nodes: up [EMAIL PROTECTED] last dispatched 
[$self->{last_node_idx}]\n";
+    foreach my $node (sort keys %{$self->{node}}) {
+        $state .= sprintf("   %-16s %4s\n", $node, $self->{node}{$node}{up} ? 
"UP" : "down");
+    }
+
     $state .= $self->SUPER::_state();
 
     &App::sub_exit($state) if ($App::trace);
@@ -208,6 +239,9 @@
 sub set_node_down {
     &App::sub_entry if ($App::trace);
     my ($self, $node) = @_;
+    my $runtime_event_token_prefix = $node;
+    $runtime_event_token_prefix =~ s/:/-/;
+    $self->reset_running_async_events($runtime_event_token_prefix);
     $self->{node}{$node}{up} = 0;
     $self->set_nodes();
     &App::sub_exit() if ($App::trace);
@@ -239,6 +273,102 @@
         }
     }
     $self->{nodes} = [EMAIL PROTECTED];
+    $self->{max_async_events} = $self->{max_async_events_per_node} * ($#nodes 
+ 1);
+    my $main_service = $self->{main_service};
+    if ($main_service && $main_service->can("capacity_change")) {
+        $main_service->capacity_change($self->{max_async_events}, [EMAIL 
PROTECTED], $self->{node});
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    $self->shutdown_nodes();
+    $self->write_node_file();
+    $self->SUPER::shutdown();
+    &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_nodes {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    foreach my $node (@{$self->{nodes}}) {
+        $self->send_message($node, undef, "QUIT", 0, undef, 1);
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+sub startup_nodes {
+    &App::sub_entry if ($App::trace);
+    my ($self, $options) = @_;
+
+    my $startup = $options->{startup};
+
+    if ($startup eq "1") {
+        $self->read_node_file();
+    }
+    else {
+        foreach my $node (split(/,/,$startup)) {
+            $self->{node}{$node} = {};
+        }
+    }
+
+    my ($msg, $host, $port, $cmd);
+    my $cmd_fmt = $self->{options}{"app.context.node_start_cmd"} || "ssh %s 
'mvnode --port=%s > /dev/null 2>&1 &'";
+    foreach my $node (keys %{$self->{node}}) {
+        $msg = $self->send_message($node, undef, "CONTROLLER-UP:", 0, undef, 
1);
+        if ($msg =~ /ERROR:/) {
+            if ($node =~ /^([^:]+):([0-9]+)$/) {
+                $host = $1;
+                $port = $2;
+                $cmd = sprintf($cmd_fmt, $host, $port);
+                $self->log("Starting Node [$node]: [$cmd]\n");
+                system("nohup $cmd < /dev/null > /dev/null 2>&1 &");
+            }
+        }
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+sub write_node_file {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    my $prefix = $self->{options}{prefix};
+    my $node_file = 
"$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
+    if (open(FILE, "> $node_file")) {
+        foreach my $node (@{$self->{nodes}}) {
+            print App::Context::ClusterController::FILE "$node\n";
+        }
+        close(App::Context::ClusterController::FILE);
+    }
+    else {
+        $self->log("WARNING: Can't write node file [$node_file]: $!\n");
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+sub read_node_file {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    my $prefix = $self->{options}{prefix};
+    my $node_file = 
"$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
+    my ($node);
+    if (open(FILE, "< $node_file")) {
+        while (<App::Context::ClusterController::FILE>) {
+            chomp;
+            if (/^[^:]+:[0-9]+$/) {
+                $node = $_;
+                # just take note of its existence. we don't know yet if it is 
up.
+                $self->{node}{$node} = {} if (!defined $self->{node}{$node});
+            }
+        }
+        close(App::Context::ClusterController::FILE);
+    }
+    else {
+        # This is not really a problem.
+        # $self->log("WARNING: Can't read node file [$node_file]: $!\n");
+    }
     &App::sub_exit() if ($App::trace);
 }
 

Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm       (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm       Tue May  2 
06:53:20 2006
@@ -76,24 +76,22 @@
     my $controller_port = $self->{controller_port};
     my $node_host       = $self->{host};
     my $node_port       = $self->{port};
-    $self->send_async_message($controller_host, $controller_port, 
"NODE-DOWN:$node_host:$node_port");
+    # We need to close the listen socket before we do a synchronous connection 
to the controller
+    # in order to avoid deadlock.
+    $self->close_listen_socket();
+    # This message needs to be synchronous, otherwise the parent will kill the 
subprocess during shutdown.
+    $self->send_message($controller_host, $controller_port, 
"NODE-DOWN:$node_host:$node_port");
     &App::sub_exit() if ($App::trace);
 }
 
 sub process_msg {
     &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
+    my ($self, $msg) = @_;
     my $verbose = $self->{verbose};
     $self->log({level=>3},"process_msg: [$msg]\n");
-    my $processing_complete = $self->process_custom_msg($connection_socket, 
$msg);
-    if (!$processing_complete) {
-        if ($msg =~ s/^GET//) {
-            $connection_socket->print("HTTP/1.0 200 OK\n");
-            $connection_socket->print("Content-type: text/plain\n");
-            $connection_socket->print("\n");
-            $connection_socket->print($self->state());
-        }
-        elsif ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
+    my $return_value = $self->process_custom_msg($msg);
+    if (!$return_value) {
+        if ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
             my %event = (
                 service_type => $1,
                 name         => $2,
@@ -103,22 +101,39 @@
             $event{args} = $self->{rpc_serializer}->deserialize($args) if 
($args ne "");
             my $event_token = $self->send_async_event({method => 
"process_async_event", args => [\%event],});
             $event{event_token} = $event_token;
-            $connection_socket->print("ASYNC-EVENT-TOKEN:$event_token\n");
+            $return_value = "ASYNC-EVENT-TOKEN:$event_token\n";
+        }
+        elsif ($msg =~ /^CONTROLLER-UP:/) {
+            my $controller_host = $self->{controller_host};
+            my $controller_port = $self->{controller_port};
+            my $node_host       = $self->{host};
+            my $node_port       = $self->{port};
+            $self->send_async_event({
+                method => "send_async_message",
+                args => [ $controller_host, $controller_port, 
"NODE-UP:$node_host:$node_port" ],
+            });
+            $return_value = "OK";
+        }
+        elsif ($msg =~ /^ABORT-ASYNC-EVENT:(.*)/) {
+            my $event_token = $1;
+            $self->abort_async_event($event_token);
+            $return_value = "OK";
         }
         else {
-            $connection_socket->print("unknown [$msg]\n");
+            $return_value = "ERROR: unknown [$msg]";
         }
     }
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
 # Can be overridden to provide customized processing.
 sub process_custom_msg {
     &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
-    my $processing_complete = 0;
-    &App::sub_exit($processing_complete) if ($App::trace);
-    return($processing_complete);
+    my ($self, $msg) = @_;
+    my $return_value = "";
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
 sub state {
@@ -126,7 +141,7 @@
     my ($self) = @_;
 
     my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
-    my $state = "Cluster Node: $self->{host}:$self->{port}\n[$datetime]\n";
+    my $state = "Cluster Node: $self->{host}:$self->{port}  
procs[$self->{num_procs}/$self->{max_procs}:max]  
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
     $state .= "\n";
     $state .= $self->_state();
 
@@ -149,12 +164,26 @@
 sub process_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $event) = @_;
-    my $results = $self->send_event($event);
+    my ($results);
+    eval {
+        $results = $self->send_event($event);
+    };
+    if ($@) {
+        $results = $@;
+    }
     my $results_txt = $self->{rpc_serializer}->serialize($results);
     my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
     $self->send_message($self->{controller_host}, $self->{controller_port}, 
$msg);
-    &App::sub_exit(undef) if ($App::trace);
-    return(undef);
+    &App::sub_exit() if ($App::trace);
+}
+
+sub finish_killed_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event) = @_;
+    my $results_txt = $self->{rpc_serializer}->serialize("Subrequest Killed");
+    my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
+    $self->send_async_message($self->{controller_host}, 
$self->{controller_port}, $msg);
+    &App::sub_exit() if ($App::trace);
 }
 
 1;

Modified: p5ee/trunk/App-Context/lib/App/Context/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Server.pm    (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Server.pm    Tue May  2 06:53:20 2006
@@ -16,6 +16,8 @@
 use IO::Socket;
 use IO::Socket::INET;
 use POSIX ":sys_wait_h";
+use Date::Format;
+use Date::Parse;
 
 use strict;
 
@@ -42,6 +44,10 @@
     my ($self, $options) = @_;
     $options = {} if (!defined $options);
 
+    $self->SUPER::_init($options);
+
+    App->mkdir($options->{prefix}, "data", "app", "Context");
+
     $| = 1;  # autoflush STDOUT (not sure this is required)
     open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";
 
@@ -49,9 +55,15 @@
     $self->{hostname} = $host;
     $host =~ s/\..*//;   # get rid of fully qualified domain name
     $self->{host} = $host;
+    $self->{port} = $options->{port} || 8080;
 
+    $self->{num_procs} = 0;
+    $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
+    $self->{max_async_events} = 
$self->{options}{"app.context.max_async_events"}
+        if (defined $self->{options}{"app.context.max_async_events"});
+    $self->{async_event_count} = 0;
     $self->{pending_async_events} = [];
-    $self->{pending_callback_event} = {};
+    $self->{running_async_event} = {};
 
     $self->{verbose} = $options->{verbose};
 
@@ -59,11 +71,34 @@
 
     my $listen_socket = IO::Socket::INET->new(
         Proto     => "tcp",
+        # LocalAddr => $self->{host},  # allow both the "hostname" and 
"localhost" to be used
         LocalPort => $self->{port},
         Listen    => SOMAXCONN,
-    ) || die "Unable to listen on $host:$self->{port} - $!";
+        ReuseAddr => 1,
+    ) || die "Unable to listen on $self->{host}:$self->{port} - $!";
 
     $self->{listen_socket} = $listen_socket;
+    my $listen_fd = fileno($listen_socket);
+    my $listen_vec;
+    vec($listen_vec, $listen_fd, 1) = 1;
+    $self->{listen_vec} = $listen_vec;
+
+    $self->{rpc_serializer} = $self->serializer("server_rpc", class => 
"App::Serializer::Perl", indent => 0);
+
+    if ($self->{options}{log_rotate}) {
+        my $rotate_sec = $self->{options}{log_rotate};
+        $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31);
+        my $time = time();
+        my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time));  # I 
need a base which is midnight local time
+        my $next_rotate_time = ((int(($time - 
$base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
+        $self->schedule_event(
+            tag => "context-log-rotation",
+            method => "log_file_open",
+            args => [0], # don't overwrite
+            time => $next_rotate_time,
+            interval => $rotate_sec,  # and every X seconds hereafter
+        );
+    }
 
     $self->_init2b($options);
 
@@ -73,7 +108,6 @@
 sub _init2a {
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
-    $self->{port} = $options->{port} || $options->{controller_port} || 8080;
     &App::sub_exit() if ($App::trace);
 }
 
@@ -87,8 +121,41 @@
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     if ($self->{listen_socket}) {
-        $self->{listen_socket}->close();
+        my $listen_socket = $self->{listen_socket};
+        my $listen_fd = fileno($listen_socket);
+        $self->log({level=>4},"Closed listen socket($listen_fd)\n");
+        $listen_socket->close();
+        $listen_socket = undef;
         delete $self->{listen_socket};
+        delete $self->{listen_vec};
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_unshareable_resources {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    $self->close_listen_socket();
+    $self->SUPER::shutdown_unshareable_resources();
+    &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    $self->close_listen_socket();
+    $self->shutdown_child_processes();
+    $self->SUPER::shutdown();
+    &App::sub_exit() if ($App::trace);
+}
+
+sub shutdown_child_processes {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    if ($self->{proc}) {
+        foreach my $pid (keys %{$self->{proc}}) {
+            kill(15, $pid);
+        }
     }
     &App::sub_exit() if ($App::trace);
 }
@@ -102,9 +169,9 @@
 
 sub dispatch_events {
     &App::sub_entry if ($App::trace);
-    my ($self) = @_;
+    my ($self, $max_events_occurred) = @_;
 
-    my ($role, $port, $startup, $shutdown, $process_msg);
+    my ($role, $port, $startup, $shutdown);
     $self->dispatch_events_begin();
 
     my $verbose = $self->{verbose};
@@ -123,82 +190,165 @@
                 $name = $object;
             }
             $service = $self->service($service_type, $name);  # instantiate 
it. that's all.
-            $self->log("$service_type $name instantiated [$service]\n");
+            $self->log({level=>3},"$service_type $name instantiated 
[$service]\n");
+            $self->{main_service} = $service if (!$self->{main_service});
         }
     }
 
     my $quit = 0;
-    $SIG{USR1} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
-    $SIG{USR2} = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
-    $SIG{HUP}  = sub { $self->log("Caught Signal: @_\n") if ($verbose); };
-    $SIG{INT}  = sub { $self->log("Caught Signal: @_ (quitting)\n") if 
($verbose); $quit = 1; };
-    $SIG{QUIT} = sub { $self->log("Caught Signal: @_ (quitting)\n") if 
($verbose); $quit = 1; };
-    $SIG{TERM} = sub { $self->log("Caught Signal: @_ (quitting)\n") if 
($verbose); $quit = 1; };
-    $SIG{CHLD} = "DEFAULT";
+
+    $SIG{HUP}  = sub { $self->log({level=>2},"Caught Signal: @_\n"); };        
                 # SIG  1
+    $SIG{INT}  = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $quit = 1; };   # SIG  2
+    $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $quit = 1; };   # SIG  3
+    $SIG{USR1} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };        
                 # SIG 10
+    $SIG{USR2} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };        
                 # SIG 12
+    $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $quit = 1; };   # SIG 15
+    $SIG{CHLD} = "DEFAULT";                                                    
                 # SIG 17
 
     my $default_sleep_interval = 15*60;
 
     my $listen_socket = $self->{listen_socket};
-    my ($connection_socket, $msg);
+    my $listen_vec    = $self->{listen_vec};
+    my $listen_fd     = fileno($listen_socket);
+    my ($connection_socket, $connection_fd, $msg, $accept_worthwhile);
     my ($event, @events);
-    my ($time, $time_of_next_event, $sleep_interval, $event_occurred);
+    my $event_loop_extensions = $self->{event_loop_extensions};
+    my ($extension, $obj, $method, $args, $extension_idx, 
$extension_events_occurred);
+    my $last_extension_idx = -1;
+    my ($time, $time_of_next_event, $sleep_interval);
+    my $total_events_occurred = 0;
+    my ($events_occurred);
     my ($pid, $exitval, $sig);
+    my ($await_return_value, $server_close, $return_value);
     while (!$quit) {
         eval {
-            $event_occurred = 0;
+            $events_occurred = 0;
             if ($#{$self->{pending_async_events}} > -1) {
-                $self->dispatch_pending_async_events();
-                $event_occurred = 1;
+                $events_occurred += $self->dispatch_pending_async_events();
             }
-            while (($pid = waitpid(-1,WNOHANG)) > 0) {
-                $event_occurred = 1;
-                $exitval = $? >> 8;
-                $sig     = $? & 255;
-                $self->log("Child $pid finished 
[exitval=$exitval,sig=$sig]\n");
-                $self->finish_pid($pid, $exitval, $sig);
-            }
-            $self->log("Checking for scheduled events.\n") if ($verbose >= 8);
+            $events_occurred += $self->dispatch_finished_processes();
+
+            # Scheduled events: Every time through the loop, we check to see
+            # if it is time for a scheduled event to occur.  If so, we send
+            # each of those events out.
+            $self->log({level=>4},"Checking for scheduled events.\n");
             $time = time();
             $time_of_next_event = $self->get_current_events([EMAIL PROTECTED], 
$time);
             if ($#events > -1) {
-                $event_occurred = 1;
                 foreach $event (@events) {
                     $self->send_event($event);
+                    $events_occurred++;
                 }
                 $time = time();
             }
-            $time = time();
-            if ($time_of_next_event > 0) {
-                $sleep_interval = $time_of_next_event - $time;
-                $sleep_interval = 0 if ($sleep_interval < 0);
-            }
-            else {
-                $sleep_interval = $default_sleep_interval;
+
+            # Registered Extensions to the Event Loop: These are lower 
priority.
+            # We only allow the extensions to be run in any given iteration 
through
+            # the event loop if we have no other core event that has occurred.
+            # Even then, we only allow one extension (that returns true) to run
+            # in each iteration, and we check them in round-robin fashion so 
that
+            # one extension does not get more attention than the others.
+            if (!$events_occurred) {
+                $extension_idx = $last_extension_idx;  # start with last 
executed extension
+                for (my $i = 0; $i <= $#$event_loop_extensions; $i++) {
+                    $extension_idx ++;   # increment it in round-robin fashion
+                    $extension_idx = 0 if ($extension_idx > 
$#$event_loop_extensions);
+                    $extension = $event_loop_extensions->[$extension_idx];
+                    ($obj, $method, $args) = @$extension;
+                    $extension_events_occurred = $obj->$method(@$args);  # 
execute extension and ...
+                    if ($extension_events_occurred) {     # check return value 
for true
+                        $last_extension_idx = $extension_idx;
+                        $events_occurred += $extension_events_occurred;
+                        last;
+                    }
+                }
             }
-            if (!$event_occurred) {
+
+            if (!$events_occurred) {
+                # Sleep Interval: Based on when the next event is scheduled 
and the current
+                # time, we determine the sleep interval.
+                $time = time();
+                if ($time_of_next_event > 0) {
+                    $sleep_interval = $time_of_next_event - $time;
+                    $sleep_interval = 0 if ($sleep_interval < 0);
+                }
+                else {
+                    $sleep_interval = $default_sleep_interval;
+                }
+
                 # TODO: if (sleep_interval == 0), use select() to see if 
anyone is waiting, else ...
-                $self->log("Sleeping on socket: timeout($sleep_interval)\n") 
if ($verbose >= 3);
-                $listen_socket->timeout($sleep_interval);
-                $SIG{CHLD}  = sub { $self->log("Caught Signal: @_\n") if 
($verbose); };
-                $connection_socket = $listen_socket->accept();
-                $SIG{CHLD}  = "DEFAULT";
-                if ($connection_socket) {
-                    $connection_socket->autoflush(1);
-                    $msg = <$connection_socket>;
-                    $msg =~ s/[\015\012]+$//;
-                    if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
-                        $quit = 1;
+                $self->log({level=>4},"Listening on socket($listen_fd): 
timeout($sleep_interval)\n");
+                $accept_worthwhile = 1;
+                # NOTE: to understand why I do this section of code, read the 
3rd paragraph under the
+                # accept() method of IO::Socket (i.e. "man IO::Socket") or 
read it here.
+                # http://perldoc.perl.org/IO/Socket.html
+                if ($sleep_interval == 0) {
+                    if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # 
nothing happening on the socket
+                        $accept_worthwhile = 0;  # don't bother to call 
accept() on it
                     }
-                    else {
-                        $self->process_msg($connection_socket, $msg);
+                }
+                # Here is the truth table for $await_return_value, 
$server_close
+                #   $await_return_value  $server_close =         client        
 +        server     
+                #   -------------------  -------------   
----------------------   ---------------------
+                #             0                0              write/close      
        read/close
+                #             0                1            write/read/close   
        read/close
+                #             1                0         
write/read/write/close   read/write/read/close
+                #             1                1            write/read/close   
      read/write/close
+                # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
+                if ($accept_worthwhile) {
+                    $listen_socket->timeout($sleep_interval);
+                    #$SIG{CHLD}  = sub { $self->log({level=>4},"Caught Signal: 
@_\n"); };
+                    $SIG{CHLD}  = sub { };  # the point is to interrupt the 
accept() system call, not to do anything.
+                    $connection_socket = $listen_socket->accept();
+                    $SIG{CHLD}  = "DEFAULT";
+                    if ($connection_socket) {
+                        $connection_fd = fileno($connection_socket);
+                        $msg = $connection_socket->getline();
+                        $self->log({level=>4},"Message on 
socket($connection_fd) [$msg]\n");
+                        if ($msg) {
+                            $await_return_value = ($msg =~ s/^RV-//);
+                            $server_close       = ($msg =~ s/^SC-//);
+                            $msg =~ s/[\015\012]+$//;
+                            if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+                                $quit = 1;
+                            }
+                            elsif ($msg =~ s/^GET//) {
+                                $await_return_value = 1;
+                                my $content = $self->state();
+                                my $content_length = length($content);
+                                $return_value = <<EOF;
+HTTP/1.1 200 OK
+Content-type: text/plain
+Content-length: $content_length
+Connection: close
+
+$content
+EOF
+                            }
+                            else {
+                                $return_value = $self->process_msg($msg);
+                                $return_value .= "\n" if ($return_value !~ 
/\n$/);
+                            }
+                            if ($await_return_value) {
+                                $self->log({level=>4},"Returned on 
socket($connection_fd) [$msg]\n") if ($msg !~ /^GET/);
+                                $connection_socket->autoflush(1);
+                                $connection_socket->print($return_value);
+                                $connection_socket->getline() if 
(!$server_close);
+                            }
+                            $connection_socket->close();
+                        }
+                        else {
+                            $connection_socket->close();
+                        }
                     }
-                    $connection_socket->close();
                 }
             }
         };
         if ($@) {
-            $self->log($@) if ($verbose);
+            $self->log($@);
         }
+        $total_events_occurred += $events_occurred;
+        $quit = 1 if ($max_events_occurred && $total_events_occurred >= 
$max_events_occurred);
     }
 
     $self->close_listen_socket();
@@ -207,159 +357,486 @@
     &App::sub_exit() if ($App::trace);
 }
 
+sub dispatch_network_events {
+    &App::sub_entry if ($App::trace);
+    my ($self, $sleep_interval) = @_;
+
+    $sleep_interval ||= 0;
+    my $verbose = $self->{verbose};
+    my $events_occurred = 0;
+
+    my ($connection_socket, $msg, $accept_worthwhile);
+    $self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+    my $listen_socket = $self->{listen_socket};
+    my $listen_vec    = $self->{listen_vec};
+
+    $accept_worthwhile = 1;
+    if ($sleep_interval == 0) {
+        # NOTE: to understand why I do this section of code, read the 3rd 
paragraph under the
+        # accept() method of IO::Socket (i.e. "man IO::Socket") or read it 
here.
+        # http://perldoc.perl.org/IO/Socket.html
+        if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # nothing 
happening on the socket
+            $accept_worthwhile = 0;  # don't bother to call accept() on it
+        }
+    }
+
+    if ($accept_worthwhile) {
+        $listen_socket->timeout($sleep_interval);
+        $SIG{CHLD}  = sub { };
+        $connection_socket = $listen_socket->accept();
+        $SIG{CHLD}  = "DEFAULT";
+        if ($connection_socket) {
+            $connection_socket->autoflush(1);
+            $msg = <$connection_socket>;
+            $msg =~ s/[\015\012]+$//;
+            if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+                # $quit = 1;
+            }
+            else {
+                $self->process_msg($msg);
+            }
+            $connection_socket->close();
+            $events_occurred ++;
+        }
+    }
+
+    &App::sub_exit($events_occurred) if ($App::trace);
+    return($events_occurred);
+}
+
+sub dispatch_finished_processes {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    my ($pid, $exitval, $sig);
+    my $events_occurred = 0;
+    while (($pid = waitpid(-1,WNOHANG)) > 0) {
+        $events_occurred ++;
+        $exitval = $? >> 8;
+        $sig     = $? & 255;
+        $self->log({level=>4},"Child $pid finished 
[exitval=$exitval,sig=$sig]\n");
+        $self->finish_pid($pid, $exitval, $sig);
+    }
+    &App::sub_exit($events_occurred) if ($App::trace);
+    return($events_occurred);
+}
+
 sub dispatch_events_begin {
+    &App::sub_entry if ($App::trace);
     my ($self) = @_;
     my $verbose = $self->{verbose};
-    $self->log("Starting Server on $self->{host}:$self->{port}\n") if 
($verbose);
+    $self->log({level=>2},"Starting Server on $self->{host}:$self->{port}\n");
+    &App::sub_exit() if ($App::trace);
 }
 
 sub dispatch_events_end {
     my ($self) = @_;
     my $verbose = $self->{verbose};
-    $self->log("Stopping Server.\n") if ($verbose);
+    $self->log({level=>2},"Stopping Server.\n");
 }
 
 sub process_msg {
-    my ($self, $connection_socket, $msg) = @_;
-    $self->log("process_msg: [$msg]\n");
+    my ($self, $msg) = @_;
+    $self->log({level=>3},"process_msg: [$msg]\n");
     my $verbose = $self->{verbose};
-    my $processing_complete = $self->process_app_msg($connection_socket, $msg);
-    if (!$processing_complete) {
-        if ($msg =~ s/^GET//) {
-            $connection_socket->print("HTTP/1.0 200 OK\n");
-            $connection_socket->print("Content-type: text/plain\n");
-            $connection_socket->print("\n");
-            $connection_socket->print("$self->{host}:$self->{port} as a 
server\n");
-            $connection_socket->print($self->state());
-        }
-        else {
-            $connection_socket->print("unknown [$msg]\n");
-        }
+    my $return_value = $self->process_custom_msg($msg);
+    if (!$return_value) {
+        $return_value = "unknown [$msg]\n";
     }
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
-sub process_app_msg {
+sub process_custom_msg {
     &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
-    my $processing_complete = 0;
-    &App::sub_exit($processing_complete) if ($App::trace);
-    return($processing_complete);
+    my ($self, $msg) = @_;
+    my $return_value = "";
+    &App::sub_exit($return_value) if ($App::trace);
+    return($return_value);
 }
 
 sub state {
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
+
+    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+    my $state = "Server: $self->{host}:$self->{port}  
procs[$self->{num_procs}/$self->{max_procs}:max]  
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
+    $state .= "\n";
+    $state .= $self->_state();
+
+    &App::sub_exit($state) if ($App::trace);
+    return($state);
+}
+
+sub _state {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+
     my $state = "";
+
+    my $options = $self->{options};
+    my $objects = $options->{init_objects};
+    my ($service_type, $name, $service);
+    foreach my $object (split(/ *[;,]+ */, $objects)) {
+        if ($object) {
+            if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
+                $service_type = $1;
+                $name = $2;
+            }
+            else {
+                $service_type = "SessionObject";
+                $name = $object;
+            }
+            $service = $self->service($service_type, $name);  # instantiate 
it. that's all.
+            if ($service->can("state")) {
+                $state .= "\n";
+                $state .= $service->state();
+            }
+        }
+    }
+
+    my $main_service = $self->{main_service};
+
+    $state .= "\n";
+    $state .= "Running Async Events:\n";
+    my ($async_event, $event, $callback_event, @args, $args_str, $event_token, 
$runtime_event_token, $str);
+    foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) {
+        $async_event = $self->{running_async_event}{$runtime_event_token};
+        ($event, $callback_event) = @$async_event;
+        $str = "";
+        if ($main_service && $main_service->can("format_async_event")) {
+            $str = $main_service->format_async_event($event, $callback_event, 
$runtime_event_token);
+        }
+        if ($str) {
+            $state .= "   ";
+            $state .= $main_service->format_async_event($event, 
$callback_event, $runtime_event_token);
+            $state .= "\n";
+        }
+        else {
+            @args = ();
+            @args = @{$event->{args}} if ($event->{args});
+            $args_str = join(",",@args);
+            $state .= sprintf("   %-20s %-20s %-24s", $event->{event_token}, 
$runtime_event_token, "$event->{name}.$event->{method}($args_str)");
+            if ($callback_event) {
+                @args = ();
+                @args = @{$callback_event->{args}} if 
($callback_event->{args});
+                $args_str = join(",",@args);
+                $state .= 
"$callback_event->{name}.$callback_event->{method}($args_str)";
+            }
+            $state .= "\n";
+        }
+    }
+
+    $state .= "\n";
+    $state .= "Pending Async Events: count [$self->{async_event_count}]\n";
+    foreach $async_event (@{$self->{pending_async_events}}) {
+        ($event, $callback_event) = @$async_event;
+        $str = "";
+        if ($main_service && $main_service->can("format_async_event")) {
+            $str = $main_service->format_async_event($event, $callback_event);
+        }
+        if ($str) {
+            $state .= "   ";
+            $state .= $main_service->format_async_event($event, 
$callback_event);
+            $state .= "\n";
+        }
+        else {
+            @args = ();
+            @args = @{$event->{args}} if ($event->{args});
+            $args_str = join(",",@args);
+            $state .= sprintf("   %-20s %-40s", $event->{event_token}, 
"$event->{name}.$event->{method}($args_str)");
+            if ($callback_event) {
+                @args = ();
+                @args = @{$callback_event->{args}} if 
($callback_event->{args});
+                $args_str = join(",",@args);
+                $state .= " => 
$callback_event->{name}.$callback_event->{method}($args_str)";
+            }
+            $state .= "\n";
+        }
+    }
+
+    $state .= "\n";
+
+    $state .= $self->SUPER::_state();
+
     &App::sub_exit($state) if ($App::trace);
     return($state);
 }
 
 # TODO: Implement this as a fork() or a context-level message to a node to 
fork().
 #       i.e. messages such as "EVENT:" and "EVENT-OK:"
-#       Save the callback_event according to an event_tag.
+#       Save the callback_event according to an event_token.
 #       Then implement cleanup_pid to send the callback_event.
 
 sub send_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
+    my $event_token = $self->new_event_token();
+    $event->{event_token} = $event_token;
+    $callback_event->{event_token} = $event_token if ($callback_event);
     push(@{$self->{pending_async_events}}, [ $event, $callback_event ]);
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit($event_token) if ($App::trace);
+    return($event_token);
+}
+
+sub new_event_token {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    $self->{async_event_count} ++;
+    my $event_token = "$self->{host}-$self->{port}-$self->{async_event_count}";
+    &App::sub_exit($event_token) if ($App::trace);
+    return($event_token);
 }
 
 sub dispatch_pending_async_events {
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     my $pending_async_events = $self->{pending_async_events};
-    my ($async_event);
+    my ($async_event, $assigned);
+    my $events_occurred = 0;
     while ($#$pending_async_events > -1) {
-        $async_event = shift(@$pending_async_events);
-        $self->send_async_event_now(@$async_event);
+        $assigned = 
$self->assign_event_destination($pending_async_events->[0][0]);
+        if ($assigned) {
+            $async_event = shift(@$pending_async_events);
+            $self->send_async_event_now(@$async_event);
+            $events_occurred ++;
+        }
+        else {
+            last;
+        }
     }
-    &App::sub_exit() if ($App::trace);
+    &App::sub_exit($events_occurred) if ($App::trace);
+    return($events_occurred);
+}
+
+sub assign_event_destination {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event) = @_;
+    my $assigned = 0;
+    if ($self->{num_procs} < $self->{max_procs} &&
+        (!defined $self->{max_async_events} || $self->{num_async_events} < 
$self->{max_async_events})) {
+        $event->{destination} = $self->{host};
+        $assigned = 1;
+    }
+    &App::sub_exit($assigned) if ($App::trace);
+    return($assigned);
 }
 
 sub send_async_event_now {
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
-    my $pid = fork();
+    my $pid = $self->fork();
     if (!$pid) {   # running in child
-        $self->send_event($event);
-        exit(0);
-    }
-    if ($callback_event) {
-        my $event_tag = "local-$pid";
-        $self->{pending_callback_event}{$event_tag} = $callback_event;
+        my $exitval = 0;
+        my (@results);
+        eval {
+            @results = $self->send_event($event);
+        };
+        if ($@) {
+            @results = ($@);
+        }
+        if ($#results > -1 && defined $results[0] && $results[0] ne "") {
+            my $textfile = $self->{options}{prefix} . "/data/app/Context/$$";
+            if (open(FILE, "> $textfile")) {
+                print App::Context::Server::FILE @results;
+                close(App::Context::Server::FILE);
+            }
+            else {
+                $exitval = 1;
+            }
+        }
+        $self->shutdown();
+        $self->exit($exitval);
     }
+    $self->{num_async_events}++;
+    my $runtime_event_token = $pid;
+    $self->{running_async_event}{$runtime_event_token} = [ $event, 
$callback_event ];
+    &App::sub_exit() if ($App::trace);
+}
+
+=head2 wait_for_event()
+
+    * Signature: $self->wait_for_event($event_token)
+    * Param:     $event_token     string
+    * Return:    void
+    * Throws:    App::Exception
+    * Since:     0.01
+
+    Sample Usage: 
+
+    $self->wait_for_event($event_token);
+
+The wait_for_event() method is called when an asynchronous event has been
+sent and no more processing can be completed before it is done.
+
+=cut
+
+sub wait_for_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event_token) = @_;
     &App::sub_exit() if ($App::trace);
 }
 
+sub fork {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    my $pid = $self->SUPER::fork();
+    if ($pid) {  # the parent process has a new child process
+        $self->{num_procs}++;
+        $self->{proc}{$pid} = {};
+    }
+    else {  # the new child process has no sub-processes
+        $self->{num_procs} = 0;
+        $self->{proc} = {};
+        $SIG{INT}  = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $self->exit(102); };   # SIG  2
+        $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $self->exit(103); };   # SIG  3
+        $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ 
(quitting)\n"); $self->exit(115); };   # SIG 15
+    }
+    &App::sub_exit($pid) if ($App::trace);
+    return($pid);
+}
+
 sub finish_pid {
     &App::sub_entry if ($App::trace);
     my ($self, $pid, $exitval, $sig) = @_;
 
-    my $event_tag = "local-$pid";
-    my $callback_event = $self->{pending_callback_event}{$event_tag};
-    if ($callback_event) {
-        delete $self->{pending_callback_event}{$event_tag};
-        $callback_event->{args} = [] if (! $callback_event->{args});
-        my $message = (!$exitval || !$sig) ? "Error $exitval on $pid 
[sig=$sig]" : "";
-        push(@{$callback_event->{args}}, {location => $event_tag, returnval => 
$exitval, message => "Sig $sig"});
-        $self->send_event($callback_event);
+    $self->{num_procs}--;
+    delete $self->{proc}{$pid};
+
+    my $runtime_event_token = $pid;
+    my $async_event = $self->{running_async_event}{$runtime_event_token};
+    if ($async_event) {
+        my ($event, $callback_event) = @$async_event;
+        my $returnval = "";
+        my $returnvalfile = $self->{options}{prefix} . 
"/data/app/Context/$pid";
+        if (open(FILE, $returnvalfile)) {
+            if ($callback_event) {
+                $returnval = join("",<App::Context::Server::FILE>);
+            }
+            close(App::Context::Server::FILE);
+            unlink($returnvalfile);
+        }
+
+        $self->{num_async_events}--;
+        delete $self->{running_async_event}{$runtime_event_token};
+
+        if ($callback_event) {
+            $callback_event->{args} = [] if (! $callback_event->{args});
+            my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid 
[sig=$sig]" : "";
+            push(@{$callback_event->{args}},
+                {event_token => $callback_event->{event_token}, returnval => 
$returnval, errnum => $exitval, errmsg => $errmsg});
+            $self->send_event($callback_event);
+        }
+        elsif ($sig == 9) {  # killed without a chance to finish its work
+            $self->finish_killed_async_event($event);
+        }
     }
 
     &App::sub_exit() if ($App::trace);
 }
 
-sub send_async_message {
+sub finish_killed_async_event {
     &App::sub_entry if ($App::trace);
-    my ($self, $host, $port, $message) = @_;
-    my $pid = fork();
-    if (!$pid) {   # running in child
-        $self->send_message($host, $port, $message);
-        exit(0);
-    }
+    my ($self, $event) = @_;
     &App::sub_exit() if ($App::trace);
 }
 
-sub send_message {
+sub find_runtime_event_token {
     &App::sub_entry if ($App::trace);
-    my ($self, $host, $port, $message) = @_;
-    my $verbose = $self->{verbose};
-    $self->log("send_message($host, $port, $message)\n") if ($verbose >= 2);
+    my ($self, $event_token) = @_;
+    my $running_async_event = $self->{running_async_event};
+    my ($runtime_event_token_found, $async_event);
+    foreach my $runtime_event_token (keys %$running_async_event) {
+        $async_event = $running_async_event->{$runtime_event_token};
+        if ($async_event->[0]{event_token} eq $event_token) {
+            $runtime_event_token_found = $runtime_event_token;
+            last;
+        }
+    }
+    &App::sub_exit($runtime_event_token_found) if ($App::trace);
+    return($runtime_event_token_found);
+}
+
+sub reset_running_async_events {
+    &App::sub_entry if ($App::trace);
+    my ($self, $runtime_event_token_prefix) = @_;
+    $runtime_event_token_prefix =~ s/:/-/;  # in case they send 
"localhost:8080" instead of "localhost-8080"
+    my $running_async_event = $self->{running_async_event};
+    my ($runtime_event_token, $async_event);
+    foreach $runtime_event_token (keys %$running_async_event) {
+        $async_event = $running_async_event->{$runtime_event_token};
+        if ($async_event && $runtime_event_token =~ 
/^$runtime_event_token_prefix\b/) {
+            $self->reset_running_async_event($runtime_event_token);
+        }
+    }
+    &App::sub_exit() if ($App::trace);
+}
 
-    if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
-        $host = $1;
-        $port = $2;
-    }
-
-    my $sock = IO::Socket::INET->new(
-        PeerAddr => $host,
-        PeerPort => $port,
-        Proto    => "tcp",
-        Type     => SOCK_STREAM,
-    );
+sub reset_running_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $runtime_event_token) = @_;
+    my $async_event = $self->abort_running_async_event($runtime_event_token);
+    if ($async_event) {
+        my $pending_async_events = $self->{pending_async_events};
+        unshift(@$pending_async_events, $async_event);
+    }
+    &App::sub_exit($async_event) if ($App::trace);
+    return($async_event);
+}
 
-    my ($response);
-    if ($sock) {
-        eval {
-            $sock->autoflush(1);
-            print $sock $message, "\n";
-            $response = <$sock>;
-            $response =~ s/[\r\n]+$//;
-            close($sock);
-        };
-        if ($@) {
-            $response = "SEND ERROR: $@";
+sub abort_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event_token) = @_;
+    my $pending_async_events = $self->{pending_async_events};
+    my ($async_event);
+    my $aborted = 0;
+    # first look for it in the pending list
+    for (my $i = 0; $i <= $#$pending_async_events; $i++) {
+        $async_event = $pending_async_events->[$i];
+        if ($async_event->[0]{event_token} eq $event_token) {
+            splice(@$pending_async_events, $i, 1);
+            $aborted = 1;
+            last;
         }
     }
-    else {
-        $response = "CONNECT ERROR: $!";
+    # then look for it in the running list
+    if (!$aborted) {
+        my $runtime_event_token = 
$self->find_runtime_event_token($event_token);
+        if ($runtime_event_token) {
+            $async_event = 
$self->abort_running_async_event($runtime_event_token);
+        }
     }
+    &App::sub_exit($async_event) if ($App::trace);
+    return($async_event);
+}
 
-    $self->log("send_message($host, $port, ...) => [$response]\n") if 
($verbose >= 2);
-    &App::sub_exit($response) if ($App::trace);
-    return($response);
+sub abort_running_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $runtime_event_token) = @_;
+    my $running_async_event  = $self->{running_async_event};
+    my $pending_async_events = $self->{pending_async_events};
+    my $async_event = $running_async_event->{$runtime_event_token};
+    if ($async_event) {
+        $self->{num_async_events}--;
+        delete $self->{running_async_event}{$runtime_event_token};
+        unshift(@$pending_async_events, $async_event);
+        $self->_abort_running_async_event($runtime_event_token, @$async_event);
+    }
+    &App::sub_exit($async_event) if ($App::trace);
+    return($async_event);
+}
+
+# $runtime_event_tokens take the following forms:
+#    $runtime_event_token = $pid; -- 
App::Context::Server::send_async_event_now() and ::finish_pid()
+sub _abort_running_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $runtime_event_token, $event, $callback_event) = @_;
+    if ($runtime_event_token =~ /^[0-9]+$/) {
+        kill(15, $runtime_event_token);
+    }
+    else {
+        $self->log("Unable to abort running async event 
[$runtime_event_token]\n");
+    }
+    &App::sub_exit() if ($App::trace);
 }
 
 1;

Reply via email to