Author: spadkins
Date: Tue Aug  7 12:11:58 2007
New Revision: 9827

Modified:
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm

Log:
first working version of POE cluster context

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     Tue Aug  7 12:11:58 2007
@@ -44,9 +44,10 @@
     $self->{max_async_events} = 0;  # start with 0 because there are no nodes 
up
 
     push(@{$self->{poe_states}},
-        "poe_remote_async_event_queued", "poe_set_node_status", 
"poe_run_event",
-        "poe_register_node", "poe_set_node_up", "poe_set_node_down");
-    push(@{$self->{poe_ikc_published_states}}, "poe_set_node_status");
+        "poe_receive_node_status",
+        "poe_run_event");
+    push(@{$self->{poe_ikc_published_states}},
+        "poe_receive_node_status");
 
     $self->_init_poe($options);
 
@@ -83,21 +84,22 @@
         my $event_token = $self->send_async_event_in_process($event, 
$callback_event);
     }
     elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
+        my $controller = "$self->{host}:$self->{port}";
         my $node_host = $1;
         my $node_port = $2;
         my $args = $event->{args};
 
         my $remote_server_name = "poe_${node_host}_${node_port}";
         my $remote_session_alias = $self->{poe_session_name};  # remote is 
same as local
-        my $remote_session_state = "poe_send_async_event";
-        my $local_callback_state = "poe_remote_async_event_queued";
+        my $remote_session_state = "poe_enqueue_async_event";
+        my $local_callback_state = "poe_enqueue_async_event_finished";
 
         $self->{num_async_events}++;
         $self->{node}{$destination}{num_async_events}++;
 
         my $kernel = $self->{poe_kernel};
         $kernel->post("IKC", "call", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
-            [ $event, $callback_event ], "poe:$local_callback_state" );
+            [ $controller, $event, $callback_event ], 
"poe:$local_callback_state" );
     }
     else {
         $self->SUPER::send_async_event_now($event, $callback_event);
@@ -109,8 +111,9 @@
     &App::sub_entry if ($App::trace);
     my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
     $self->log({level=>2},"POE: ikc_register ($session_name)\n");
-    if ($session_name =~ /^ikc_/) {
-        # do something
+    if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
+        my $node = "$1:$2";
+        $self->set_node_up($node);
     }
     my ($retval);
     &App::sub_exit($retval) if ($App::trace);
@@ -119,8 +122,12 @@
 
 sub ikc_unregister {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
-    $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id)\n");
+    my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
+    $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
+    if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
+        my $node = "$1:$2";
+        $self->set_node_down($node);
+    }
     &App::sub_exit() if ($App::trace);
 }
 
@@ -132,14 +139,6 @@
     return;
 }
 
-sub poe_remote_async_event_queued {
-    &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $runtime_event_token, $async_event) = @_[OBJECT, 
KERNEL, ARG0, ARG1];
-    $self->log({level=>2},"POE: poe_remote_async_event_queued 
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token\n");
-    $self->{running_async_event}{$runtime_event_token} = $async_event;
-    &App::sub_exit() if ($App::trace);
-}
-
 # $runtime_event_tokens take the following forms:
 #    $runtime_event_token = $pid; -- 
App::Context::Server::send_async_event_now() and ::finish_pid()
 #    $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token 
on the node
@@ -207,13 +206,14 @@
     return($assigned);
 }
 
