Author: spadkins
Date: Fri Jan 18 10:25:04 2008
New Revision: 10611
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
a bunch of logging changes and cleanup to make running in production cleaner
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
(original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Fri Jan
18 10:25:04 2008
@@ -1,4 +1,3 @@
-
#############################################################################
## $Id: ClusterController.pm 6785 2006-08-11 23:13:19Z spadkins $
#############################################################################
@@ -12,8 +11,6 @@
@ISA = ( "App::Context::POE::Server" );
use Date::Format;
-#sub POE::Kernel::TRACE_STATISTICS () { 1 }
-#sub POE::Kernel::TRACE_PROFILE () { 1 }
use POE;
use strict;
@@ -65,12 +62,12 @@
sub dispatch_events_begin {
my ($self) = @_;
- $self->log({level=>2},"Starting Cluster Controller on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"Starting Cluster Controller on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
}
sub dispatch_events_end {
my ($self) = @_;
- $self->log({level=>2},"Stopping Cluster Controller\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"Stopping Cluster Controller\n") if
$self->{options}{poe_trace};
# nothing special yet
}
@@ -80,7 +77,7 @@
my $destination = $event->{destination};
if (! defined $destination) {
- $self->log("ERROR: send_async_event_now()
$event->{name}.$event->{method} : destination not assigned\n");
+ $self->log({level=>2},"ERROR $event->{name}.$event->{method} :
destination not assigned\n");
}
elsif ($event->{destination} eq "in_process") {
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
@@ -112,7 +109,7 @@
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_register ($session_name)\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_register: ($session_name)\n") if
$self->{options}{poe_ikc_debug};
if ($session_name =~ /^poe_([^_]+)_(\d+)$/) {
my $node = "$1:$2";
$self->set_node_up($node);
@@ -125,7 +122,7 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_unregister ($session_name)\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_unregister: ($session_name)\n") if
$self->{options}{poe_ikc_debug};
if ($session_name =~ /^poe_([^_]+)_(\d+)$/) {
my $node = "$1:$2";
$self->set_node_down($node);
@@ -136,7 +133,7 @@
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0,
ARG1, ARG2, ARG3];
- $self->log({level=>2},"POE: ikc_shutdown args=($arg0, $arg1, $arg2,
$arg3)\n") if $self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_shutdown: args=($arg0, $arg1, $arg2, $arg3)\n")
if $self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -164,7 +161,7 @@
[ $runtime_event_token ]);
}
else {
- $self->log("ERROR: _abort_running_async_event()
$event->{name}.$event->{method} : unparseable runtime event token
[$runtime_event_token]\n");
+ $self->log({level=>2},"ERROR $event->{name}.$event->{method} :
unparseable runtime event token [$runtime_event_token]\n");
}
}
&App::sub_exit() if ($App::trace);
@@ -211,23 +208,20 @@
sub poe_receive_node_status {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
- $self->log("POE: poe_receive_node_status enter\n") if
$self->{options}{poe_trace};
my ($node, $sys_info) = @$args;
- $self->log("POE: poe_receive_node_status ($node) - " .
+ $self->log({level=>3},"poe_receive_node_status ($node) - " .
"load=$sys_info->{system_load}, " .
"memfree=$sys_info->{memfree}/$sys_info->{memtotal} " .
"swapfree=$sys_info->{swapfree}/$sys_info->{swaptotal}\n") if
$self->{options}{poe_trace};
$self->set_node_up($node, $sys_info);
- $self->log("POE: poe_receive_node_status exit\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_run_event {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $event ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_run_event enter\n") if $self->{options}{poe_trace};
my ($event_str);
my $args = $event->{args} || [];
@@ -241,7 +235,6 @@
}
$self->send_event($event);
- $self->log("POE: poe_run_event exit : event[$event_str]\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -365,7 +358,7 @@
$kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state");
}
else {
- $self->log("ERROR: shutdown_nodes: unparseable node [$node]\n");
+ $self->log({level=>2},"ERROR unparseable node [$node]\n");
}
}
&App::sub_exit() if ($App::trace);
@@ -395,11 +388,11 @@
$cmd = $cmd_fmt;
$cmd =~ s/{host}/$host/g;
$cmd =~ s/{port}/$port/g;
- $self->log("Starting Node [$node]: [$cmd]\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"Starting Node [$node]: [$cmd]\n") if
$self->{options}{poe_trace};
system("$cmd < /dev/null &");
}
else {
- $self->log("ERROR: startup_nodes: unparseable node [$node]\n");
+ $self->log({level=>2},"ERROR unparseable node [$node]\n");
}
}
&App::sub_exit() if ($App::trace);
@@ -417,7 +410,7 @@
close(App::Context::POE::ClusterController::FILE);
}
else {
- $self->log("ERROR: Can't write node file [$node_file]: $!\n");
+ $self->log({level=>2},"ERROR Can't write node file [$node_file]:
$!\n");
}
&App::sub_exit() if ($App::trace);
}
@@ -439,10 +432,6 @@
}
close(App::Context::POE::ClusterController::FILE);
}
- else {
- # This is not really a problem.
- # $self->log("WARNING: Can't read node file [$node_file]: $!\n");
- }
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Fri Jan 18
10:25:04 2008
@@ -1,4 +1,3 @@
-
#############################################################################
## $Id: ClusterNode.pm 3666 2006-03-11 20:34:10Z spadkins $
#############################################################################
@@ -15,8 +14,6 @@
use Date::Format;
-#sub POE::Kernel::TRACE_STATISTICS () { 1 }
-#sub POE::Kernel::TRACE_PROFILE () { 1 }
use POE;
use POE::Component::IKC::Client;
use POE::Component::IKC::Responder;
@@ -72,7 +69,7 @@
name => $ikc_name,
timeout => 60,
);
- $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
my $session_name = $self->{poe_session_name};
POE::Component::Server::SimpleHTTP->new(
@@ -84,7 +81,7 @@
{ 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' =>
'poe_http_server_state', },
],
);
- $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
+ $self->log({level=>3},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
&App::sub_exit() if ($App::trace);
}
@@ -99,7 +96,6 @@
sub _start {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: _start\n") if $self->{options}{poe_trace};
my $name = $self->{poe_session_name};
$kernel->alias_set($name);
@@ -133,7 +129,7 @@
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0,
ARG3];
- $self->log({level=>2},"POE: ikc_register ($remote_kernel_id;
node=$node)\n") if $self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_register: ($remote_kernel_id; node=$node)\n")
if $self->{options}{poe_ikc_debug};
$self->{controller_up} = 1;
$self->send_node_status();
&App::sub_exit() if ($App::trace);
@@ -143,7 +139,7 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $remote_kernel_id, $session_name, $node) = @_[OBJECT,
KERNEL, ARG0, ARG1, ARG3];
- $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id;
session_name=$session_name; node=$node)\n") if $self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_unregister: ($remote_kernel_id;
session_name=$session_name; node=$node)\n") if $self->{options}{poe_ikc_debug};
$self->{controller_up} = 0;
$kernel->yield("poe_shutdown");
&App::sub_exit() if ($App::trace);
@@ -152,7 +148,7 @@
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP
];
- $self->log({level=>2},"POE: ikc_shutdown\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_shutdown\n") if $self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -160,7 +156,7 @@
sub dispatch_events_begin {
&App::sub_entry if ($App::trace);
my ($self) = @_;
- $self->log({level=>2},"Starting Cluster Node on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"Starting Cluster Node on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
my $node_heartbeat = $self->{options}{node_heartbeat} || 60;
$self->schedule_event(
method => "send_node_status",
@@ -179,7 +175,7 @@
sub dispatch_events_end {
&App::sub_entry if ($App::trace);
my ($self) = @_;
- $self->log({level=>2},"Stopping Cluster Node\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"Stopping Cluster Node\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -249,14 +245,13 @@
sub poe_cancel_async_event {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $arg0 ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_cancel_async_event enter\n") if
$self->{options}{poe_trace};
my ($runtime_event_token) = @$arg0;
### Find if running
for my $pid (keys %{$self->{running_async_event}}) {
my $event_token = $self->{running_async_event}{$pid}[0]{event_token};
if ($runtime_event_token eq $event_token) {
- $self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
+ $self->log({level=>3},"CN : poe_cancel_async_event:
running_async_event: found event_token=[$event_token] pid=[$pid]\n");
### Kill it
if ($pid =~ /^[0-9]+$/) {
@@ -278,7 +273,6 @@
}
}
- $self->log("POE: poe_cancel_async_event exit :
runtime_event_token[$runtime_event_token]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Fri Jan 18
10:25:04 2008
@@ -26,8 +26,6 @@
use POE::Component::IKC::Server;
use HTTP::Status qw/RC_OK/;
use Socket qw(INADDR_ANY);
-use Data::Dumper;
-$Data::Dumper::Terse = 1;
sub _init {
&App::sub_entry if ($App::trace);
@@ -121,7 +119,7 @@
port => $self->{port},
name => $self->{poe_kernel_name},
);
- $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
POE::Component::IKC::Responder->spawn();
my $session_name = $self->{poe_session_name};
@@ -134,7 +132,7 @@
{ 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' =>
'poe_http_server_state', },
],
);
- $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
+ $self->log({level=>3},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
&App::sub_exit() if ($App::trace);
}
@@ -197,7 +195,7 @@
$name = $object;
}
$service = $self->service($service_type, $name); # instantiate
it. that's all.
- $self->log({level=>3},"$service_type $name instantiated
[$service]\n");
+ $self->log({level=>3},"dispatch_events: $service_type $name
instantiated [$service]\n");
$self->{main_service} = $service if (!$self->{main_service});
}
}
@@ -208,7 +206,7 @@
$poe_kernel->run();
};
if ($@) {
- $self->log($@);
+ $self->log({level=>2},$@);
}
$self->dispatch_events_end();
@@ -220,14 +218,14 @@
&App::sub_entry if ($App::trace);
my ($self) = @_;
my $verbose = $self->{verbose};
- $self->log({level=>2},"Starting Dispatching Events on Server on
$self->{host}:$self->{port}\n");
+ $self->log({level=>3},"Starting Dispatching Events on Server on
$self->{host}:$self->{port}\n");
&App::sub_exit() if ($App::trace);
}
sub dispatch_events_end {
my ($self) = @_;
my $verbose = $self->{verbose};
- $self->log({level=>2},"Stopping Dispatching Events on Server.\n");
+ $self->log({level=>3},"Stopping Dispatching Events on Server.\n");
}
sub state {
@@ -351,20 +349,22 @@
### POE state dumping - Currently commented out because it doesn't gain us
much
### in the way of visibility, and POE::API::Peek is a CPAN pain
### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY
- App->use("POE::API::Peek");
- my $api = POE::API::Peek->new();
- my @queue = $api->event_queue_dump();
- $state .= "POE event_queue_dump\n";
- my $first = 1;
- my $poe_stuff = [qw(ID index priority event type source destination)];
- for my $item (@queue) {
- if ($first) {
- $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n",
@$poe_stuff);
- $first = 0;
+ if ($self->{options}{poe_trace}) {
+ App->use("POE::API::Peek");
+ my $api = POE::API::Peek->new();
+ my @queue = $api->event_queue_dump();
+ $state .= "POE event_queue_dump\n";
+ my $first = 1;
+ my $poe_stuff = [qw(ID index priority event type source destination)];
+ for my $item (@queue) {
+ if ($first) {
+ $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n",
@$poe_stuff);
+ $first = 0;
+ }
+ $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n",
@$item{@$poe_stuff});
}
- $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n",
@$item{@$poe_stuff});
+ $state .= "\n";
}
- $state .= "\n";
return $state;
}
@@ -445,9 +445,7 @@
&App::sub_entry if ($App::trace);
my ($self, $max_events) = @_;
my $t0 = [gettimeofday];
- #$self->log({level=>2},"S: dispatch_pending_async_events : poe_state : \n"
. $self->_state_poe() . "\n");
- $self->log({level=>2},"S: dispatch_pending_async_events : enter :
max_events=[$max_events]\n") if $self->{options}{poe_trace};
- #$self->log({level=>2},"S: dispatch_pending_async_events :
pending_async_events : " . Dumper($self->{pending_async_events}) . "\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"dispatch_pending_async_events enter:
max_events=[$max_events]\n") if $self->{options}{poe_trace};
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
@@ -484,7 +482,7 @@
}
}
- $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred] time=[" . sprintf("%.4f", tv_interval($t0,
[gettimeofday])) . "]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"dispatch_pending_async_events exit:
events_occurred=[$events_occurred] time=[" . sprintf("%.4f", tv_interval($t0,
[gettimeofday])) . "]\n") if $self->{options}{poe_trace};
&App::sub_exit($events_occurred) if ($App::trace);
return($events_occurred);
}
@@ -505,8 +503,7 @@
sub send_async_event_now {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
-
- $self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
+ $self->log({level=>3}, "send_async_event_now enter:
$event->{name}.$event->{method} : $event->{destination}\n");
if ($event->{destination} eq "in_process") {
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
@@ -541,6 +538,7 @@
my $runtime_event_token = $pid;
$self->{running_async_event}{$runtime_event_token} = [ $event,
$callback_event ];
}
+ $self->log({level=>3}, "send_async_event_now exit\n");
&App::sub_exit() if ($App::trace);
}
=head2 wait_for_event()
@@ -577,9 +575,9 @@
else { # the new child process has no sub-processes
$self->{num_procs} = 0;
$self->{proc} = {};
- $SIG{INT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->exit(102); }; # SIG 2
- $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->exit(103); }; # SIG 3
- $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_
(quitting)\n"); $self->shutdown(); $self->exit(115); }; # SIG 15
+ $SIG{INT} = sub { $self->log({level=>3},"fork: Caught Signal: @_
(quitting)\n"); $self->exit(102); }; # SIG 2
+ $SIG{QUIT} = sub { $self->log({level=>3},"fork: Caught Signal: @_
(quitting)\n"); $self->exit(103); }; # SIG 3
+ $SIG{TERM} = sub { $self->log({level=>3},"fork: Caught Signal: @_
(quitting)\n"); $self->shutdown(); $self->exit(115); }; # SIG 15
}
&App::sub_exit($pid) if ($App::trace);
return($pid);
@@ -729,7 +727,7 @@
kill(15, $runtime_event_token);
}
else {
- $self->log("Unable to abort running async event
[$runtime_event_token]\n");
+ $self->log({level=>3},"Unable to abort running async event
[$runtime_event_token]\n");
}
&App::sub_exit() if ($App::trace);
}
@@ -773,14 +771,13 @@
my (@args);
@args = @$args if (ref($args) eq "ARRAY");
@args = ($args) if (!ref($args));
- $self->log({level=>2},"POE: _default - WARNING: Entered an unhandled state
($state) with args (@args)\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: _default: Entered an unhandled state ($state)
with args (@args)\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub _start {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log("POE: _start enter\n") if $self->{options}{poe_trace};
my $name = $self->{poe_session_name};
$kernel->alias_set($name);
@@ -804,23 +801,20 @@
$kernel->delay_set("poe_event_loop_extension", 5) if
(!$self->{disable_event_loop_extensions});
$kernel->delay_set("poe_alarm", 5);
- $self->log("POE: _start exit\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub _stop {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP,
ARG0, ARG1 ];
- $self->log("POE: _stop enter\n") if $self->{options}{poe_trace};
#sleep(1); # take a second to let child processes to die (perhaps not
necessary, perhaps necessary when using POE::Wheel::Run)
- $self->log("POE: _stop exit\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log("POE: ikc_register ($session_name)\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_register: ($session_name)\n") if
$self->{options}{poe_ikc_debug};
my ($retval);
&App::sub_exit($retval) if ($App::trace);
return($retval);
@@ -829,14 +823,14 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log("POE: ikc_unregister ($session_name)\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_unregister: ($session_name)\n") if
$self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
}
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0,
ARG1, ARG2, ARG3];
- $self->log("POE: ikc_shutdown args=($arg0, $arg1, $arg2, $arg3)\n") if
$self->{options}{poe_ikc_debug};
+ $self->log({level=>3},"ikc_shutdown: args=($arg0, $arg1, $arg2, $arg3)\n")
if $self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -844,7 +838,6 @@
sub poe_yield {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $state, $max_count) = @_;
- $self->log("POE: poe_yield enter\n") if $self->{options}{poe_trace};
my $count = $self->{poe_count}{$state} || 0;
$max_count ||= 1;
@@ -854,7 +847,6 @@
$kernel->yield($state);
}
- $self->log("POE: poe_yield exit : state[$state] poe_count[" .
$self->{poe_count}{$state} . "]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
return;
}
@@ -862,7 +854,6 @@
sub poe_yield_acknowledged {
&App::sub_entry if ($App::trace);
my ($self, $state) = @_;
- $self->log("POE: poe_yield_acknowledged enter\n") if
$self->{options}{poe_trace};
if ($self->{poe_count}{$state}) {
$self->{poe_count}{$state}--;
@@ -871,7 +862,6 @@
$self->{poe_count}{$state} = 0;
}
- $self->log("POE: poe_yield_acknowledged exit : state[$state] poe_count[" .
$self->{poe_count}{$state} . "]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
return;
}
@@ -879,7 +869,6 @@
sub poe_sigterm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_sigterm enter\n") if $self->{options}{poe_trace};
# How do I shut down the POE kernel now and exit?
# I think I need to shut down the last session and the kernel will exit.
@@ -889,38 +878,32 @@
#$kernel->yield("poe_shutdown");
# However the signals which bring me here seem to do the shutdown for me,
so it's unnecessary
- $self->log("POE: poe_sigterm exit : signal[$signame]\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_sigignore {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_sigignore enter\n") if $self->{options}{poe_trace};
- $self->log("POE: poe_sigignore exit : signal[$signame]\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_sigchld {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $pid, $status ) = @_[ OBJECT, KERNEL, HEAP,
ARG1, ARG2 ];
- $self->log("POE: poe_sigchld enter\n") if $self->{options}{poe_trace};
my $exitval = $status >> 8;
my $sig = $status & 255;
$self->finish_pid($pid, $exitval, $sig);
- $self->log("POE: poe_sigchld exit : child[$pid] exitval[$exitval]
sig[$sig]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_alarm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log("POE: poe_alarm enter\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"poe_alarm enter\n") if $self->{options}{poe_trace};
- #$self->log({level=>2},"S: poe_alarm : poe_state : \n" .
$self->_state_poe() . "\n");
my $main_service = $self->{main_service};
### This is mostly for the node, which needs this to spawn queued execute
subrequest events
@@ -948,13 +931,15 @@
### call some POE profile dump functions, only happens when ENV variables
### POE_TRACE_PROFILE=1 POE_TRACE_STATISTICS=1
- my %data = $kernel->stat_getdata();
- for my $key (sort keys %data) {
- $self->log("POE: poe_alarm : poe_statistics [" . sprintf("%20s : %s",
$key, $data{$key}) . "]\n");
+ if ($self->{options}{poe_trace}) {
+ my %data = $kernel->stat_getdata();
+ for my $key (sort keys %data) {
+ $self->log({level=>3},"poe_alarm: poe_statistics [" .
sprintf("%20s : %s", $key, $data{$key}) . "]\n");
+ }
+ $kernel->stat_show_profile();
}
- $kernel->stat_show_profile();
- $self->log("POE: poe_alarm exit : events_occurred[$events_occurred]
sec_until_next_event[$sec_until_next_event]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"poe_alarm exit: events_occurred[$events_occurred]
sec_until_next_event[$sec_until_next_event]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -962,14 +947,13 @@
sub poe_shutdown {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP
];
- $self->log("POE: poe_shutdown enter\n") if $self->{options}{poe_trace};
### Abort all running async events
for my $event_token (keys %{$self->{running_async_event}}) {
### We can't use the normal abort_async_event
### because POE and IKC are shutting down shortly
$self->_abort_running_async_event($event_token);
- $self->log({level=>2},"POE: poe_shutdown : abort running events :
event_token=[$event_token]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"poe_shutdown: abort running events :
event_token=[$event_token]\n") if $self->{options}{poe_trace};
}
### Clear your alias
@@ -987,7 +971,6 @@
### Shut down POE IKC
$kernel->post('IKC', 'shutdown');
- $self->log("POE: poe_shutdown exit\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
return;
}
@@ -995,21 +978,20 @@
sub poe_dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $arg ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_dispatch_pending_async_events enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_dispatch_pending_async_events enter\n") if
$self->{options}{poe_trace};
$self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
my $events_occurred = $self->dispatch_pending_async_events();
- $self->log("POE: poe_dispatch_pending_async_events exit :
events_occurred[$events_occurred]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_dispatch_pending_async_events exit:
events_occurred[$events_occurred]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_event_loop_extension {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log("POE: poe_event_loop_extension enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_event_loop_extension enter\n") if
$self->{options}{poe_trace};
- $self->log({level=>2},"POE: poe_event_loop_extension\n") if
$self->{options}{poe_trace};
$self->poe_yield_acknowledged("poe_event_loop_extension");
my $event_loop_extensions = $self->{event_loop_extensions};
my $async_events_added = 0;
@@ -1028,7 +1010,7 @@
$self->poe_yield($kernel, "poe_event_loop_extension");
}
- $self->log("POE: poe_event_loop_extension exit : event_loop_extensions[" .
@$event_loop_extensions . "] async_events_added[$async_events_added]\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_event_loop_extension exit:
event_loop_extensions[" . @$event_loop_extensions . "]
async_events_added[$async_events_added]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -1044,7 +1026,7 @@
sub poe_run_event {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $event ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log("POE: poe_run_event enter\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_run_event enter\n") if
$self->{options}{poe_trace};
my ($event_str);
my $args = $event->{args} || [];
@@ -1058,7 +1040,7 @@
}
$self->send_event($event);
- $self->log("POE: poe_run_event exit : event[$event_str]\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_run_event exit: event[$event_str]\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -1066,13 +1048,13 @@
sub poe_enqueue_async_event {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
- $self->log("POE: poe_enqueue_async_event enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_enqueue_async_event enter\n") if
$self->{options}{poe_trace};
my ($sender, $event, $callback_event) = @$args;
my $runtime_event_token = $self->send_async_event($event, { method =>
"async_event_finished", args => [ $sender, $event, $callback_event ], });
$event->{event_token} = $runtime_event_token;
- $self->log("POE: poe_enqueue_async_event exit :
event[$event->{name}.$event->{method} token=$event->{event_token}]
runtime_event_token[$runtime_event_token]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_enqueue_async_event exit:
event[$event->{name}.$event->{method} token=$event->{event_token}]
runtime_event_token[$runtime_event_token]\n") if $self->{options}{poe_trace};
&App::sub_exit([$runtime_event_token, [$event, $callback_event]]) if
($App::trace);
return([$runtime_event_token, [$event, $callback_event]]);
}
@@ -1081,12 +1063,12 @@
sub poe_enqueue_async_event_finished {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $return_values) = @_[OBJECT, KERNEL, ARG0];
- $self->log("POE: poe_enqueue_async_event_finished enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_enqueue_async_event_finished enter\n") if
$self->{options}{poe_trace};
my ($runtime_event_token, $async_event) = @$return_values;
$self->{running_async_event}{$runtime_event_token} = $async_event;
- $self->log("POE: poe_enqueue_async_event_finished exit :
event[$async_event->[0]{name}.$async_event->[0]{method} =>
$runtime_event_token]\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_enqueue_async_event_finished exit:
event[$async_event->[0]{name}.$async_event->[0]{method} =>
$runtime_event_token]\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -1112,16 +1094,24 @@
sub poe_remote_async_event_finished {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
- $self->log("POE: poe_remote_async_event_finished enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_remote_async_event_finished enter\n") if
$self->{options}{poe_trace};
my ($runtime_event_token, $callback_args) = @$args;
my $async_event = $self->{running_async_event}{$runtime_event_token};
if ($async_event) {
my ($event, $callback_event) = @$async_event;
- $self->log({level=>2},"POE: poe_remote_async_event_finished :
($event->{name}.$event->{method} => $runtime_event_token)\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_remote_async_event_finished :
($event->{name}.$event->{method} => $runtime_event_token)\n") if
$self->{options}{poe_trace};
delete $self->{running_async_event}{$runtime_event_token};
my $destination = $event->{destination} || "local";
+ ### Decrease the node's load value a fraction of its last system load
+ my $system_load = $self->{node}{$destination}{system_load};
+ my $number_node_events = $self->{node}{$destination}{num_async_events};
+ if ($number_node_events > 0) {
+ my $load_decrease_amount = $system_load / $number_node_events;
+ $self->{node}{$destination}{load} -= $load_decrease_amount;
+ }
+ ### Keep track of our event counts
$self->{num_async_events}--;
$self->{node}{$destination}{num_async_events}--;
@@ -1130,22 +1120,22 @@
$self->send_event($callback_event);
}
else {
- $self->log({level=>2},"Server: WARNING :
poe_remote_async_event_finished called without callback_event :
runtime_event_token[$runtime_event_token]\n");
+ $self->log({level=>3},"Server: WARNING :
poe_remote_async_event_finished called without callback_event :
runtime_event_token[$runtime_event_token]\n");
}
}
- $self->log("POE: poe_remote_async_event_finished exit\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_remote_async_event_finished exit\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub poe_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log("POE: poe_server_state enter\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_server_state enter\n") if
$self->{options}{poe_trace};
my $server_state = $self->state();
- $self->log("POE: poe_server_state exit\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_server_state exit\n") if
$self->{options}{poe_trace};
&App::sub_exit($server_state) if ($App::trace);
return $server_state;
}
@@ -1153,7 +1143,7 @@
sub poe_http_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL,
HEAP, ARG0, ARG1 ];
- $self->log("POE: poe_http_server_state enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_http_server_state enter\n") if
$self->{options}{poe_trace};
my $server_state = $kernel->call( $self->{poe_session_name},
'poe_server_state' );
### Build the response.
@@ -1163,7 +1153,7 @@
### Signal that the request was handled okay.
$kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
- $self->log("POE: poe_http_server_state exit\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_http_server_state exit\n") if
$self->{options}{poe_trace};
&App::sub_exit(RC_OK) if ($App::trace);
return RC_OK;
}
@@ -1171,7 +1161,7 @@
sub poe_http_test_run {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL,
HEAP, ARG0, ARG1 ];
- $self->log("POE: poe_http_test_run enter\n") if
$self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_http_test_run enter\n") if
$self->{options}{poe_trace};
my $event = {
service_type => "SessionObject",
@@ -1189,7 +1179,7 @@
# Signal that the request was handled okay.
$kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
- $self->log("POE: poe_http_test_run exit\n") if $self->{options}{poe_trace};
+ $self->log({level=>3},"POE: poe_http_test_run exit\n") if
$self->{options}{poe_trace};
&App::sub_exit(RC_OK) if ($App::trace);
return RC_OK;
}