Author: spadkins
Date: Tue May 2 06:50:50 2006
New Revision: 6002
Modified:
p5ee/trunk/App-Context/lib/App/Context.pm
Log:
logging/log_file/log-rotation, set_user()/clear_session(), state(),
extend_event_loop(), send_async_event()/send_async_message()/send_message(),
fork()/exit()/shutdown_unshareable_resources()
Modified: p5ee/trunk/App-Context/lib/App/Context.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context.pm Tue May 2 06:50:50 2006
@@ -11,6 +11,9 @@
use App;
use Date::Format;
+use IO::Handle; # for the STDOUT->autoflush() method
+use IO::Socket;
+use IO::Socket::INET;
=head1 NAME
@@ -126,7 +129,7 @@
$context->{Conf}{$user} Info from conf file
[$context->{conf}]
$conf->{$type}{$name} Read-only service conf
- $context->{Session}{$session_id}
+ $context->{sessions}{$session_id}
[$context->{session}]
$session->{store}{$type}{$name} Runtime state which is stored
$session->{cache}{$type}{$name} Instances of services
@@ -225,6 +228,10 @@
$self->{options} = \%options;
$options{context} = $self;
+ $self->{log_level} = $options{log_level};
+ $self->{log_level} = 2 if (!defined $self->{log_level});
+ $self->log_file_open();
+
$conf_class = $options{conf_class};
$conf_class = "App::Conf::File" if (! $conf_class);
@@ -259,9 +266,11 @@
$self->{scheduled_events} = [];
$self->{scheduled_event} = {};
+ $self->{event_loop_extensions} = [];
+
$self->_init(\%options); # allows the subclass to do initialization
- $self->set_current_session($self->session("default"));
+ $self->set_current_session("default");
&App::sub_exit($self) if ($App::trace);
return $self;
@@ -1274,7 +1283,7 @@
* Signature: $context->log(@args);
* Signature: $context->log($options, @args);
* Param: $options HASH [in] (named)
- * Param: log_level integer
+ * Param: level integer
* Param: @args string [in]
* Return: void
* Throws: <none>
@@ -1284,6 +1293,51 @@
$context->log("oops, a bug happened");
+These are the standardized log levels.
+
+ 0 - Context logs nothing (absolutely silent)
[???]
+ 1 - only application events
[???]
+ 2 - [default] major system-level events [standard level for
operations]
+ 3 - internal system-level events [standard level for
development]
+ 4 - internal activities [standard level for debugging
internals]
+ 5 - internal activities (inside loops) [extreme level for debugging
internals]
+
+$self->log("ERROR: send_async_event_now(): node not assigned\n");
+$self->log($@);
+
+$self->log({level=>2},"Starting Cluster Node on
$self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Cluster Node\n");
+$self->log({level=>2},"Starting Server on $self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Server.\n");
+$self->log({level=>2},"Starting Cluster Controller on
$self->{host}:$self->{port}\n");
+$self->log({level=>2},"Stopping Cluster Controller\n");
+
+$self->log({level=>3},"Send Event: $service_type($name).$method(@args)\n");
+$self->log({level=>3},"Send Event: $method(@args)\n");
+$self->log({level=>3},"$service_type $name instantiated [$service]\n");
+$self->log({level=>3},"Schedule Event (" . join(",",%event) . ")\n");
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_\n"); };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"Caught Signal: @_ (quitting)\n"); $quit = 1; };
+$self->log({level=>3},"send_message($host, $port, $message)\n");
+$self->log({level=>3},"send_message($host, $port, ...) => [$response]\n");
+$self->log({level=>3},"process_msg($msg)\n");
+$self->log({level=>3},"process_msg: [$msg]\n");
+$self->log({level=>3},"process_msg($msg)\n");
+
+$self->log({level=>4},"Checking for scheduled events.\n");
+$self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+$self->log({level=>4},"Caught Signal: @_\n"); };
+$self->log({level=>4},"Listening on socket: timeout($sleep_interval)\n");
+$self->log({level=>4},"Child $pid finished [exitval=$exitval,sig=$sig]\n");
+
+$self->log({level=>5},"Checking event: time=$time [$event->{time}, every
$event->{interval}] $event->{method}().\n");
+$self->log({level=>5},"Event Rescheduled: time=$time [$event->{time}, every
$event->{interval}] $event->{method}().\n");
+$self->log({level=>5},"Event Removed: time=$time [$event->{time}, every
$event->{interval}] $event->{method}().\n");
+
=cut
sub log {
@@ -1291,9 +1345,10 @@
my $self = shift;
my ($msg_options);
$msg_options = shift if ($#_ > -1 && ref($_[0]) eq "HASH");
- my $msg_log_level = $msg_options->{log_level} || 1;
+ my $msg_level = $msg_options->{level} || 1;
my $log_level = $self->{options}{log_level};
- if (!defined $log_level || $msg_log_level <= $log_level) {
+ $log_level = 2 if (!defined $log_level);
+ if (!defined $log_level || $msg_level <= $log_level) {
$self->_log(@_);
}
&App::sub_exit() if ($App::trace);
@@ -1302,7 +1357,43 @@
sub _log { # Write one log entry to STDERR, prefixed with "[pid] timestamp ".
&App::sub_entry if ($App::trace);
my $self = shift;
- print STDERR "[$$] ", time2str("%Y-%m-%d %H:%M:%S", time()), " ", @_;
+ if ($#_ > 0) { # two or more args: the first is a printf format for the rest
+ my $fmt = "[$$] " . time2str("%Y-%m-%d %H:%M:%S", time()) . " " .
shift;
+ printf STDERR $fmt, @_;
+ }
+ elsif ($#_ == 0) { # exactly one arg: printed as-is, no format expansion
+ print STDERR "[$$] ", time2str("%Y-%m-%d %H:%M:%S", time()), " ", @_;
+ } # with no args at all, nothing is written
+ &App::sub_exit() if ($App::trace);
+}
+
+# Open (or re-open) the file named by the "log_file" option and redirect
+# STDOUT and STDERR to it.  Does nothing when no log_file is configured.
+#   $overwrite - true: truncate the file; false: append to it;
+#                undef: use the "log_overwrite" option instead
+# A log_file name containing "%" is expanded with time2str() first.
+# Dies if the log file cannot be opened or the redirects fail.
+# NOTE: log rotation always passes an $overwrite = 0, thus implementing the rule
+# that log rotation should never overwrite a log file, but only append to it.
+sub log_file_open {
+    &App::sub_entry if ($App::trace);
+    my ($self, $overwrite) = @_;
+    my $log_file = $self->{options}{log_file};
+    if ($log_file) {
+        if ($self->{log_fh}) {
+            close($self->{log_fh});
+            delete $self->{log_fh};
+        }
+        if ($log_file =~ /%/) {
+            $log_file = time2str($log_file, time());
+        }
+        $overwrite = $self->{options}{log_overwrite} if (!defined $overwrite);
+        # three-argument open: the file name is never parsed for a mode
+        my $mode = $overwrite ? ">" : ">>";
+        open(LOG, $mode, $log_file) || die "Unable to open $log_file log file: $!";
+        open(STDOUT, ">&LOG") || die "Unable to redirect STDOUT to $log_file: $!";
+        open(STDERR, ">&LOG") || die "Unable to redirect STDERR to $log_file: $!";
+        LOG->autoflush(1);
+        STDOUT->autoflush(1);
+        STDERR->autoflush(1);
+        $self->{log_fh} = \*App::Context::LOG;
+    }
&App::sub_exit() if ($App::trace);
}
@@ -1330,8 +1421,16 @@
sub user { # Return the name of the current user.
&App::sub_entry if ($App::trace);
my $self = shift;
- &App::sub_exit("guest") if ($App::trace);
- "guest";
+ my $user = $self->{user} || "guest"; # "guest" when no (true) user has been stored
+ &App::sub_exit($user) if ($App::trace);
+ $user;
+}
+
+sub set_user { # Store $user on the context; user() reads the same slot back.
+ &App::sub_entry if ($App::trace);
+ my ($self, $user) = @_;
+ $self->{user} = $user;
+ &App::sub_exit() if ($App::trace);
}
#############################################################################
@@ -1418,13 +1517,8 @@
&App::sub_entry if ($App::trace);
my ($self, $session_id, $args) = @_;
my ($session_class, $session, $options);
- if ($session_id) {
- $session = $self->{sessions}{$session_id};
- }
- else {
- $session_id = "default";
- $session = $self->{session};
- }
+ $session_id = "default" if (! defined $session_id || $session_id eq "");
+ $session = $self->{sessions}{$session_id};
if (!$session) {
$options = $self->{options};
$session_class = $options->{session_class} ||
$self->_default_session_class();
@@ -1459,8 +1553,9 @@
sub set_current_session {
&App::sub_entry if ($App::trace);
- my ($self, $session) = @_;
- $self->{session} = $session;
+ # Make the session named by $session_id the current session.
+ # $session_id: session name; undef or "" selects the "default" session.
+ my ($self, $session_id) = @_;
+ # Fall back to "default" only when no id was given. (Testing ne "" here
+ # would replace every real session id with "default".)
+ $session_id = "default" if (!defined $session_id || $session_id eq "");
+ $self->{session} = $self->session($session_id);
&App::sub_exit() if ($App::trace);
}
@@ -1471,6 +1566,27 @@
&App::sub_exit() if ($App::trace);
}
+# Clear state held for a session.
+#   $session_id    - session to clear; undef or "" means "default"
+#   @service_types - when given, only the store and cache entries of these
+#                    service types are dropped; otherwise the whole session
+#                    is discarded (and re-created if it was the current one)
+sub clear_session {
+    &App::sub_entry if ($App::trace);
+    my ($self, $session_id, @service_types) = @_;
+    # Fall back to "default" only when no id was given. (Testing ne "" here
+    # would redirect every real session id to "default".)
+    $session_id = "default" if (!defined $session_id || $session_id eq "");
+    my $session = $self->{sessions}{$session_id};
+    if ($#service_types > -1) {
+        foreach my $service_type (@service_types) {
+            delete $session->{store}{$service_type};
+            delete $session->{cache}{$service_type};
+        }
+    }
+    else {
+        delete $self->{sessions}{$session_id};
+        if ($session eq $self->{session}) {
+            # the current session was the one removed: replace it with a fresh one
+            delete $self->{session};
+            $self->{session} = $self->session($session_id);
+        }
+    }
+    &App::sub_exit() if ($App::trace);
+}
+
#############################################################################
# PUBLIC METHODS
#############################################################################
@@ -1479,6 +1595,54 @@
=cut
+sub state { # Build a human-readable report of this context's state.
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
+ my $class = ref($self);
+ my $state = "Context: [$class]\n[$datetime]\n"; # header: class name and current time
+ $state .= "\n";
+ $state .= $self->_state(); # the detail section comes from _state()
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state); # the report text
+}
+
+sub _state { # Describe the scheduled events and the event-loop extensions as text.
+ &App::sub_entry if ($App::trace);
+ my ($self) = @_;
+
+ my $state = "";
+
+ my ($event, @args, $args_str);
+ $state .= "Scheduled Events:\n";
+ foreach $event (@{$self->{scheduled_events}}) { # one output line per scheduled event
+ @args = ();
+ @args = @{$event->{args}} if ($event->{args});
+ $args_str = join(",",@args);
+ $state .= sprintf(" %19s %5s %-32s %s\n", # columns: time, interval, tag, name.method(args)
+ time2str("%Y-%m-%d %H:%M:%S",$event->{time}),
+ $event->{interval},
+ $event->{tag},
+ "$event->{name}.$event->{method}($args_str)");
+ }
+
+ $state .= "\n";
+ $state .= "Event Loop Extensions:\n";
+ my ($obj, $method, $args);
+ foreach my $event_loop_extension (@{$self->{event_loop_extensions}}) { # each entry is an [ $obj, $method, $args ] triple
+ ($obj, $method, $args) = @$event_loop_extension;
+ @args = ();
+ @args = @$args if ($args);
+ $args_str = join(",",@args);
+ $state .= sprintf(" %s\n", "$obj->{name}.$method($args_str)");
+ }
+
+ &App::sub_exit($state) if ($App::trace);
+ return($state);
+}
+
#############################################################################
# dbg()
#############################################################################
@@ -1705,7 +1869,7 @@
sub dispatch_events {
&App::sub_entry if ($App::trace);
- my ($self) = @_;
+ my ($self, $max_events_occurred) = @_;
$self->dispatch_events_begin();
@@ -1757,6 +1921,14 @@
&App::sub_exit() if ($App::trace);
}
+sub extend_event_loop { # Append an [ $obj, $method, $args ] entry to the event_loop_extensions list.
+ &App::sub_entry if ($App::trace);
+ my ($self, $obj, $method, $args) = @_;
+ $args = [] if (!$args); # a missing $args becomes an empty array ref
+ push(@{$self->{event_loop_extensions}}, [ $obj, $method, $args ]);
+ &App::sub_exit() if ($App::trace);
+}
+
sub call {
&App::sub_entry if ($App::trace);
my ($self, $service_type, $name, $method, $args) = @_;
@@ -1894,7 +2066,7 @@
}
die "schedule_event(): (tag or method) is a required attribute of an
event" if (!$event{tag} && !$event{method});
- print "[$$] Schedule Event (", join(",",%event), ")\n" if
($self->{verbose} >= 3);
+ $self->log({level=>3},"Schedule Event (" . join(",",%event) . ")\n");
my $event;
if ($event{tag}) {
@@ -1946,18 +2118,18 @@
# we do unshift() to keep events executing in FIFO order for a particular
time
for (my $i = $#$scheduled_events; $i >= 0; $i--) {
$event = $scheduled_events->[$i];
- print "[$$] Checking event: time=$time [$event->{time}, every
$event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+ $self->log({level=>5},"Checking event: time=$time [$event->{time},
every $event->{interval}] $event->{method}().\n");
if ($event->{time} <= $time) {
unshift(@$events, $event);
if ($event->{time} && $event->{interval}) {
$event->{time} += $event->{interval}; # reschedule the event
- print "[$$] Event Rescheduled: time=$time [$event->{time},
every $event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+ $self->log({level=>5},"Event Rescheduled: time=$time
[$event->{time}, every $event->{interval}] $event->{method}().\n");
if ($time_of_next_event == 0 || $event->{time} <
$time_of_next_event) {
$time_of_next_event = $event->{time};
}
}
else {
- print "[$$] Event Removed: time=$time [$event->{time}, every
$event->{interval}] $event->{method}().\n" if ($verbose >= 9);
+ $self->log({level=>5},"Event Removed: time=$time
[$event->{time}, every $event->{interval}] $event->{method}().\n");
splice(@$scheduled_events, $i, 1); # remove the (one-time)
event
$event->{scheduled} = 0;
}
@@ -1979,17 +2151,32 @@
my $method = $event->{method};
my @args = $event->{args} ? @{$event->{args}} : ();
my $service_type = $event->{service_type};
+ my (@results);
if ($service_type) {
my $name = $event->{name};
my $service = $self->service($service_type, $name);
- $self->log("Send Event: $service_type($name).$method(@args)\n") if
($self->{verbose} >= 2);
- $service->$method(@args);
+ $self->log({level=>3},"Send Event:
$service_type($name).$method(@args)\n");
+ @results = $service->$method(@args);
}
else {
- $self->log("Send Event: $method(@args)\n") if ($self->{verbose} >= 2);
- $self->$method(@args);
+ $self->log({level=>3},"Send Event: $method(@args)\n");
+ @results = $self->$method(@args);
+ }
+ &App::sub_exit(@results) if ($App::trace);
+ if (wantarray()) {
+ return(@results);
+ }
+ else {
+ if ($#results == -1) {
+ return(undef);
+ }
+ elsif ($#results == 0) {
+ return($results[0]);
+ }
+ else {
+ return(\@results);
+ }
}
- &App::sub_exit() if ($App::trace);
}
# NOTE: The baseline context implements the API for asynchronous events
@@ -2003,17 +2190,122 @@
sub send_async_event { # Run $event at once via send_event(), then fire the optional $callback_event; returns an event token.
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- $self->send_event($event);
+ my $errnum = 0;
+ my $errmsg = "";
+ my $event_token = "local-$$"; # "local-" followed by this process id
+ my ($returnval);
+ eval { # trap a die from the event so it can be reported to the callback
+ $returnval = $self->send_event($event);
+ };
+ if ($@) {
+ $errmsg = $@;
+ $errnum = 1;
+ }
if ($callback_event) {
- my $event_tag = "local-$$";
- if (! $callback_event->{args}) {
- $callback_event->{args} = [ $event_tag ];
- }
+ $callback_event->{args} = [] if (! $callback_event->{args});
+ push(@{$callback_event->{args}}, {event_token => $event_token,
returnval => $returnval, errnum => $errnum, errmsg => $errmsg}); # NOTE(review): pushed onto the caller's own args array, so a reused $callback_event gains one result hash per call - confirm intended
$self->send_event($callback_event);
}
+ &App::sub_exit($event_token) if ($App::trace);
+ return($event_token);
+}
+
+=head2 wait_for_event()
+
+ * Signature: $self->wait_for_event($event_token)
+ * Param: $event_token string
+ * Return: void
+ * Throws: App::Exception
+ * Since: 0.01
+
+ Sample Usage:
+
+ $self->wait_for_event($event_token);
+
+The wait_for_event() method is called when an asynchronous event has been
+sent and no more processing can be completed before it is done.
+
+=cut
+
+sub wait_for_event { # No-op in this class: send_async_event() here has already run the event before returning its token.
+ &App::sub_entry if ($App::trace);
+ my ($self, $event_token) = @_;
+ &App::sub_exit() if ($App::trace);
+}
+
+# NOTE: This send_message() and send_async_message() can be on the App::Context
+# class to allow a program in any context to send this kind of message.
+# (The only downside is a dependency on IO::Socket::INET.)
+#
+# send_async_message() forks a child process which calls send_message() with
+# the same arguments and then exits.  The parent returns without waiting and
+# does not receive the response.
+sub send_async_message {
+    &App::sub_entry if ($App::trace);
+    my ($self, $host, $port, $message, $await_return_value, $timeout, $server_close) = @_;
+    my $pid = $self->fork();
+    if (!defined $pid) {
+        # fork failed, so this is still the parent: it must not send-and-exit
+        $self->log("ERROR: send_async_message(): fork failed, message to $host not sent\n");
+    }
+    elsif (!$pid) { # running in child
+        $self->send_message($host, $port, $message, $await_return_value, $timeout, $server_close);
+        $self->exit(0);
+    }
&App::sub_exit() if ($App::trace);
}
+# NOTE: $messages that start with "RV-" wait for a return value.
+# $messages that start with "SC-" force the server to close the socket first.
+# This is to help manage which system has the sockets lingering in TIME_WAIT state.
+# Here is the truth table for $await_return_value, $server_close
+#   $await_return_value  $server_close  =  client                  +  server
+#   -------------------  -------------     ----------------------     ---------------------
+#   0                    0                 write/close                read/close
+#   0                    1                 write/read/close           read/close
+#   1                    0                 write/read/write/close     read/write/read/close
+#   1                    1                 write/read/close           read/write/close
+#
+# Parameters:
+#   $host, $port        - peer to connect to; "host:port" may be given in $host when $port is empty
+#   $message            - text to send; a newline is appended
+#   $await_return_value - true: prefix the message with "RV-" and read one response line
+#   $timeout            - currently unused (the socket timeout call below is commented out)
+#   $server_close       - true: prefix the message with "SC-" and read before closing
+# Returns the response line with trailing CR/LF removed, "" when nothing was
+# read, or a string starting with "CONNECT ERROR: " or "SEND ERROR: ".
+sub send_message {
+    &App::sub_entry if ($App::trace);
+    my ($self, $host, $port, $message, $await_return_value, $timeout, $server_close) = @_;
+
+    if (!$port && $host =~ /^([^:]+):([0-9]+)$/) {
+        $host = $1;
+        $port = $2;
+    }
+
+    my $send_socket = IO::Socket::INET->new(
+        PeerAddr => $host,
+        PeerPort => $port,
+        Proto => "tcp",
+        Type => SOCK_STREAM,
+        ReuseAddr => 1,
+    );
+    # capture the connect failure now, before any later call can reset $!
+    my $connect_error = $send_socket ? "" : "$!";
+    # an empty string instead of undef keeps the log line free of warnings when the connect failed
+    my $send_fd = $send_socket ? fileno($send_socket) : "";
+    $self->log({level=>3},"($send_fd) send_message($host, $port, $message)\n");
+
+    my $response = "";
+    my $rv = $await_return_value ? "RV-" : "";
+    my $sc = $server_close ? "SC-" : "";
+    if ($send_socket) {
+        eval {
+            $send_socket->autoflush(1) if ($await_return_value || $server_close);
+            $send_socket->print("$rv$sc$message\n");
+            if ($await_return_value || $server_close) {
+                # $send_socket->timeout($timeout) if ($timeout); # doesn't seem to work
+                $response = $send_socket->getline();
+                # getline() yields undef at end of file (the server closed without sending a line)
+                $response = "" if (!defined $response);
+                $response =~ s/[\r\n]+$//;
+                $send_socket->print("EOF\n") if ($await_return_value && !$server_close);
+            }
+            close($send_socket);
+        };
+        if ($@) {
+            $response = "SEND ERROR: $@";
+        }
+    }
+    else {
+        $response = "CONNECT ERROR: $connect_error";
+    }
+
+    $self->log({level=>3},"send_message($host, $port, ...) => [$response]\n");
+    &App::sub_exit($response) if ($App::trace);
+    return($response);
+}
+
=head2 fork()
* Signature: $pid = $self->fork()
@@ -2032,7 +2324,7 @@
connections to be created if necessary.
Call this after a fork() in the child process.
-It will renew the resources which cannot be shared between a parent and
+It will shut down the resources which cannot be shared between a parent and
a child process.
Currently, this is primarily for database connections.
@@ -2045,19 +2337,29 @@
my ($self) = @_;
my $pid = fork();
if (!$pid) { # in the child process
- $self->renew_process_resources();
+ # $self->{is_child} = 1; # I might need to add this sometime, but
not now
+ $self->shutdown_unshareable_resources();
+ }
+ else {
+ $self->log({level=>4},"Child $pid started.\n");
}
&App::sub_exit($pid) if ($App::trace);
return($pid);
}
+# Shut the context down, then terminate the process.
+#   $exitval - process exit status; defaults to 0 when not given
+sub exit {
+    my ($self, $exitval) = @_;
+    $exitval = 0 if (!defined $exitval);
+    $self->shutdown();
+    # CORE:: states explicitly that this is the builtin, not this method
+    CORE::exit($exitval);
+}
+
#############################################################################
-# renew_process_resources()
+# shutdown_unshareable_resources()
#############################################################################
-=head2 renew_process_resources()
+=head2 shutdown_unshareable_resources()
- * Signature: $self->renew_process_resources()
+ * Signature: $self->shutdown_unshareable_resources()
* Param: void
* Return: void
* Throws: App::Exception
@@ -2065,15 +2367,15 @@
Sample Usage:
- $self->renew_process_resources();
+ $self->shutdown_unshareable_resources();
-The renew_process_resources() method is called in a child process just after
+The shutdown_unshareable_resources() method is called in a child process just
after
it has been fork()ed.
This causes connections to databases, etc. to be closed gracefully and new
connections to be created if necessary.
Call this after a fork() in the child process.
-It will renew the resources which cannot be shared between a parent and
+It will shut down the resources which cannot be shared between a parent and
a child process.
Currently, this is primarily for database connections.
@@ -2081,19 +2383,19 @@
=cut
-sub renew_process_resources {
+sub shutdown_unshareable_resources {
my $self = shift;
my ($conf, $repdef, $repname, $instance);
my ($class, $method, $args, $argidx, $repcache);
- $self->dbgprint("Context->renew_process_resources()")
+ $self->dbgprint("Context->shutdown_unshareable_resources()")
if ($App::DEBUG && $self->dbg(1));
$repcache = $self->{session}{cache}{Repository};
if (defined $repcache && ref($repcache) eq "HASH") {
foreach $repname (keys %$repcache) {
$instance = $repcache->{$repname};
- $instance->_renew_process_resources();
+ $instance->_shutdown_unshareable_resources();
delete $repcache->{$repname};
}
}