-sub poe_set_node_status {
+sub poe_receive_node_status {
     &App::sub_entry if ($App::trace);
     my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
-    $self->log("POE: poe_set_node_status args=(@$args)\n");
-    my ($retval);
-    &App::sub_exit($retval) if ($App::trace);
-    return($retval);
+    my ($node, $sys_info) = @$args;
+    $self->log("POE: poe_receive_node_status ($node) - " .
+       "load=$sys_info->{load}, 
memfree=$sys_info->{memfree}/$sys_info->{memtotal} 
swapfree=$sys_info->{swapfree}/$sys_info->{swaptotal}\n");
+    $self->set_node_up($node, $sys_info);
+    &App::sub_exit() if ($App::trace);
 }
 
 sub poe_run_event {
@@ -277,49 +277,11 @@
     return($state);
 }
 
-sub poe_register_node {
-    &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
-    $self->log({level=>2},"POE: poe_register_node ($remote_kernel_id)\n");
-
-    my ($node);
-    if ($remote_kernel_id =~ m!poe_([^_]+)_([0-9]+)!) {
-        $node = "$1:$2";
-    }
-    else {
-        $self->log("ERROR: poe_register_node: unparseable remote_kernel_id 
[$remote_kernel_id]\n");
-    }
-
-    if (!$self->{node}{$node}{up}) {
-        my $remote_server_name = "poe_${node}";
-        $remote_server_name =~ s/:/_/;
-        $kernel->post("IKC", "monitor", "poe://$remote_server_name",
-            {register   => "poe_set_node_up",
-             unregister => "poe_set_node_down",
-             shutdown   => "poe_set_node_down",
-             data       => $node});
-    }
-
-    &App::sub_exit() if ($App::trace);
-}
-
-sub poe_set_node_up {
+sub set_node_up {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0, 
ARG3];
-    $self->log({level=>2},"POE: poe_set_node_up ($remote_kernel_id; 
node=$node)\n");
-    my ($retval, $values);
+    my ($self, $node, $sys_info) = @_;
+    my ($retval);
     if (!$self->{node}{$node}{up}) {
-        if ($node =~ /^([^:]+:\d+):(.*)/) {
-            $node   = $1;
-            $values = $2;
-            if ($values) {
-                foreach my $value (split(/,/, $values)) {
-                    if ($value  =~ /^([^=]+)=(.*)/) {
-                        $self->{node}{$node}{$1} = $2;
-                    }
-                }
-            }
-        }
         $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
         if ($self->{node}{$node}{up}) {
             $retval = "ok";
@@ -330,14 +292,18 @@
             $retval = "new";
         }
     }
+    if ($sys_info) {
+        foreach my $sys_var (keys %$sys_info) {
+            $self->{node}{$node}{$sys_var} = $sys_info->{$sys_var};
+        }
+    }
     &App::sub_exit($retval) if ($App::trace);
     return($retval);
 }
 
-sub poe_set_node_down {
+sub set_node_down {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0, 
ARG3];
-    $self->log({level=>2},"POE: poe_set_node_down ($remote_kernel_id; 
node=$node)\n");
+    my ($self, $node) = @_;
     my $runtime_event_token_prefix = $node;
     $runtime_event_token_prefix =~ s/:/-/;
     $self->reset_running_async_events($runtime_event_token_prefix);

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   Tue Aug  7 12:11:58 2007
@@ -61,7 +61,7 @@
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
 
-    my $ikc_name = "ikc_$self->{host}_$self->{port}";
+    my $ikc_name = "poe_$self->{host}_$self->{port}";
     ### Set up a server
     POE::Component::IKC::Responder->spawn();
     POE::Component::IKC::Client->spawn(
@@ -74,7 +74,7 @@
 
     my $session_name = $self->{poe_session_name};
     POE::Component::Server::SimpleHTTP->new(
-        'ALIAS'    => $self->{poe_kernel_httpd_name},
+        'ALIAS'    => $self->{poe_kernel_http_name},
         'ADDRESS'  => INADDR_ANY,
         'PORT'     => $self->{options}{http_port},
         'HANDLERS' => [
@@ -140,9 +140,10 @@
 
 sub ikc_unregister {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0, 
ARG3];
-    $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id; 
node=$node)\n");
+    my ($self, $kernel, $remote_kernel_id, $session_name, $node) = @_[OBJECT, 
KERNEL, ARG0, ARG1, ARG3];
+    $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id; 
session_name=$session_name; node=$node)\n");
     $self->{controller_up} = 0;
+    $kernel->yield("poe_shutdown");
     &App::sub_exit() if ($App::trace);
 }
 
@@ -161,7 +162,7 @@
     my $node_heartbeat  = $self->{options}{node_heartbeat} || 60;
     $self->schedule_event(
         method => "send_node_status",
-        time => time(),  # immediately ...
+        time => time()+5,  # immediately ...
         interval => $node_heartbeat,  # and every X seconds hereafter
     );
     &App::sub_exit() if ($App::trace);
@@ -184,13 +185,21 @@
 
     my $remote_server_name = "poe_${controller_host}_${controller_port}";
     my $remote_session_alias = $self->{poe_session_name};  # remote is same as 
local
-    my $remote_session_state = "poe_set_node_status";
+    my $remote_session_state = "poe_receive_node_status";
     my $sys_info = $self->get_sys_info();
+    my $memfree = $sys_info->{memfree} + $sys_info->{buffers} + 
$sys_info->{cached};
+    my $s_info = {
+        load => $sys_info->{load},
+        memfree => $memfree,
+        memtotal => $sys_info->{memtotal},
+        swapfree => $sys_info->{swapfree},
+        swaptotal => $sys_info->{swaptotal},
+    };
 
     if ($self->{controller_up}) {
         my $kernel = $self->{poe_kernel};
         $kernel->post("IKC", "post", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
-            [ $sys_info ]);
+            [ "$node_host:$node_port", $s_info ]);
     }
 
     &App::sub_exit() if ($App::trace);

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        Tue Aug  7 12:11:58 2007
@@ -44,7 +44,7 @@
     $self->{port}            = $options->{port};
     $options->{http_port}  ||= $options->{port}+1;
     $self->{poe_kernel_name}            = "poe_$self->{host}_$self->{port}";
-    $self->{poe_kernel_httpd_name}      = $self->{poe_kernel_name} . "_httpd";
+    $self->{poe_kernel_http_name}      = $self->{poe_kernel_name} . "_httpd";
     $self->{poe_session_name}           = "poe_session";
     $self->{poe_kernel}                 = $poe_kernel;
 
@@ -65,8 +65,13 @@
         ikc_register ikc_unregister ikc_shutdown
         poe_run_event poe_event_loop_extension 
poe_dispatch_pending_async_events
         poe_server_state poe_http_server_state poe_http_test_run
+        poe_enqueue_async_event poe_enqueue_async_event_finished 
poe_remote_async_event_finished
+    )];
+    $self->{poe_ikc_published_states} = [qw(
+        poe_server_state
+        poe_enqueue_async_event
+        poe_remote_async_event_finished
     )];
-    $self->{poe_ikc_published_states} = ["poe_server_state"];
 
     ### Does nothing by default, used by ClusterController, maybe other 
subclasses?
     $self->_init2a($options);
@@ -116,7 +121,7 @@
 
     my $session_name = $self->{poe_session_name};
     POE::Component::Server::SimpleHTTP->new(
-        'ALIAS'    => $self->{poe_kernel_httpd_name},
+        'ALIAS'    => $self->{poe_kernel_http_name},
         'ADDRESS'  => INADDR_ANY,
         'PORT'     => $self->{options}{http_port},
         'HANDLERS' => [
@@ -729,7 +734,7 @@
 sub _stop {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP, 
ARG0, ARG1 ];
-    $self->log({level=>2},"POE: _start\n");
+    $self->log({level=>2},"POE: _stop\n");
     #sleep(1);  # take a second to let child processes to die (perhaps not 
necessary, perhaps necessary when using POE::Wheel::Run)
     &App::sub_exit() if ($App::trace);
 }
@@ -745,8 +750,8 @@
 
 sub ikc_unregister {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
-    $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id)\n");
+    my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
+    $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
     &App::sub_exit() if ($App::trace);
 }
 
