Author: spadkins
Date: Tue May  2 06:50:50 2006
New Revision: 6002

Modified:
   p5ee/trunk/App-Context/lib/App/Context.pm

Log:
logging/log_file/log-rotation, set_user()/clear_session(), state(), 
extend_event_loop(), send_async_event()/send_async_message()/send_message(), 
fork()/exit()/shutdown_unshareable_resources()

Modified: p5ee/trunk/App-Context/lib/App/Context.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context.pm   Tue May  2 06:50:50 2006
@@ -11,6 +11,9 @@
 use App;
 
 use Date::Format;
+use IO::Handle;     # for the STDOUT->autoflush() method
+use IO::Socket;
+use IO::Socket::INET;
 
 =head1 NAME
 
@@ -126,7 +129,7 @@
  $context->{Conf}{$user} Info from conf file
  [$context->{conf}]
     $conf->{$type}{$name}              Read-only service conf
- $context->{Session}{$session_id}
+ $context->{sessions}{$session_id}
  [$context->{session}]
     $session->{store}{$type}{$name}      Runtime state which is stored
     $session->{cache}{$type}{$name}      Instances of services
@@ -225,6 +228,10 @@
     $self->{options} = \%options;
     $options{context} = $self;
 
+    $self->{log_level} = $options{log_level};
+    $self->{log_level} = 2 if (!defined $self->{log_level});
+    $self->log_file_open();
+
     $conf_class   = $options{conf_class};
     $conf_class   = "App::Conf::File" if (! $conf_class);
 
@@ -259,9 +266,11 @@
     $self->{scheduled_events} = [];
     $self->{scheduled_event} = {};
 
+    $self->{event_loop_extensions} = [];
+
     $self->_init(\%options);   # allows the subclass to do initialization
 
-    $self->set_current_session($self->session("default"));
+    $self->set_current_session("default");
 
     &App::sub_exit($self) if ($App::trace);
     return $self;
@@ -1274,7 +1283,7 @@
     * Signature: $context->log(@args);
     * Signature: $context->log($options, @args);
     * Param:  $options     HASH    [in] (named)
-    * Param:  log_level    integer
+    * Param:  level        integer
     * Param:  @args        string  [in]
     * Return: void
     * Throws: <none>
@@ -1284,6 +1293,51 @@
 
     $context->log("oops, a bug happened");
 
