Author: spadkins
Date: Sat Apr  8 08:41:03 2006
New Revision: 5875

Added:
   p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
Modified:
   p5ee/trunk/App-Context/lib/App/Context/Cluster.pm

Log:
moving clustering context toward maturity

Modified: p5ee/trunk/App-Context/lib/App/Context/Cluster.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Cluster.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Cluster.pm   Sat Apr  8 08:41:03 2006
@@ -11,17 +11,13 @@
 
 @ISA = ( "App::Context::Server" );
 
-use Sys::Hostname;
-use Socket;
-use IO::Socket;
-use IO::Socket::INET;
-use POSIX ":sys_wait_h";
+use Date::Format;
 
 use strict;
 
 =head1 NAME
 
-App::Context::Cluster - a runtime environment with a Cluster Controller and 
many Cluster Nodes
+App::Context::Cluster - a runtime environment of a Cluster Controller served 
by many Cluster Nodes
 
 =head1 SYNOPSIS
 
@@ -40,115 +36,133 @@
 sub _init2a {
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
-    $self->{controller_host} = $options->{controller_host};
-    $self->{controller_port} = $options->{controller_port};
-    $self->{is_controller}   = $options->{controller};
-    if ($self->{is_controller}) {
-        $self->{port} = $options->{controller_port};
-    }
-    else {
-        $self->{port} = $options->{node_port};
-    }
+    die "Controller must have a port defined (\$context->{options}{port})" if 
(!$self->{port});
+    $self->{num_node_events} = 0;
+    $self->{max_node_events} = 
$self->{options}{"App.Context.Cluster.max_node_events"} || 10;
     &App::sub_exit() if ($App::trace);
 }
 
 sub _init2b {
     &App::sub_entry if ($App::trace);
     my ($self, $options) = @_;
-    if ($self->{is_controller}) {
-        $self->ctrl_init();
-    }
-    else {
-        $self->node_init();
-    }
+    # nothing yet
     &App::sub_exit() if ($App::trace);
 }
 
 sub dispatch_events_begin {
     my ($self) = @_;
-
-    my $verbose = $self->{verbose};
-    my $role = $self->{is_controller} ? "Controller" : "Node";
-
-    print "[$$] Starting Cluster $role on $self->{host}:$self->{port}\n" if 
($verbose);
-
-    if ($self->{is_controller}) {
-        $self->ctrl_startup();
-    }
-    else {
-        $self->node_startup();
-    }
+    $self->log({level=>2},"Starting Cluster Controller on 
$self->{host}:$self->{port}\n");
+    # nothing special yet
 }
 
 sub dispatch_events_end {
     my ($self) = @_;
+    $self->log({level=>2},"Stopping Cluster Controller\n");
+    # nothing special yet
+}
 
-    my $verbose = $self->{verbose};
-    my $role = $self->{is_controller} ? "Controller" : "Node";
+# CONTROLLER ONLY
+sub send_async_event_now {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event, $callback_event) = @_;
 
-    if ($self->{is_controller}) {
-        $self->ctrl_shutdown();
+    my $destination = $event->{destination};
+    if (! defined $destination) {
+        $self->log("ERROR: send_async_event_now(): node not assigned\n");
     }
-    else {
-        $self->node_shutdown();
-    }
-
-    print "[$$] Stopping Cluster $role\n" if ($verbose);
-}
-
-sub process_msg {
-    my ($self, $connection_socket, $msg) = @_;
-    if ($self->{is_controller}) {
-        $self->ctrl_process_msg($connection_socket, $msg);
+    elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
+        my $node_host = $1;
+        my $node_port = $2;
+        my $args = "";
+        if ($event->{args}) {
+            $args = $self->{rpc_serializer}->serialize($event->{args});
+        }
+        my $response = $self->send_message($node_host, $node_port, 
"ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args");
+        $self->{num_node_events}++;
+        if ($callback_event) {
+            if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
+                my $remote_event_token = $1;
+                $self->{pending_callback_event}{$remote_event_token} = 
$callback_event;
+            }
+        }
     }
     else {
-        $self->node_process_msg($connection_socket, $msg);
+        $self->SUPER::send_async_event_now($event, $callback_event);
     }
-}
-
-######################################################################
-# Controller
-######################################################################
-
-sub ctrl_init {
-    &App::sub_entry if ($App::trace);
-    my ($self) = @_;
     &App::sub_exit() if ($App::trace);
 }
 