@@ -834,6 +839,9 @@
     # get rid of external ref count
     $kernel->refcount_decrement( $session, $self->{poe_session_name} );
 
+    $kernel->post( $self->{poe_kernel_http_name}, 'SHUTDOWN');
+    $kernel->post('IKC', 'shutdown');
+
     # propagate the message to children
     $kernel->post( $heap->{child_session}, 'poe_shutdown' );
     &App::sub_exit() if ($App::trace);
@@ -855,18 +863,26 @@
     $self->log({level=>2},"POE: poe_event_loop_extension\n");
     my $event_loop_extensions = $self->{event_loop_extensions};
     #$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #=" 
. ($#$event_loop_extensions+1) . ").\n");
+    my $async_event_added = 0;
     if ($event_loop_extensions && $#$event_loop_extensions > -1) {
         my ($extension, $obj, $method, $args, $event_executed);
         for (my $i = 0; $i <= $#$event_loop_extensions; $i++) {
             $extension = $event_loop_extensions->[$i];
             ($obj, $method, $args) = @$extension;
             $event_executed = $obj->$method(@$args);  # execute extension
+            $async_event_added = 1 if ($event_executed);
             #if ($event_executed) {
             #    $self->log({level=>2},"Event Loop extension: 
${obj}->${method}(@$args) = $event_executed\n");
             #}
         }
     }
-    $kernel->delay_set("poe_event_loop_extension", 1);
+    if ($async_event_added) {
+        $kernel->yield("poe_dispatch_pending_async_events");
+        $kernel->yield("poe_event_loop_extension");
+    }
+    else {
+        $kernel->delay_set("poe_event_loop_extension", 1);
+    }
     &App::sub_exit() if ($App::trace);
 }
 
