Author: spadkins
Date: Sat Jul 1 20:44:55 2006
New Revision: 6608
Modified:
p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
Log:
add application-driven async event assignment
Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterController.pm Sat Jul 1
20:44:55 2006
@@ -80,7 +80,9 @@
my $response = $self->send_message($node_host, $node_port,
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1,
undef, 1);
if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
my $runtime_event_token = $1;
+ my $destination = $event->{destination} || "local";
$self->{num_async_events}++;
+ $self->{node}{$destination}{num_async_events}++;
$self->{running_async_event}{$runtime_event_token} = [ $event,
$callback_event ];
}
elsif ($response =~ /ERROR/) {
@@ -118,8 +120,15 @@
my ($self, $event) = @_;
my $assigned = 0;
if ($self->{num_async_events} < $self->{max_async_events}) {
- $event->{destination} = $self->{host};
- $assigned = $self->assign_event_destination_by_round_robin($event);
+ # SPA 2006-07-01: I just commented this out. I shouldn't need it.
+ # $event->{destination} = $self->{host};
+ my $main_service = $self->{main_service};
+ if ($main_service && $main_service->can("assign_event_destination")) {
+ $assigned = $main_service->assign_event_destination($event,
$self->{nodes}, $self->{node});
+ }
+ else {
+ $assigned = $self->assign_event_destination_by_round_robin($event);
+ }
}
&App::sub_exit($assigned) if ($App::trace);
return($assigned);
@@ -132,11 +141,11 @@
my $assigned = 0;
my $nodes = $self->{nodes};
if ($#$nodes > -1) {
- my $node_idx = $self->{last_node_idx};
+ my $node_idx = $self->{node}{ALL}{last_node_idx};
$node_idx = (defined $node_idx) ? $node_idx + 1 : 0;
$node_idx = 0 if ($node_idx > $#$nodes);
$event->{destination} = $nodes->[$node_idx];
- $self->{last_node_idx} = $node_idx;
+ $self->{node}{ALL}{last_node_idx} = $node_idx;
$assigned = 1;
}
@@ -170,10 +179,13 @@
my $async_event =
$self->{running_async_event}{$runtime_event_token};
if ($async_event) {
- $self->{num_async_events}--;
delete $self->{running_async_event}{$runtime_event_token};
my ($event, $callback_event) = @$async_event;
+ my $destination = $event->{destination} || "local";
+ $self->{num_async_events}--;
+ $self->{node}{$destination}{num_async_events}--;
+
if ($callback_event) {
$callback_event->{args} = [] if (!
$callback_event->{args});
push(@{$callback_event->{args}},
@@ -225,9 +237,20 @@
my (@nodes);
@nodes = @{$self->{nodes}} if ($self->{nodes});
- $state .= "Nodes: up [EMAIL PROTECTED] last dispatched
[$self->{last_node_idx}]\n";
+ $state .= "Nodes: up [EMAIL PROTECTED] last dispatched
[$self->{node}{ALL}{last_node_idx}]\n";
+ my ($memfree, $memtotal, $swapfree, $swaptotal);
foreach my $node (sort keys %{$self->{node}}) {
- $state .= sprintf(" %-16s %4s\n", $node, $self->{node}{$node}{up} ?
"UP" : "down");
+ next if ($node eq "ALL");
+ $state .= sprintf(" %-16s %4s : %3d/%3d max :
[Load:%4.1f][Mem:%5.1f%%/%7d][Swap:%5.1f%%/%7d] : [%19s]\n", $node,
+ $self->{node}{$node}{up} ? "UP" : "down",
+ $self->{node}{$node}{num_async_events} || 0,
+ $self->{node}{$node}{max_async_events} || 0,
+ $self->{node}{$node}{load} || 0,
+ $self->{node}{$node}{memtotal} ?
100*($self->{node}{$node}{memtotal} -
$self->{node}{$node}{memfree})/$self->{node}{$node}{memtotal} : 0,
+ $self->{node}{$node}{memtotal} || 0,
+ $self->{node}{$node}{swaptotal} ?
100*($self->{node}{$node}{memtotal} -
$self->{node}{$node}{swapfree})/$self->{node}{$node}{swaptotal} : 0,
+ $self->{node}{$node}{swaptotal} || 0,
+ $self->{node}{$node}{datetime});
}
$state .= $self->SUPER::_state();
@@ -250,7 +273,19 @@
sub set_node_up {
&App::sub_entry if ($App::trace);
my ($self, $node) = @_;
- my ($retval);
+ my ($retval, $values);
+ if ($node =~ /^([^:]+:\d+):(.*)/) {
+ $node = $1;
+ $values = $2;
+ if ($values) {
+ foreach my $value (split(/,/, $values)) {
+ if ($value =~ /^([^=]+)=(.*)/) {
+ $self->{node}{$node}{$1} = $2;
+ }
+ }
+ }
+ }
+ $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
if ($self->{node}{$node}{up}) {
$retval = "ok";
}
Modified: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm Sat Jul 1
20:44:55 2006
@@ -54,14 +54,9 @@
&App::sub_entry if ($App::trace);
my ($self) = @_;
$self->log({level=>2},"Starting Cluster Node on
$self->{host}:$self->{port}\n");
- my $controller_host = $self->{controller_host};
- my $controller_port = $self->{controller_port};
- my $node_host = $self->{host};
- my $node_port = $self->{port};
- my $node_heartbeat = $self->{options}{node_heartbeat} || 300;
+ my $node_heartbeat = $self->{options}{node_heartbeat} || 60;
$self->schedule_event(
- method => "send_async_message",
- args => [ $controller_host, $controller_port,
"NODE-UP:$node_host:$node_port" ],
+ method => "send_node_status",
time => time(), # immediately ...
interval => $node_heartbeat, # and every X seconds hereafter
);
@@ -84,6 +79,28 @@
&App::sub_exit() if ($App::trace);
}
+sub send_node_status {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $controller_host = $self->{controller_host};
+ my $controller_port = $self->{controller_port};
+ my $node_host = $self->{host};
+ my $node_port = $self->{port};
+ my $values = $self->system_values();
+ $self->send_async_message($controller_host, $controller_port,
"NODE-UP:$node_host:$node_port:$values");
+ &App::sub_exit() if ($App::trace);
+}
+
+sub system_values {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ my $info = $self->get_sys_info();
+ my $memfree = $info->{memfree} + $info->{buffers} + $info->{cached};
+ my $values =
"load=$info->{load},memfree=$memfree,memtotal=$info->{memtotal},swapfree=$info->{swapfree},swaptotal=$info->{swaptotal}";
+ &App::sub_exit($values) if ($App::trace);
+ return($values);
+}
+
sub process_msg {
&App::sub_entry if ($App::trace);
my ($self, $msg) = @_;