Author: spadkins
Date: Sat Apr 8 08:41:03 2006
New Revision: 5875
Added:
p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
Modified:
p5ee/trunk/App-Context/lib/App/Context/Cluster.pm
Log:
moving clustering context toward maturity
Modified: p5ee/trunk/App-Context/lib/App/Context/Cluster.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Cluster.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Cluster.pm Sat Apr 8 08:41:03 2006
@@ -11,17 +11,13 @@
@ISA = ( "App::Context::Server" );
-use Sys::Hostname;
-use Socket;
-use IO::Socket;
-use IO::Socket::INET;
-use POSIX ":sys_wait_h";
+use Date::Format;
use strict;
=head1 NAME
-App::Context::Cluster - a runtime environment with a Cluster Controller and many Cluster Nodes
+App::Context::Cluster - a runtime environment of a Cluster Controller served by many Cluster Nodes
=head1 SYNOPSIS
@@ -40,115 +36,133 @@
sub _init2a {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- $self->{controller_host} = $options->{controller_host};
- $self->{controller_port} = $options->{controller_port};
- $self->{is_controller} = $options->{controller};
- if ($self->{is_controller}) {
- $self->{port} = $options->{controller_port};
- }
- else {
- $self->{port} = $options->{node_port};
- }
+ die "Controller must have a port defined (\$context->{options}{port})" if (!$self->{port});
+ $self->{num_node_events} = 0;
+ $self->{max_node_events} = $self->{options}{"App.Context.Cluster.max_node_events"} || 10;
&App::sub_exit() if ($App::trace);
}
sub _init2b {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- if ($self->{is_controller}) {
- $self->ctrl_init();
- }
- else {
- $self->node_init();
- }
+ # nothing yet
&App::sub_exit() if ($App::trace);
}
sub dispatch_events_begin {
my ($self) = @_;
-
- my $verbose = $self->{verbose};
- my $role = $self->{is_controller} ? "Controller" : "Node";
-
- print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if ($verbose);
-
- if ($self->{is_controller}) {
- $self->ctrl_startup();
- }
- else {
- $self->node_startup();
- }
+ $self->log({level=>2},"Starting Cluster Controller on $self->{host}:$self->{port}\n");
+ # nothing special yet
}
sub dispatch_events_end {
my ($self) = @_;
+ $self->log({level=>2},"Stopping Cluster Controller\n");
+ # nothing special yet
+}
- my $verbose = $self->{verbose};
- my $role = $self->{is_controller} ? "Controller" : "Node";
+# CONTROLLER ONLY
+sub send_async_event_now {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event, $callback_event) = @_;
- if ($self->{is_controller}) {
- $self->ctrl_shutdown();
+ my $destination = $event->{destination};
+ if (! defined $destination) {
+ $self->log("ERROR: send_async_event_now(): node not assigned\n");
}
- else {
- $self->node_shutdown();
- }
-
- print "[$$] Stopping Cluster $role\n" if ($verbose);
-}
-
-sub process_msg {
- my ($self, $connection_socket, $msg) = @_;
- if ($self->{is_controller}) {
- $self->ctrl_process_msg($connection_socket, $msg);
+ elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
+ my $node_host = $1;
+ my $node_port = $2;
+ my $args = "";
+ if ($event->{args}) {
+ $args = $self->{rpc_serializer}->serialize($event->{args});
+ }
+ my $response = $self->send_message($node_host, $node_port, "ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args");
+ $self->{num_node_events}++;
+ if ($callback_event) {
+ if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
+ my $remote_event_token = $1;
+ $self->{pending_callback_event}{$remote_event_token} = $callback_event;
+ }
+ }
}
else {
- $self->node_process_msg($connection_socket, $msg);
+ $self->SUPER::send_async_event_now($event, $callback_event);
}
-}
-
-######################################################################
-# Controller
-######################################################################
-
-sub ctrl_init {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
&App::sub_exit() if ($App::trace);
}
-sub ctrl_startup {
+sub assign_event_destination {
&App::sub_entry if ($App::trace);
- my ($self) = @_;
- &App::sub_exit() if ($App::trace);
+ my ($self, $event) = @_;
+ my $assigned = 0;
+ if ($self->{num_node_events} < $self->{max_node_events}) {
+ $event->{destination} = $self->{host};
+ $assigned = $self->assign_event_destination_by_round_robin($event);
+ }
+ &App::sub_exit($assigned) if ($App::trace);
+ return($assigned);
}
-sub ctrl_shutdown {
+sub assign_event_destination_by_round_robin {
&App::sub_entry if ($App::trace);
- my ($self) = @_;
- &App::sub_exit() if ($App::trace);
+ my ($self, $event) = @_;
+
+ my $assigned = 0;
+ my $nodes = $self->{nodes};
+ if ($#$nodes > -1) {
+ my $node_idx = $self->{last_node_idx};
+ $node_idx = (defined $node_idx) ? $node_idx + 1 : 0;
+ $node_idx = 0 if ($node_idx > $#$nodes);
+ $event->{destination} = $nodes->[$node_idx];
+ $self->{last_node_idx} = $node_idx;
+ $assigned = 1;
+ }
+
+ &App::sub_exit($assigned) if ($App::trace);
+ return($assigned);
}
-sub ctrl_process_msg {
+sub process_msg {
&App::sub_entry if ($App::trace);
my ($self, $connection_socket, $msg) = @_;
my $verbose = $self->{verbose};
- print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
- my $processing_complete = $self->ctrl_process_app_msg($connection_socket, $msg);
+ $self->log({level=>3},"process_msg: [$msg]\n");
+ my $processing_complete = $self->process_custom_msg($connection_socket, $msg);
if (!$processing_complete) {
if ($msg =~ /^NODE-UP:(.*)/) {
- my $resp = $self->ctrl_set_node_up($1) || "ok";
+ my $resp = $self->set_node_up($1) || "ok";
$connection_socket->print("$resp\n");
}
elsif ($msg =~ /^NODE-DOWN:(.*)/) {
- $self->ctrl_set_node_down($1);
+ $self->set_node_down($1);
$connection_socket->print("ok\n");
}
elsif ($msg =~ s/^GET//) {
$connection_socket->print("HTTP/1.0 200 OK\n");
$connection_socket->print("Content-type: text/plain\n");
$connection_socket->print("\n");
- $connection_socket->print("$self->{host}:$self->{port} acting as cluster controller\n");
- $connection_socket->print($self->ctrl_state());
+ $connection_socket->print($self->state());
+ }
+ elsif ($msg =~ /^ASYNC-EVENT-RESULTS:([^:]+):(.*)$/) {
+ my $remote_event_token = $1;
+ my $results = $2;
+ if ($results ne "") {
+ $results = $self->{rpc_serializer}->deserialize($results);
+ if ($results && ref($results) eq "ARRAY" && $#$results == 0) {
+ $results = $results->[0];
+ }
+ }
+ $connection_socket->print("OK\n");
+ $self->{num_node_events}--;
+ my $callback_event = $self->{pending_callback_event}{$remote_event_token};
+ if ($callback_event) {
+ delete $self->{pending_callback_event}{$remote_event_token};
+ $callback_event->{args} = [] if (! $callback_event->{args});
+ push(@{$callback_event->{args}},
+ {event_token => $callback_event->{event_token}, returnval => $results, errnum => 0, errmsg => ""});
+ $self->send_event($callback_event);
+ }
}
else {
$connection_socket->print("unknown [$msg]\n");
@@ -157,7 +171,8 @@
&App::sub_exit() if ($App::trace);
}
-sub ctrl_process_app_msg {
+# Can be overridden to provide customized processing.
+sub process_custom_msg {
&App::sub_entry if ($App::trace);
my ($self, $connection_socket, $msg) = @_;
my $processing_complete = 0;
@@ -165,30 +180,40 @@
return($processing_complete);
}
-sub ctrl_state {
+sub state {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+ my $state = "Cluster Controller: $self->{host}:$self->{port}\n[$datetime]\n";
+ $state .= "\n";
+ $state .= $self->_state();
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
+}
+
+sub _state {
&App::sub_entry if ($App::trace);
my ($self) = @_;
- my %state = (
- "node" => $self->{node},
- "nodes" => $self->{nodes},
- "scheduled_events" => $self->{scheduled_events},
- );
- my $d = Data::Dumper->new([ \%state ], [ "state" ]);
- $d->Indent(1);
- my $state = $d->Dump();
+
+ my $state = "";
+
+ $state .= $self->SUPER::_state();
+
&App::sub_exit($state) if ($App::trace);
return($state);
}
-sub ctrl_set_node_down {
+sub set_node_down {
&App::sub_entry if ($App::trace);
my ($self, $node) = @_;
$self->{node}{$node}{up} = 0;
- $self->ctrl_set_nodes();
+ $self->set_nodes();
&App::sub_exit() if ($App::trace);
}
-sub ctrl_set_node_up {
+sub set_node_up {
&App::sub_entry if ($App::trace);
my ($self, $node) = @_;
my ($retval);
@@ -197,14 +222,14 @@
}
else {
$self->{node}{$node}{up} = 1;
- $self->ctrl_set_nodes();
+ $self->set_nodes();
$retval = "new";
}
&App::sub_exit($retval) if ($App::trace);
return($retval);
}
-sub ctrl_set_nodes {
+sub set_nodes {
&App::sub_entry if ($App::trace);
my ($self) = @_;
my (@nodes);
@@ -217,72 +242,5 @@
&App::sub_exit() if ($App::trace);
}
-######################################################################
-# Node
-######################################################################
-
-sub node_init {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
- &App::sub_exit() if ($App::trace);
-}
-
-sub node_startup {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
- my $controller_host = $self->{controller_host};
- my $controller_port = $self->{controller_port};
- my $node_host = $self->{host};
- my $node_port = $self->{port};
- my $node_heartbeat = $self->{context}{options}{node_heartbeat} || 60;
- $self->schedule_event(
- method => "send_async_message",
- args => [ $controller_host, $controller_port, "NODE-UP:$node_host:$node_port" ],
- time => time(), # immediately ...
- interval => $node_heartbeat, # and every X seconds hereafter
- );
- &App::sub_exit() if ($App::trace);
-}
-
-sub node_shutdown {
- &App::sub_entry if ($App::trace);
- my ($self) = @_;
- my $controller_host = $self->{controller_host};
- my $controller_port = $self->{controller_port};
- my $node_host = $self->{host};
- my $node_port = $self->{port};
- $self->send_async_message($controller_host, $controller_port, "NODE-DOWN:$node_host:$node_port");
- &App::sub_exit() if ($App::trace);
-}
-
-sub node_process_msg {
- &App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
- my $verbose = $self->{verbose};
- print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
- my $processing_complete = $self->node_process_app_msg($connection_socket, $msg);
- if (!$processing_complete) {
- if ($msg =~ s/^GET//) {
- $connection_socket->print("HTTP/1.0 200 OK\n");
- $connection_socket->print("Content-type: text/plain\n");
- $connection_socket->print("\n");
- $connection_socket->print("$self->{host}:$self->{port} acting as cluster node\n");
- #$connection_socket->print($self->ctrl_state());
- }
- else {
- $connection_socket->print("unknown [$msg]\n");
- }
- }
- &App::sub_exit() if ($App::trace);
-}
-
-sub node_process_app_msg {
- &App::sub_entry if ($App::trace);
- my ($self, $connection_socket, $msg) = @_;
- my $processing_complete = 0;
- &App::sub_exit($processing_complete) if ($App::trace);
- return($processing_complete);
-}
-
1;
Added: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- (empty file)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm Sat Apr 8 08:41:03 2006
@@ -0,0 +1,161 @@
+
+#############################################################################
+## $Id: ClusterNode.pm 3666 2006-03-11 20:34:10Z spadkins $
+#############################################################################
+
+package App::Context::ClusterNode;
+$VERSION = (q$Revision: 3666 $ =~ /(\d[\d\.]*)/)[0]; # VERSION numbers generated by svn
+
+use App;
+use App::Context::Server;
+
+@ISA = ( "App::Context::Server" );
+
+use Date::Format;
+
+use strict;
+
+=head1 NAME
+
+App::Context::ClusterNode - a runtime environment for a Cluster Node that serves a Cluster Controller
+
+=head1 SYNOPSIS
+
+ # ... official way to get a Context object ...
+ use App;
+ $context = App->context();
+ $config = $context->config(); # get the configuration
+ $config->dispatch_events(); # dispatch events
+
+ # ... alternative way (used internally) ...
+ use App::Context::ClusterNode;
+ $context = App::Context::ClusterNode->new();
+
+=cut
+
+sub _init2a {
+ &App::sub_entry if ($App::trace);
+ my ($self, $options) = @_;
+ $self->{controller_host} = $options->{controller_host};
+ $self->{controller_port} = $options->{controller_port};
+ die "Node must have a controller host and port defined (\$context->{options}{controller_host} and {controller_port})"
+ if (!$self->{controller_host} || !$self->{controller_port});
+ &App::sub_exit() if ($App::trace);
+}
+
+sub _init2b {
+ &App::sub_entry if ($App::trace);
+ my ($self, $options) = @_;
+ # nothing yet
+ &App::sub_exit() if ($App::trace);
+}
+
+sub dispatch_events_begin {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ $self->log({level=>2},"Starting Cluster Node on $self->{host}:$self->{port}\n");
+ my $controller_host = $self->{controller_host};
+ my $controller_port = $self->{controller_port};
+ my $node_host = $self->{host};
+ my $node_port = $self->{port};
+ my $node_heartbeat = $self->{options}{node_heartbeat} || 300;
+ $self->schedule_event(
+ method => "send_async_message",
+ args => [ $controller_host, $controller_port, "NODE-UP:$node_host:$node_port" ],
+ time => time(), # immediately ...
+ interval => $node_heartbeat, # and every X seconds hereafter
+ );
+ &App::sub_exit() if ($App::trace);
+}
+
+sub dispatch_events_end {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+ $self->log({level=>2},"Stopping Cluster Node\n");
+ my $controller_host = $self->{controller_host};
+ my $controller_port = $self->{controller_port};
+ my $node_host = $self->{host};
+ my $node_port = $self->{port};
+ $self->send_async_message($controller_host, $controller_port, "NODE-DOWN:$node_host:$node_port");
+ &App::sub_exit() if ($App::trace);
+}
+
+sub process_msg {
+ &App::sub_entry if ($App::trace);
+ my ($self, $connection_socket, $msg) = @_;
+ my $verbose = $self->{verbose};
+ $self->log({level=>3},"process_msg: [$msg]\n");
+ my $processing_complete = $self->process_custom_msg($connection_socket, $msg);
+ if (!$processing_complete) {
+ if ($msg =~ s/^GET//) {
+ $connection_socket->print("HTTP/1.0 200 OK\n");
+ $connection_socket->print("Content-type: text/plain\n");
+ $connection_socket->print("\n");
+ $connection_socket->print($self->state());
+ }
+ elsif ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
+ my %event = (
+ service_type => $1,
+ name => $2,
+ method => $3,
+ );
+ my $args = $4;
+ $event{args} = $self->{rpc_serializer}->deserialize($args) if ($args ne "");
+ my $event_token = $self->send_async_event({method => "process_async_event", args => [\%event],});
+ $event{event_token} = $event_token;
+ $connection_socket->print("ASYNC-EVENT-TOKEN:$event_token\n");
+ }
+ else {
+ $connection_socket->print("unknown [$msg]\n");
+ }
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
+# Can be overridden to provide customized processing.
+sub process_custom_msg {
+ &App::sub_entry if ($App::trace);
+ my ($self, $connection_socket, $msg) = @_;
+ my $processing_complete = 0;
+ &App::sub_exit($processing_complete) if ($App::trace);
+ return($processing_complete);
+}
+
+sub state {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+ my $state = "Cluster Node: $self->{host}:$self->{port}\n[$datetime]\n";
+ $state .= "\n";
+ $state .= $self->_state();
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
+}
+
+sub _state {
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my $state = "";
+
+ $state .= $self->SUPER::_state();
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
+}
+
+sub process_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $event) = @_;
+ my $results = $self->send_event($event);
+ my $results_txt = $self->{rpc_serializer}->serialize($results);
+ my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
+ $self->send_message($self->{controller_host}, $self->{controller_port}, $msg);
+ &App::sub_exit(undef) if ($App::trace);
+ return(undef);
+}
+
+1;
+