@@ -888,6 +904,77 @@
     &App::sub_exit() if ($App::trace);
 }
 
+# State on Node
+sub poe_enqueue_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
+    my ($sender, $event, $callback_event) = @$args;
+    $self->log({level=>2},"POE: poe_enqueue_async_event 
($event->{name}.$event->{method})\n");
+
+    my $runtime_event_token = $self->send_async_event($event, { method => 
"async_event_finished", args => [ $sender, $event, $callback_event ], });
+    $event->{event_token} = $runtime_event_token;
+
+    &App::sub_exit([$runtime_event_token, [$event, $callback_event]]) if 
($App::trace);
+    return([$runtime_event_token, [$event, $callback_event]]);
+}
+
+# State on Controller
+sub poe_enqueue_async_event_finished {
+    &App::sub_entry if ($App::trace);
+    my ($self, $kernel, $return_values) = @_[OBJECT, KERNEL, ARG0];
+    my ($runtime_event_token, $async_event) = @$return_values;
+    $self->log({level=>2},"POE: poe_enqueue_async_event_finished 
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token)\n");
+    $self->{running_async_event}{$runtime_event_token} = $async_event;
+    &App::sub_exit() if ($App::trace);
+}
+
+# Method on Node
+sub async_event_finished {
+    &App::sub_entry if ($App::trace);
+    my ($self, $sender, $event, $callback_event) = @_;
+
+    my $runtime_event_token = $event->{event_token};
+    my $remote_server_name = "poe_${sender}";
+    $remote_server_name =~ s/:/_/;
+    my $remote_session_alias = $self->{poe_session_name};  # remote is same as 
local
+    my $remote_session_state = "poe_remote_async_event_finished";
+
+    my $kernel = $self->{poe_kernel};
+    $kernel->post("IKC", "post", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
+        [ $runtime_event_token, $callback_event->{args} ]);
+
+    &App::sub_exit() if ($App::trace);
+}
+
+# State on Controller
+sub poe_remote_async_event_finished {
+    &App::sub_entry if ($App::trace);
+    my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
+    my ($runtime_event_token, $callback_args) = @$args;
+
+    my $async_event = $self->{running_async_event}{$runtime_event_token};
+
+    if ($async_event) {
+        my ($event, $callback_event) = @$async_event;
+        $self->log({level=>2},"POE: poe_remote_async_event_finished 
($event->{name}.$event->{method} => $runtime_event_token)\n");
+        delete $self->{running_async_event}{$runtime_event_token};
+
+        my $destination = $event->{destination} || "local";
+        $self->{num_async_events}--;
+        $self->{node}{$destination}{num_async_events}--;
+
+        if ($callback_event) {
+            $callback_event->{args} = $callback_args;
+            $self->send_event($callback_event);
+        }
+    }
+    else {
+        $self->log({level=>2},"POE: poe_remote_async_event_finished 
($runtime_event_token)\n");
+    }
+
+    &App::sub_exit() if ($App::trace);
+}
+
 sub poe_server_state {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
@@ -909,7 +996,7 @@
     $response->content($server_state);
 
     # Signal that the request was handled okay.
-    $kernel->post( $self->{poe_kernel_httpd_name}, 'DONE', $response );
+    $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
     &App::sub_exit(RC_OK) if ($App::trace);
     return RC_OK;
 }
@@ -932,7 +1019,7 @@
     $response->content("SessionObject(mvworkd).sleep(30)");
 
     # Signal that the request was handled okay.
-    $kernel->post( $self->{poe_kernel_httpd_name}, 'DONE', $response );
+    $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
     &App::sub_exit(RC_OK) if ($App::trace);
     return RC_OK;
 }

Reply via email to