+These are the standardized log levels.
+
+    0 - Context logs nothing (absolutely silent)    [???]
+    1 - only application events                     [???]
+    2 - [default] major system-level events         [standard level for operations]
+    3 - internal system-level events                [standard level for development]
+    4 - internal activities                         [standard level for debugging internals]
+    5 - internal activities (inside loops)          [extreme level for debugging internals]
+
+$self->log("ERROR: send_async_event_now(): node not assigned\n");
+$self->log($@);
+
+$self->log({level=>2},"Starting Cluster Node on $self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Cluster Node\n");
+$self->log({level=>2},"Starting Server on $self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Server.\n");
+$self->log({level=>2},"Starting Cluster Controller on $self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Cluster Controller\n");
+
+$self->log({level=>3},"Send Event: $service_type($name).$method(@args)\n");
+$self->log({level=>3},"Send Event: $method(@args)\n");
+$self->log({level=>3},"$service_type $name instantiated [$service]\n");
+$self->log({level=>3},"Schedule Event (" . join(",",%event) . ")\n");
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"send_message($host, $port, $message)\n");
+$self->log({level=>3},"send_message($host, $port, ...) => [$response]\n");
+$self->log({level=>3},"process_msg($msg)\n");
+$self->log({level=>3},"process_msg: [$msg]\n");
+$self->log({level=>3},"process_msg($msg)\n");
+
+$self->log({level=>4},"Checking for scheduled events.\n");
+$self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+$self->log({level=>4},"Caught Signal: @_\n"); };
+$self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+$self->log({level=>4},"Child $pid finished [exitval=$exitval,sig=$sig]\n");
+
+$self->log({level=>5},"Checking event: time=$time [$event->{time}, every $event->{interval}] $event->{method}().\n");
+$self->log({level=>5},"Event Rescheduled: time=$time [$event->{time}, every $event->{interval}] $event->{method}().\n");
+$self->log({level=>5},"Event Removed: time=$time [$event->{time}, every $event->{interval}] $event->{method}().\n");
+
 =cut
 
 sub log {
@@ -1291,9 +1345,10 @@
     my $self = shift;
     my ($msg_options);
     $msg_options = shift if ($#_ > -1 && ref($_[0]) eq "HASH");
-    my $msg_log_level = $msg_options->{log_level} || 1;
+    my $msg_level = $msg_options->{level} || 1;
     my $log_level = $self->{options}{log_level};
-    if (!defined $log_level || $msg_log_level <= $log_level) {
+    $log_level = 2 if (!defined $log_level);
+    if (!defined $log_level || $msg_level <= $log_level) {
         $self->_log(@_);
     }
     &App::sub_exit() if ($App::trace);
@@ -1302,7 +1357,43 @@
 sub _log {
     &App::sub_entry if ($App::trace);
     my $self = shift;
-    print STDERR "[$$] ", time2str("%Y-%m-%d %H:%M:%S", time()), " ", @_;
+    if ($#_ > 0) {
+        my $fmt =  "[$$] " . time2str("%Y-%m-%d %H:%M:%S", time()) . " " . 
shift;
+        printf STDERR $fmt, @_;
+    }
+    elsif ($#_ == 0) {
+        print STDERR "[$$] ", time2str("%Y-%m-%d %H:%M:%S", time()), " ", @_;
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
+# log_file_open([$overwrite])
+#   Opens (or reopens) the log file named by $self->{options}{log_file},
+#   redirects STDOUT and STDERR to it and turns on autoflush for all three
+#   handles.  Does nothing when the log_file option is not set.
+#     * Param:  $overwrite  boolean  [in] true  => truncate the file
+#                                         false => append to the file
+#                                         undef => use the log_overwrite option
+#     * Return: void
+#     * Throws: dies if the log file cannot be opened or if STDOUT/STDERR
+#               cannot be redirected to it
+# NOTE: log rotation always passes an $overwrite = 0, thus implementing the rule
+# that log rotation should never overwrite a log file, but only append to it.
+sub log_file_open {
+    &App::sub_entry if ($App::trace);
+    my ($self, $overwrite) = @_;
+    my $log_file = $self->{options}{log_file};
+    if ($log_file) {
+        if ($self->{log_fh}) {
+            close($self->{log_fh});
+            delete $self->{log_fh};
+        }
+        # a "%" in the configured name means it is a time2str() template
+        if ($log_file =~ /%/) {
+            $log_file = time2str($log_file, time());
+        }
+        # an explicit $overwrite argument takes precedence over the option
+        $overwrite = $self->{options}{log_overwrite} if (!defined $overwrite);
+        my $mode = $overwrite ? ">" : ">>";
+        # 3-arg open: the file name is never parsed as part of the open mode
+        open(LOG, $mode, $log_file) || die "Unable to open $log_file log file: $!";
+        open(STDOUT, ">&LOG") || die "Unable to redirect STDOUT to $log_file log file: $!";
+        open(STDERR, ">&LOG") || die "Unable to redirect STDERR to $log_file log file: $!";
+        LOG->autoflush(1);
+        STDOUT->autoflush(1);
+        STDERR->autoflush(1);
+        $self->{log_fh} = \*App::Context::LOG;
+    }
     &App::sub_exit() if ($App::trace);
 }
 
@@ -1330,8 +1421,16 @@
 sub user {
     &App::sub_entry if ($App::trace);
     my $self = shift;
-    &App::sub_exit("guest") if ($App::trace);
-    "guest";
+    my $user = $self->{user} || "guest";
+    &App::sub_exit($user) if ($App::trace);
+    $user;
+}
+
+# set_user($user): stores $user in $self->{user}, the value user() reads.
+sub set_user {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    my ($new_user) = @_;
+    $self->{user} = $new_user;
+    &App::sub_exit() if ($App::trace);
 }
 
 #############################################################################
@@ -1418,13 +1517,8 @@
     &App::sub_entry if ($App::trace);
     my ($self, $session_id, $args) = @_;
     my ($session_class, $session, $options);
-    if ($session_id) {
-        $session = $self->{sessions}{$session_id};
-    }
-    else {
-        $session_id = "default";
-        $session = $self->{session};
-    }
+    $session_id = "default" if (! defined $session_id || $session_id eq "");
+    $session = $self->{sessions}{$session_id};
     if (!$session) {
         $options = $self->{options};
         $session_class = $options->{session_class} || 
$self->_default_session_class();
@@ -1459,8 +1553,9 @@
 
 sub set_current_session {
     &App::sub_entry if ($App::trace);
-    my ($self, $session) = @_;
-    $self->{session} = $session;
+    # Makes the session identified by $session_id the current session
+    # ($self->{session}), obtaining it through session().
+    # An undefined or empty $session_id selects the "default" session.
+    my ($self, $session_id) = @_;
+    # the test must be 'eq ""': with 'ne ""' every non-empty id was
+    # silently replaced by "default"
+    $session_id = "default" if (!defined $session_id || $session_id eq "");
+    $self->{session} = $self->session($session_id);
     &App::sub_exit() if ($App::trace);
 }
 
@@ -1471,6 +1566,27 @@
     &App::sub_exit() if ($App::trace);
 }
 
+# clear_session([$session_id [, @service_types]])
+#   With @service_types: removes only the {store} and {cache} entries of
+#   those service types from the session.
+#   Without @service_types: removes the whole session from $self->{sessions};
+#   if it was the current session, a replacement is obtained from session()
+#   and made current.
+#   An undefined or empty $session_id selects the "default" session.
+sub clear_session {
+    &App::sub_entry if ($App::trace);
+    my ($self, $session_id, @service_types) = @_;
+    # the test must be 'eq ""': with 'ne ""' every named session id was
+    # replaced by "default", so the wrong session got cleared
+    $session_id = "default" if (!defined $session_id || $session_id eq "");
+    my $session = $self->{sessions}{$session_id};
+    if ($#service_types > -1) {
+        foreach my $service_type (@service_types) {
+            delete $session->{store}{$service_type};
+            delete $session->{cache}{$service_type};
+        }
+    }
+    else {
+        delete $self->{sessions}{$session_id};
+        # compare only when both exist: a session that was never created
+        # cannot be the current one
+        if ($session && $self->{session} && $session eq $self->{session}) {
+            $self->{session} = $self->session($session_id);
+        }
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
 #############################################################################
 # PUBLIC METHODS
 #############################################################################
@@ -1479,6 +1595,54 @@
 
 =cut
 
+# state(): returns a text report made of a header (the class of this context
+# and the current date/time), a blank line, and the text returned by _state().
+sub state {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+
+    my $now    = time2str("%Y-%m-%d %H:%M:%S", time());
+    my $header = "Context: [" . ref($self) . "]\n[$now]\n";
+    my $report = $header . "\n" . scalar($self->_state());
+
+    &App::sub_exit($report) if ($App::trace);
+    return($report);
+}
+
+# _state(): returns the body of the state report - one line per entry of
+# $self->{scheduled_events} (time, interval, tag, name.method(args)) followed
+# by one line per entry of $self->{event_loop_extensions} (name.method(args)).
+sub _state {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+
+    my @lines = ("Scheduled Events:\n");
+    foreach my $ev (@{$self->{scheduled_events}}) {
+        my @ev_args = $ev->{args} ? @{$ev->{args}} : ();
+        my $call    = "$ev->{name}.$ev->{method}(" . join(",", @ev_args) . ")";
+        push(@lines, sprintf("   %19s %5s %-32s %s\n",
+            time2str("%Y-%m-%d %H:%M:%S",$ev->{time}),
+            $ev->{interval},
+            $ev->{tag},
+            $call));
+    }
+
+    push(@lines, "\n", "Event Loop Extensions:\n");
+    foreach my $extension (@{$self->{event_loop_extensions}}) {
+        # each extension is a triple: [ object, method name, args arrayref ]
+        my ($obj, $method, $args) = @$extension;
+        my @ext_args = $args ? @$args : ();
+        my $call     = "$obj->{name}.$method(" . join(",", @ext_args) . ")";
+        push(@lines, sprintf("   %s\n", $call));
+    }
+
+    my $state = join("", @lines);
+    &App::sub_exit($state) if ($App::trace);
+    return($state);
+}
+
 #############################################################################
 # dbg()
 #############################################################################
@@ -1705,7 +1869,7 @@
 
 sub dispatch_events {
     &App::sub_entry if ($App::trace);
-    my ($self) = @_;
+    my ($self, $max_events_occurred) = @_;
 
     $self->dispatch_events_begin();
 
@@ -1757,6 +1921,14 @@
     &App::sub_exit() if ($App::trace);
 }
 
+# extend_event_loop($obj, $method, $args): appends the triple
+# [ $obj, $method, $args ] to $self->{event_loop_extensions};
+# a missing $args is stored as an empty array reference.
+sub extend_event_loop {
+    &App::sub_entry if ($App::trace);
+    my $self = shift;
+    my ($obj, $method, $args) = @_;
+    my $extension = [ $obj, $method, $args ? $args : [] ];
+    push(@{$self->{event_loop_extensions}}, $extension);
+    &App::sub_exit() if ($App::trace);
+}
+
 sub call {
     &App::sub_entry if ($App::trace);
     my ($self, $service_type, $name, $method, $args) = @_;
@@ -1894,7 +2066,7 @@
     }
 
     die "schedule_event(): (tag or method) is a required attribute of an 
event" if (!$event{tag} && !$event{method});
-    print "[$$] Schedule Event (", join(",",%event), ")\n" if 
($self->{verbose} >= 3);
+    $self->log({level=>3},"Schedule Event (" . join(",",%event) . ")\n");
 
     my $event;
     if ($event{tag}) {
@@ -1946,18 +2118,18 @@
     # we do unshift() to keep events executing in FIFO order for a particular 
time
     for (my $i = $#$scheduled_events; $i >= 0; $i--) {
         $event = $scheduled_events->[$i];
-        print "[$$] Checking event: time=$time [$event->{time}, every 
$event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+        $self->log({level=>5},"Checking event: time=$time [$event->{time}, 
every $event->{interval}] $event->{method}().\n");
         if ($event->{time} <= $time) {
             unshift(@$events, $event);
             if ($event->{time} && $event->{interval}) {
                 $event->{time} += $event->{interval}; # reschedule the event
-                print "[$$] Event Rescheduled: time=$time [$event->{time}, 
every $event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+                $self->log({level=>5},"Event Rescheduled: time=$time 
[$event->{time}, every $event->{interval}] $event->{method}().\n");
                 if ($time_of_next_event == 0 || $event->{time} < 
$time_of_next_event) {
                     $time_of_next_event = $event->{time};
                 }
             }
             else {
-                print "[$$] Event Removed: time=$time [$event->{time}, every 
$event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+                $self->log({level=>5},"Event Removed: time=$time 
[$event->{time}, every $event->{interval}] $event->{method}().\n");
                 splice(@$scheduled_events, $i, 1); # remove the (one-time) 
event
                 $event->{scheduled} = 0;
             }
@@ -1979,17 +2151,32 @@
     my $method = $event->{method};
     my @args = $event->{args} ? @{$event->{args}} : ();
     my $service_type = $event->{service_type};
+    my (@results);
     if ($service_type) {
         my $name = $event->{name};
         my $service = $self->service($service_type, $name);
-        $self->log("Send Event: $service_type($name).$method(@args)\n") if 
($self->{verbose} >= 2);
-        $service->$method(@args);
+        $self->log({level=>3},"Send Event: 
$service_type($name).$method(@args)\n");
+        @results = $service->$method(@args);
     }
     else {
-        $self->log("Send Event: $method(@args)\n") if ($self->{verbose} >= 2);
-        $self->$method(@args);
+        $self->log({level=>3},"Send Event: $method(@args)\n");
+        @results = $self->$method(@args);
+    }
+    &App::sub_exit(@results) if ($App::trace);
+    if (wantarray()) {
+        return(@results);
+    }
+    else {
+        if ($#results == -1) {
+            return(undef);
+        }
+        elsif ($#results == 0) {
+            return($results[0]);
+        }
+        else {
+            return(\@results);
+        }
     }
-    &App::sub_exit() if ($App::trace);
 }
 
 # NOTE: The baseline context implements the API for asynchronous events
@@ -2003,17 +2190,122 @@
 sub send_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
-    $self->send_event($event);
+    my $errnum = 0;
+    my $errmsg = "";
+    my $event_token = "local-$$";
+    my ($returnval);
+    eval {
+        $returnval = $self->send_event($event);
+    };
+    if ($@) {
+        $errmsg = $@;
+        $errnum = 1;
+    }
     if ($callback_event) {
-        my $event_tag = "local-$$";
-        if (! $callback_event->{args}) {
-            $callback_event->{args} = [ $event_tag ];
-        }
+        $callback_event->{args} = [] if (! $callback_event->{args});
+        push(@{$callback_event->{args}}, {event_token => $event_token, 
returnval => $returnval, errnum => $errnum, errmsg => $errmsg});
         $self->send_event($callback_event);
     }
+    &App::sub_exit($event_token) if ($App::trace);
+    return($event_token);
+}
+
+=head2 wait_for_event()
+
+    * Signature: $self->wait_for_event($event_token)
+    * Param:     $event_token     string
+    * Return:    void
+    * Throws:    App::Exception
+    * Since:     0.01
+
+    Sample Usage: 
+
+    $self->wait_for_event($event_token);
+
+The wait_for_event() method is called when an asynchronous event has been
+sent and no more processing can be completed before it is done.
+
+=cut
+
+# wait_for_event($event_token): the body is empty here - apart from the
+# optional trace calls nothing is done with $event_token.
+sub wait_for_event {
+    &App::sub_entry if ($App::trace);
+    my $self        = shift;
+    my $event_token = shift;
+    &App::sub_exit() if ($App::trace);
+}
+
+# NOTE: This send_message() and send_async_message() can be on the App::Context
+#       class to allow a program in any context to send this kind of message.
+#       (The only downside is a dependency on IO::Socket::INET.)
+# send_async_message($host, $port, $message, $await_return_value, $timeout, $server_close)
+#   Forks a child process which delivers the message with send_message() and
+#   then leaves through $self->exit(0); the parent returns without waiting.
+#   If no child can be created, the message is sent from the calling process
+#   instead (which then waits for send_message() to finish).
+#   All arguments are passed on to send_message() unchanged.
+sub send_async_message {
+    &App::sub_entry if ($App::trace);
+    my ($self, $host, $port, $message, $await_return_value, $timeout, $server_close) = @_;
+    my $pid = $self->fork();
+    if (!defined $pid) {
+        # a failed fork yields undef; the old "!$pid" test took this for the
+        # child branch and made the PARENT exit after sending
+        $self->log("ERROR: send_async_message(): fork failed, sending synchronously\n");
+        $self->send_message($host, $port, $message, $await_return_value, $timeout, $server_close);
+    }
+    elsif ($pid == 0) {   # running in child
+        $self->send_message($host, $port, $message, $await_return_value, $timeout, $server_close);
+        $self->exit(0);
+    }
     &App::sub_exit() if ($App::trace);
 }
 
+# NOTE: $messages that start with "RV-" wait for a return value.
+#       $messages that start with "SC-" force the server to close the socket first
+#       This is to help manage which system has the sockets lingering in TIME_WAIT state.
+# Here is the truth table for $await_return_value, $server_close
+#       $await_return_value  $server_close =         client         +        server
+#       -------------------  -------------   ----------------------   ---------------------
+#                 0                0              write/close              read/close
+#                 0                1            write/read/close           read/close
+#                 1                0         write/read/write/close   read/write/read/close
+#                 1                1            write/read/close         read/write/close
+# send_message($host, $port, $message, $await_return_value, $timeout, $server_close)
+#   Connects to $host:$port over TCP, writes "$message\n" (prefixed with "RV-"
+#   when $await_return_value is true and "SC-" when $server_close is true) and,
+#   if either flag is set, reads one line of response.
+#     * Param:  $host     string   host name, or "host:port" when $port is not given
+#     * Param:  $port     integer
+#     * Param:  $message  string
+#     * Param:  $await_return_value  boolean
+#     * Param:  $timeout  not used at present (the timeout() call is commented out)
+#     * Param:  $server_close        boolean
+#     * Return: $response string   the response line without its line ending
+#               ("" when none was read), or "CONNECT ERROR: ..." when the
+#               connection failed, or "SEND ERROR: ..." when the exchange died
+sub send_message {
+    &App::sub_entry if ($App::trace);
+    my ($self, $host, $port, $message, $await_return_value, $timeout, $server_close) = @_;
+
+    if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
+        $host = $1;
+        $port = $2;
+    }
+
+    my $send_socket = IO::Socket::INET->new(
+        PeerAddr  => $host,
+        PeerPort  => $port,
+        Proto     => "tcp",
+        Type      => SOCK_STREAM,
+        ReuseAddr => 1,
+    );
+    # keep the reason for a failed connect now: the log() call below does
+    # I/O of its own and may overwrite $!
+    my $connect_error = $send_socket ? "" : "$!";
+    my $send_fd = $send_socket ? fileno($send_socket) : "";
+    $self->log({level=>3},"($send_fd) send_message($host, $port, $message)\n");
+
+    my $response = "";
+    my $rv = $await_return_value ? "RV-" : "";
+    my $sc = $server_close ? "SC-" : "";
+    if ($send_socket) {
+        eval {
+            $send_socket->autoflush(1) if ($await_return_value || $server_close);
+            $send_socket->print("$rv$sc$message\n");
+            if ($await_return_value || $server_close) {
+                # $send_socket->timeout($timeout) if ($timeout); # doesn't seem to work
+                $response = $send_socket->getline();
+                # getline() gives undef when nothing could be read
+                $response = "" if (!defined $response);
+                $response =~ s/[\r\n]+$//;
+                $send_socket->print("EOF\n") if ($await_return_value && !$server_close);
+            }
+            close($send_socket);
+        };
+        if ($@) {
+            $response = "SEND ERROR: $@";
+        }
+    }
+    else {
+        $response = "CONNECT ERROR: $connect_error";
+    }
+
+    $self->log({level=>3},"send_message($host, $port, ...) => [$response]\n");
+    &App::sub_exit($response) if ($App::trace);
+    return($response);
+}
+
 =head2 fork()
 
     * Signature: $pid = $self->fork()
@@ -2032,7 +2324,7 @@
 connections to be created if necessary.
 
 Call this after a fork() in the child process.
-It will renew the resources which cannot be shared between a parent and 
+It will shut down the resources which cannot be shared between a parent and 
 a child process. 
 
 Currently, this is primarily for database connections.
@@ -2045,19 +2337,29 @@
     my ($self) = @_;
     my $pid = fork();
     if (!$pid) {  # in the child process
-        $self->renew_process_resources();
+        # $self->{is_child} = 1;   # I might need to add this sometime, but 
not now
+        $self->shutdown_unshareable_resources();
+    }
+    else {
+        $self->log({level=>4},"Child $pid started.\n");
     }
     &App::sub_exit($pid) if ($App::trace);
     return($pid);
 }
 
+# exit([$exitval]): calls $self->shutdown() and then ends the process with
+# exit status $exitval (0 when no value is given).  Does not return.
+sub exit {
+    my ($self, $exitval) = @_;
+    $exitval = 0 if (!defined $exitval);
+    $self->shutdown();
+    # CORE::exit names the builtin explicitly: a bare exit() inside a sub
+    # that is itself called "exit" is an ambiguous call to perl
+    CORE::exit($exitval);
+}
+
 #############################################################################
-# renew_process_resources()
+# shutdown_unshareable_resources()
 #############################################################################
 
-=head2 renew_process_resources()
+=head2 shutdown_unshareable_resources()
 
-    * Signature: $self->renew_process_resources()
+    * Signature: $self->shutdown_unshareable_resources()
     * Param:     void
     * Return:    void
     * Throws:    App::Exception
@@ -2065,15 +2367,15 @@
 
     Sample Usage: 
 
-    $self->renew_process_resources();
+    $self->shutdown_unshareable_resources();
 
-The renew_process_resources() method is called in a child process just after
+The shutdown_unshareable_resources() method is called in a child process just after
 it has been fork()ed.
 This causes connections to databases, etc. to be closed gracefully and new
 connections to be created if necessary.
 
 Call this after a fork() in the child process.
-It will renew the resources which cannot be shared between a parent and 
+It will shut down the resources which cannot be shared between a parent and 
 a child process. 
 
 Currently, this is primarily for database connections.
@@ -2081,19 +2383,19 @@
 
 =cut
 
-sub renew_process_resources {
+sub shutdown_unshareable_resources {
     my $self = shift;
     my ($conf, $repdef, $repname, $instance);
     my ($class, $method, $args, $argidx, $repcache);
 
-    $self->dbgprint("Context->renew_process_resources()")
+    $self->dbgprint("Context->shutdown_unshareable_resources()")
         if ($App::DEBUG && $self->dbg(1));
 
     $repcache = $self->{session}{cache}{Repository};
     if (defined $repcache && ref($repcache) eq "HASH") {
         foreach $repname (keys %$repcache) {
             $instance = $repcache->{$repname};
-            $instance->_renew_process_resources();
+            $instance->_shutdown_unshareable_resources();
             delete $repcache->{$repname};
         }
     }

Reply via email to