-sub ctrl_startup {
+sub assign_event_destination {
     &App::sub_entry if ($App::trace);
-    my ($self) = @_;
-    &App::sub_exit() if ($App::trace);
+    my ($self, $event) = @_;
+    my $assigned = 0;
+    if ($self->{num_node_events} < $self->{max_node_events}) {
+        $event->{destination} = $self->{host};
+        $assigned = $self->assign_event_destination_by_round_robin($event);
+    }
+    &App::sub_exit($assigned) if ($App::trace);
+    return($assigned);
 }
 
-sub ctrl_shutdown {
+sub assign_event_destination_by_round_robin {
     &App::sub_entry if ($App::trace);
-    my ($self) = @_;
-    &App::sub_exit() if ($App::trace);
+    my ($self, $event) = @_;
+    
+    my $assigned = 0;
+    my $nodes = $self->{nodes};
+    if ($#$nodes > -1) {
+        my $node_idx = $self->{last_node_idx};
+        $node_idx = (defined $node_idx) ? $node_idx + 1 : 0;
+        $node_idx = 0 if ($node_idx > $#$nodes);
+        $event->{destination} = $nodes->[$node_idx];
+        $self->{last_node_idx} = $node_idx;
+        $assigned = 1;
+    }
+
+    &App::sub_exit($assigned) if ($App::trace);
+    return($assigned);
 }
 
-sub ctrl_process_msg {
+sub process_msg {
     &App::sub_entry if ($App::trace);
     my ($self, $connection_socket, $msg) = @_;
     my $verbose = $self->{verbose};
-    print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
-    my $processing_complete = $self->ctrl_process_app_msg($connection_socket, 
$msg);
+    $self->log({level=>3},"process_msg: [$msg]\n");
+    my $processing_complete = $self->process_custom_msg($connection_socket, 
$msg);
     if (!$processing_complete) {
         if ($msg =~ /^NODE-UP:(.*)/) {
-            my $resp = $self->ctrl_set_node_up($1) || "ok";
+            my $resp = $self->set_node_up($1) || "ok";
             $connection_socket->print("$resp\n");
         }
         elsif ($msg =~ /^NODE-DOWN:(.*)/) {
-            $self->ctrl_set_node_down($1);
+            $self->set_node_down($1);
             $connection_socket->print("ok\n");
         }
         elsif ($msg =~ s/^GET//) {
             $connection_socket->print("HTTP/1.0 200 OK\n");
             $connection_socket->print("Content-type: text/plain\n");
             $connection_socket->print("\n");
-            $connection_socket->print("$self->{host}:$self->{port} acting as 
cluster controller\n");
-            $connection_socket->print($self->ctrl_state());
+            $connection_socket->print($self->state());
+        }
+        elsif ($msg =~ /^ASYNC-EVENT-RESULTS:([^:]+):(.*)$/) {
+            my $remote_event_token = $1;
+            my $results = $2;
+            if ($results ne "") {
+                $results = $self->{rpc_serializer}->deserialize($results);
+                if ($results && ref($results) eq "ARRAY" && $#$results == 0) {
+                    $results = $results->[0];
+                }
+            }
+            $connection_socket->print("OK\n");
+            $self->{num_node_events}--;
+            my $callback_event = 
$self->{pending_callback_event}{$remote_event_token};
+            if ($callback_event) {
+                delete $self->{pending_callback_event}{$remote_event_token};
+                $callback_event->{args} = [] if (! $callback_event->{args});
+                push(@{$callback_event->{args}},
+                    {event_token => $callback_event->{event_token}, returnval 
=> $results, errnum => 0, errmsg => ""});
+                $self->send_event($callback_event);
+            }
         }
         else {
             $connection_socket->print("unknown [$msg]\n");
@@ -157,7 +171,8 @@
     &App::sub_exit() if ($App::trace);
 }
 
-sub ctrl_process_app_msg {
+# Can be overridden to provide customized processing.
+sub process_custom_msg {
     &App::sub_entry if ($App::trace);
     my ($self, $connection_socket, $msg) = @_;
     my $processing_complete = 0;
@@ -165,30 +180,40 @@
     return($processing_complete);
 }
 
-sub ctrl_state {
+sub state {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+
+    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+    my $state = "Cluster Controller: 
$self->{host}:$self->{port}\n[$datetime]\n";
+    $state .= "\n";
+    $state .= $self->_state();
+
+    &App::sub_exit($state) if ($App::trace);
+    return($state);
+}
+
+sub _state {
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
-    my %state = (
-        "node" => $self->{node},
-        "nodes" => $self->{nodes},
-        "scheduled_events" => $self->{scheduled_events},
-    );
-    my $d = Data::Dumper->new([ \%state ], [ "state" ]);
-    $d->Indent(1);
-    my $state = $d->Dump();
+
+    my $state = "";
+
+    $state .= $self->SUPER::_state();
+
     &App::sub_exit($state) if ($App::trace);
     return($state);
 }
 
-sub ctrl_set_node_down {
+sub set_node_down {
     &App::sub_entry if ($App::trace);
     my ($self, $node) = @_;
     $self->{node}{$node}{up} = 0;
-    $self->ctrl_set_nodes();
+    $self->set_nodes();
     &App::sub_exit() if ($App::trace);
 }
 
-sub ctrl_set_node_up {
+sub set_node_up {
     &App::sub_entry if ($App::trace);
     my ($self, $node) = @_;
     my ($retval);
@@ -197,14 +222,14 @@
     }
     else {
         $self->{node}{$node}{up} = 1;
-        $self->ctrl_set_nodes();
+        $self->set_nodes();
         $retval = "new";
     }
     &App::sub_exit($retval) if ($App::trace);
     return($retval);
 }
 
-sub ctrl_set_nodes {
+sub set_nodes {
     &App::sub_entry if ($App::trace);
     my ($self) = @_;
     my (@nodes);
@@ -217,72 +242,5 @@
     &App::sub_exit() if ($App::trace);
 }
 
-######################################################################
-# Node
-######################################################################
-
-sub node_init {
-    &App::sub_entry if ($App::trace);
-    my ($self) = @_;
-    &App::sub_exit() if ($App::trace);
-}
-
-sub node_startup {
-    &App::sub_entry if ($App::trace);
-    my ($self) = @_;
-    my $controller_host = $self->{controller_host};
-    my $controller_port = $self->{controller_port};
-    my $node_host       = $self->{host};
-    my $node_port       = $self->{port};
-    my $node_heartbeat  = $self->{context}{options}{node_heartbeat} || 60;
-    $self->schedule_event(
-        method => "send_async_message",
-        args => [ $controller_host, $controller_port, 
"NODE-UP:$node_host:$node_port" ],
-        time => time(),  # immediately ...
-        interval => $node_heartbeat,  # and every X seconds hereafter
-    );
-    &App::sub_exit() if ($App::trace);
-}
-
-sub node_shutdown {
-    &App::sub_entry if ($App::trace);
-    my ($self) = @_;
-    my $controller_host = $self->{controller_host};
-    my $controller_port = $self->{controller_port};
-    my $node_host       = $self->{host};
-    my $node_port       = $self->{port};
-    $self->send_async_message($controller_host, $controller_port, 
"NODE-DOWN:$node_host:$node_port");
-    &App::sub_exit() if ($App::trace);
-}
-
-sub node_process_msg {
-    &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
-    my $verbose = $self->{verbose};
-    print STDERR "[$$] process_msg($msg)\n" if ($verbose >= 2);
-    my $processing_complete = $self->node_process_app_msg($connection_socket, 
$msg);
-    if (!$processing_complete) {
-        if ($msg =~ s/^GET//) {
-            $connection_socket->print("HTTP/1.0 200 OK\n");
-            $connection_socket->print("Content-type: text/plain\n");
-            $connection_socket->print("\n");
-            $connection_socket->print("$self->{host}:$self->{port} acting as 
cluster node\n");
-            #$connection_socket->print($self->ctrl_state());
-        }
-        else {
-            $connection_socket->print("unknown [$msg]\n");
-        }
-    }
-    &App::sub_exit() if ($App::trace);
-}
-
-sub node_process_app_msg {
-    &App::sub_entry if ($App::trace);
-    my ($self, $connection_socket, $msg) = @_;
-    my $processing_complete = 0;
-    &App::sub_exit($processing_complete) if ($App::trace);
-    return($processing_complete);
-}
-
 1;
 

Added: p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm
==============================================================================
--- (empty file)
+++ p5ee/trunk/App-Context/lib/App/Context/ClusterNode.pm       Sat Apr  8 
08:41:03 2006
@@ -0,0 +1,161 @@
+
+#############################################################################
+## $Id: ClusterNode.pm 3666 2006-03-11 20:34:10Z spadkins $
+#############################################################################
+
+package App::Context::ClusterNode;
+$VERSION = (q$Revision: 3666 $ =~ /(\d[\d\.]*)/)[0];  # VERSION numbers 
generated by svn
+
+use App;
+use App::Context::Server;
+
[EMAIL PROTECTED] = ( "App::Context::Server" );
+
+use Date::Format;
+
+use strict;
+
+=head1 NAME
+
+App::Context::ClusterNode - a runtime environment for a Cluster Node that 
serves a Cluster Controller
+
+=head1 SYNOPSIS
+
+   # ... official way to get a Context object ...
+   use App;
+   $context = App->context();
+   $config = $context->config();   # get the configuration
+   $config->dispatch_events();     # dispatch events
+
+   # ... alternative way (used internally) ...
+   use App::Context::ClusterNode;
+   $context = App::Context::ClusterNode->new();
+
+=cut
+
+sub _init2a {
+    &App::sub_entry if ($App::trace);
+    my ($self, $options) = @_;
+    $self->{controller_host} = $options->{controller_host};
+    $self->{controller_port} = $options->{controller_port};
+    die "Node must have a controller host and port defined 
(\$context->{options}{controller_host} and {controller_port})"
+        if (!$self->{controller_host} || !$self->{controller_port});
+    &App::sub_exit() if ($App::trace);
+}
+
+sub _init2b {
+    &App::sub_entry if ($App::trace);
+    my ($self, $options) = @_;
+    # nothing yet
+    &App::sub_exit() if ($App::trace);
+}
+
+sub dispatch_events_begin {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    $self->log({level=>2},"Starting Cluster Node on 
$self->{host}:$self->{port}\n");
+    my $controller_host = $self->{controller_host};
+    my $controller_port = $self->{controller_port};
+    my $node_host       = $self->{host};
+    my $node_port       = $self->{port};
+    my $node_heartbeat  = $self->{options}{node_heartbeat} || 300;
+    $self->schedule_event(
+        method => "send_async_message",
+        args => [ $controller_host, $controller_port, 
"NODE-UP:$node_host:$node_port" ],
+        time => time(),  # immediately ...
+        interval => $node_heartbeat,  # and every X seconds hereafter
+    );
+    &App::sub_exit() if ($App::trace);
+}
+
+sub dispatch_events_end {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+    $self->log({level=>2},"Stopping Cluster Node\n");
+    my $controller_host = $self->{controller_host};
+    my $controller_port = $self->{controller_port};
+    my $node_host       = $self->{host};
+    my $node_port       = $self->{port};
+    $self->send_async_message($controller_host, $controller_port, 
"NODE-DOWN:$node_host:$node_port");
+    &App::sub_exit() if ($App::trace);
+}
+
+sub process_msg {
+    &App::sub_entry if ($App::trace);
+    my ($self, $connection_socket, $msg) = @_;
+    my $verbose = $self->{verbose};
+    $self->log({level=>3},"process_msg: [$msg]\n");
+    my $processing_complete = $self->process_custom_msg($connection_socket, 
$msg);
+    if (!$processing_complete) {
+        if ($msg =~ s/^GET//) {
+            $connection_socket->print("HTTP/1.0 200 OK\n");
+            $connection_socket->print("Content-type: text/plain\n");
+            $connection_socket->print("\n");
+            $connection_socket->print($self->state());
+        }
+        elsif ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
+            my %event = (
+                service_type => $1,
+                name         => $2,
+                method       => $3,
+            );
+            my $args = $4;
+            $event{args} = $self->{rpc_serializer}->deserialize($args) if 
($args ne "");
+            my $event_token = $self->send_async_event({method => 
"process_async_event", args => [\%event],});
+            $event{event_token} = $event_token;
+            $connection_socket->print("ASYNC-EVENT-TOKEN:$event_token\n");
+        }
+        else {
+            $connection_socket->print("unknown [$msg]\n");
+        }
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+# Can be overridden to provide customized processing.
+sub process_custom_msg {
+    &App::sub_entry if ($App::trace);
+    my ($self, $connection_socket, $msg) = @_;
+    my $processing_complete = 0;
+    &App::sub_exit($processing_complete) if ($App::trace);
+    return($processing_complete);
+}
+
+sub state {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+
+    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+    my $state = "Cluster Node: $self->{host}:$self->{port}\n[$datetime]\n";
+    $state .= "\n";
+    $state .= $self->_state();
+
+    &App::sub_exit($state) if ($App::trace);
+    return($state);
+}
+
+sub _state {
+    &App::sub_entry if ($App::trace);
+    my ($self) = @_;
+
+    my $state = "";
+
+    $state .= $self->SUPER::_state();
+
+    &App::sub_exit($state) if ($App::trace);
+    return($state);
+}
+
+sub process_async_event {
+    &App::sub_entry if ($App::trace);
+    my ($self, $event) = @_;
+    my $results = $self->send_event($event);
+    my $results_txt = $self->{rpc_serializer}->serialize($results);
+    my $msg = "ASYNC-EVENT-RESULTS:$event->{event_token}:$results_txt";
+    $self->send_message($self->{controller_host}, $self->{controller_port}, 
$msg);
+    &App::sub_exit(undef) if ($App::trace);
+    return(undef);
+}
+
+1;
+

Reply via email to