Author: spadkins
Date: Sat Jul  1 20:44:55 2006
New Revision: 6608

Modified:
   p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
   p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm

Log:
add application-driven async event assignment

Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm Sat Jul  1 
20:44:55 2006
@@ -80,7 +80,9 @@
         my $response = $self->send_message($node_host, $node_port, 
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1, 
undef, 1);
         if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
             my $runtime_event_token = $1;
+            my $destination = $event->{destination} || "local";
             $self->{num_async_events}++;
+            $self->{node}{$destination}{num_async_events}++;
             $self->{running_async_event}{$runtime_event_token} = [ $event, 
$callback_event ];
         }
         elsif ($response =~ /ERROR/) {
@@ -118,8 +120,15 @@
     my ($self, $event) = @_;
     my $assigned = 0;
     if ($self->{num_async_events} < $self->{max_async_events}) {
-        $event->{destination} = $self->{host};
-        $assigned = $self->assign_event_destination_by_round_robin($event);
+        # SPA 2006-07-01: I just commented this out. I shouldn't need it.
+        # $event->{destination} = $self->{host};
+        my $main_service = $self->{main_service};
+        if ($main_service && $main_service->can("assign_event_destination")) {
+            $assigned = $main_service->assign_event_destination($event, 
$self->{nodes}, $self->{node});
+        }
+        else {
+            $assigned = $self->assign_event_destination_by_round_robin($event);
+        }
     }
     &App::sub_exit($assigned) if ($App::trace);
     return($assigned);
@@ -132,11 +141,11 @@
     my $assigned = 0;
     my $nodes = $self->{nodes};
     if ($#$nodes > -1) {
-        my $node_idx = $self->{last_node_idx};
+        my $node_idx = $self->{node}{ALL}{last_node_idx};
         $node_idx = (defined $node_idx) ? $node_idx + 1 : 0;
         $node_idx = 0 if ($node_idx > $#$nodes);
         $event->{destination} = $nodes->[$node_idx];
-        $self->{last_node_idx} = $node_idx;
+        $self->{node}{ALL}{last_node_idx} = $node_idx;
         $assigned = 1;
     }
 
@@ -170,10 +179,13 @@
             my $async_event = 
$self->{running_async_event}{$runtime_event_token};
 
             if ($async_event) {
-                $self->{num_async_events}--;
                 delete $self->{running_async_event}{$runtime_event_token};
 
                 my ($event, $callback_event) = @$async_event;
+                my $destination = $event->{destination} || "local";
+                $self->{num_async_events}--;
+                $self->{node}{$destination}{num_async_events}--;
+
                 if ($callback_event) {
                     $callback_event->{args} = [] if (! 
$callback_event->{args});
                     push(@{$callback_event->{args}},
@@ -225,9 +237,20 @@
 
     my (@nodes);
     @nodes = @{$self->{nodes}} if ($self->{nodes});
-    $state .= "Nodes: up [EMAIL PROTECTED] last dispatched 
[$self->{last_node_idx}]\n";
+    $state .= "Nodes: up [EMAIL PROTECTED] last dispatched 
[$self->{node}{ALL}{last_node_idx}]\n";
+    my ($memfree, $memtotal, $swapfree, $swaptotal);
     foreach my $node (sort keys %{$self->{node}}) {
-        $state .= sprintf("   %-16s %4s\n", $node, $self->{node}{$node}{up} ? 
"UP" : "down");
+        next if ($node eq "ALL");
+        $state .= sprintf("   %-16s %4s : %3d/%3d max : 
[Load:%4.1f][Mem:%5.1f%%/%7d][Swap:%5.1f%%/%7d] : [%19s]\n", $node,
+            $self->{node}{$node}{up} ? "UP" : "down",
+            $self->{node}{$node}{num_async_events} || 0,
+            $self->{node}{$node}{max_async_events} || 0,
+            $self->{node}{$node}{load} || 0,
+            $self->{node}{$node}{memtotal} ? 
100*($self->{node}{$node}{memtotal} - 
$self->{node}{$node}{memfree})/$self->{node}{$node}{memtotal} : 0,
+            $self->{node}{$node}{memtotal} || 0,
+            $self->{node}{$node}{swaptotal} ? 
100*($self->{node}{$node}{memtotal} - 
$self->{node}{$node}{swapfree})/$self->{node}{$node}{swaptotal} : 0,
+            $self->{node}{$node}{swaptotal} || 0,
+            $self->{node}{$node}{datetime});
     }
 
     $state .= $self->SUPER::_state();
@@ -250,7 +273,19 @@
 sub set_node_up {
     &App::sub_entry if ($App::trace);
     my ($self, $node) = @_;
-    my ($retval);
+    my ($retval, $values);
+    if ($node =~ /^([^:]+:\d+):(.*)/) {
+        $node   = $1;
+        $values = $2;
+        if ($values) {
+            foreach my $value (split(/,/, $values)) {
+                if ($value  =~ /^([^=]+)=(.*)/) {
+                    $self->{node}{$node}{$1} = $2;
+                }
+            }
+        }
+    }
+    $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
     if ($self->{node}{$node}{up}) {
         $retval = "ok";
     }

Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm       (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm       Sat Jul  1 
20:44:55 2006
@@ -54,14 +54,9 @@
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     $self->log({level=>2},"Starting Cluster Node on 
$self->{host}:$self->{port}\n");
-    my $controller_host = $self->{controller_host};
-    my $controller_port = $self->{controller_port};
-    my $node_host       = $self->{host};
-    my $node_port       = $self->{port};
-    my $node_heartbeat  = $self->{options}{node_heartbeat} || 300;
+    my $node_heartbeat  = $self->{options}{node_heartbeat} || 60;
     $self->schedule_event(
-        method => "send_async_message",
-        args => [ $controller_host, $controller_port, 
"NODE-UP:$node_host:$node_port" ],
+        method => "send_node_status",
         time => time(),  # immediately ...
         interval => $node_heartbeat,  # and every X seconds hereafter
     );
@@ -84,6 +79,28 @@
     &App::sub_exit() if ($App::trace);
 }
 
+sub send_node_status {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    my $controller_host = $self->{controller_host};
+    my $controller_port = $self->{controller_port};
+    my $node_host       = $self->{host};
+    my $node_port       = $self->{port};
+    my $values          = $self->system_values();
+    $self->send_async_message($controller_host, $controller_port, 
"NODE-UP:$node_host:$node_port:$values");
+    &App::sub_exit() if ($App::trace);
+}
+
+sub system_values {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    my $info = $self->get_sys_info();
+    my $memfree = $info->{memfree} + $info->{buffers} + $info->{cached};
+    my $values = 
"load=$info->{load},memfree=$memfree,memtotal=$info->{memtotal},swapfree=$info->{swapfree},swaptotal=$info->{swaptotal}";
+    &App::sub_exit($values) if ($App::trace);
+    return($values);
+}
+
 sub process_msg {
     &App::sub_entry if ($App::trace);
     my ($self, $msg) = @_;

Reply via email to