Author: spadkins
Date: Tue Aug 28 12:01:45 2007
New Revision: 9897
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
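tweaks on POE stuff: make most POE log messages conditional on the
poe_trace, poe_ikc_debug and poe_http_debug options; schedule a periodic
alarm_noop event in ClusterNode; log pending async events in
Server::poe_alarm when poe_trace is set; match "poe_"-prefixed session
names in ClusterController::ikc_unregister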
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
(original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Tue Aug
28 12:01:45 2007
@@ -63,12 +63,12 @@
sub dispatch_events_begin {
my ($self) = @_;
- $self->log({level=>2},"Starting Cluster Controller on
$self->{host}:$self->{port}\n");
+ $self->log({level=>2},"Starting Cluster Controller on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
}
sub dispatch_events_end {
my ($self) = @_;
- $self->log({level=>2},"Stopping Cluster Controller\n");
+ $self->log({level=>2},"Stopping Cluster Controller\n") if
$self->{options}{poe_trace};
# nothing special yet
}
@@ -76,6 +76,8 @@
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
+ $self->log({level=>2}, "CC: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n") if
$self->{options}{poe_trace};
+
my $destination = $event->{destination};
if (! defined $destination) {
$self->log("ERROR: send_async_event_now()
$event->{name}.$event->{method} : destination not assigned\n");
@@ -110,7 +112,7 @@
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_register ($session_name)\n");
+ $self->log({level=>2},"POE: ikc_register ($session_name)\n") if
$self->{options}{poe_ikc_debug};
if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
my $node = "$1:$2";
$self->set_node_up($node);
@@ -123,8 +125,8 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
- if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
+ $self->log({level=>2},"POE: ikc_unregister ($session_name)\n") if
$self->{options}{poe_ikc_debug};
+ if ($session_name =~ /^poe_([^_]+)_(\d+)$/) {
my $node = "$1:$2";
$self->set_node_down($node);
}
@@ -134,7 +136,7 @@
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0,
ARG1, ARG2, ARG3];
- $self->log({level=>2},"POE: ikc_shutdown args=($arg0, $arg1, $arg2,
$arg3)\n");
+ $self->log({level=>2},"POE: ikc_shutdown args=($arg0, $arg1, $arg2,
$arg3)\n") if $self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -211,7 +213,9 @@
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
my ($node, $sys_info) = @$args;
$self->log("POE: poe_receive_node_status ($node) - " .
- "load=$sys_info->{load},
memfree=$sys_info->{memfree}/$sys_info->{memtotal}
swapfree=$sys_info->{swapfree}/$sys_info->{swaptotal}\n");
+ "load=$sys_info->{load}, " .
+ "memfree=$sys_info->{memfree}/$sys_info->{memtotal} " .
+ "swapfree=$sys_info->{swapfree}/$sys_info->{swaptotal}\n") if
$self->{options}{poe_trace};
$self->set_node_up($node, $sys_info);
&App::sub_exit() if ($App::trace);
}
@@ -229,7 +233,7 @@
else {
$event_str = "$event->{method}($args_str)";
}
- $self->log({level=>2},"Run Event: $event_str\n");
+ $self->log({level=>2},"Run Event: $event_str\n") if
$self->{options}{poe_trace};
$self->send_event($event);
&App::sub_exit() if ($App::trace);
}
@@ -381,7 +385,7 @@
$cmd = $cmd_fmt;
$cmd =~ s/{host}/$host/g;
$cmd =~ s/{port}/$port/g;
- $self->log("Starting Node [$node]: [$cmd]\n");
+ $self->log("Starting Node [$node]: [$cmd]\n") if
$self->{options}{poe_trace};
system("$cmd < /dev/null &");
}
else {
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Tue Aug 28
12:01:45 2007
@@ -70,7 +70,7 @@
name => $ikc_name,
timeout => 60,
);
- $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n");
+ $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
my $session_name = $self->{poe_session_name};
POE::Component::Server::SimpleHTTP->new(
@@ -82,7 +82,7 @@
{ 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' =>
'poe_http_server_state', },
],
);
- $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n");
+ $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
&App::sub_exit() if ($App::trace);
}
@@ -97,7 +97,7 @@
sub _start {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: _start\n");
+ $self->log({level=>2},"POE: _start\n") if $self->{options}{poe_trace};
my $name = $self->{poe_session_name};
$kernel->alias_set($name);
@@ -131,7 +131,7 @@
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0,
ARG3];
- $self->log({level=>2},"POE: ikc_register ($remote_kernel_id;
node=$node)\n");
+ $self->log({level=>2},"POE: ikc_register ($remote_kernel_id;
node=$node)\n") if $self->{options}{poe_ikc_debug};
$self->{controller_up} = 1;
my ($retval);
&App::sub_exit($retval) if ($App::trace);
@@ -141,7 +141,7 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $remote_kernel_id, $session_name, $node) = @_[OBJECT,
KERNEL, ARG0, ARG1, ARG3];
- $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id;
session_name=$session_name; node=$node)\n");
+ $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id;
session_name=$session_name; node=$node)\n") if $self->{options}{poe_ikc_debug};
$self->{controller_up} = 0;
$kernel->yield("poe_shutdown");
&App::sub_exit() if ($App::trace);
@@ -150,7 +150,7 @@
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP
];
- $self->log({level=>2},"POE: ikc_shutdown\n");
+ $self->log({level=>2},"POE: ikc_shutdown\n") if
$self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -158,20 +158,26 @@
sub dispatch_events_begin {
&App::sub_entry if ($App::trace);
my ($self) = @_;
- $self->log({level=>2},"Starting Cluster Node on
$self->{host}:$self->{port}\n");
+ $self->log({level=>2},"Starting Cluster Node on
$self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
my $node_heartbeat = $self->{options}{node_heartbeat} || 60;
$self->schedule_event(
method => "send_node_status",
time => time()+5, # immediately ...
interval => $node_heartbeat, # and every X seconds hereafter
);
+ my $node_alarm_interval = $self->{options}{node_alarm_interval} || 5;
+ $self->schedule_event(
+ method => "alarm_noop",
+ #time => time()+5, # immediately ...
+ interval => $node_alarm_interval, # and every X seconds hereafter
+ );
&App::sub_exit() if ($App::trace);
}
sub dispatch_events_end {
&App::sub_entry if ($App::trace);
my ($self) = @_;
- $self->log({level=>2},"Stopping Cluster Node\n");
+ $self->log({level=>2},"Stopping Cluster Node\n") if
$self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -205,6 +211,12 @@
&App::sub_exit() if ($App::trace);
}
+sub alarm_noop {
+ &App::sub_entry if ($App::trace);
+ &App::sub_exit() if ($App::trace);
+ return();
+}
+
sub state {
&App::sub_entry if ($App::trace);
my ($self) = @_;
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Tue Aug 28
12:01:45 2007
@@ -40,13 +40,13 @@
$self->{hostname} = $host;
$host =~ s/\..*//; # get rid of fully qualified domain name
$self->{host} = $host;
- $options->{port} ||= 8080;
- $self->{port} = $options->{port};
- $options->{http_port} ||= $options->{port}+1;
- $self->{poe_kernel_name} = "poe_$self->{host}_$self->{port}";
- $self->{poe_kernel_http_name} = $self->{poe_kernel_name} . "_httpd";
- $self->{poe_session_name} = "poe_session";
- $self->{poe_kernel} = $poe_kernel;
+ $options->{port} ||= 8080;
+ $self->{port} = $options->{port};
+ $options->{http_port} ||= $options->{port}+1;
+ $self->{poe_kernel_name} = "poe_$self->{host}_$self->{port}";
+ $self->{poe_kernel_http_name} = $self->{poe_kernel_name} . "_httpd";
+ $self->{poe_session_name} = "poe_session";
+ $self->{poe_kernel} = $poe_kernel;
$self->{num_procs} = 0;
$self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
@@ -116,7 +116,7 @@
port => $self->{port},
name => $self->{poe_kernel_name},
);
- $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n");
+ $self->log({level=>2},"Listening for Inter-Kernel Communications on
$self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
POE::Component::IKC::Responder->spawn();
my $session_name = $self->{poe_session_name};
@@ -129,7 +129,7 @@
{ 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' =>
'poe_http_server_state', },
],
);
- $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n");
+ $self->log({level=>2},"Listening for HTTP Requests on
$self->{host}:$self->{options}{http_port}\n") if
$self->{options}{poe_http_debug};
&App::sub_exit() if ($App::trace);
}
@@ -415,6 +415,9 @@
sub send_async_event_now {
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
+
+ $self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
+
if ($event->{destination} eq "in_process") {
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
}
@@ -677,14 +680,14 @@
my (@args);
@args = @$args if (ref($args) eq "ARRAY");
@args = ($args) if (!ref($args));
- $self->log({level=>2},"POE: _default - WARNING: Entered an unhandled state
($state) with args (@args)\n");
+ $self->log({level=>2},"POE: _default - WARNING: Entered an unhandled state
($state) with args (@args)\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
sub _start {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: _start\n");
+ $self->log({level=>2},"POE: _start\n") if $self->{options}{poe_trace};
my $name = $self->{poe_session_name};
$kernel->alias_set($name);
@@ -714,7 +717,7 @@
sub _stop {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP,
ARG0, ARG1 ];
- $self->log({level=>2},"POE: _stop\n");
+ $self->log({level=>2},"POE: _stop\n") if $self->{options}{poe_trace};
#sleep(1); # take a second to let child processes to die (perhaps not
necessary, perhaps necessary when using POE::Wheel::Run)
&App::sub_exit() if ($App::trace);
}
@@ -722,7 +725,7 @@
sub ikc_register {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_register ($session_name)\n");
+ $self->log({level=>2},"POE: ikc_register ($session_name)\n") if
$self->{options}{poe_ikc_debug};
my ($retval);
&App::sub_exit($retval) if ($App::trace);
return($retval);
@@ -731,14 +734,14 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
- $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
+ $self->log({level=>2},"POE: ikc_unregister ($session_name)\n") if
$self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
}
sub ikc_shutdown {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0,
ARG1, ARG2, ARG3];
- $self->log({level=>2},"POE: ikc_shutdown args=($arg0, $arg1, $arg2,
$arg3)\n");
+ $self->log({level=>2},"POE: ikc_shutdown args=($arg0, $arg1, $arg2,
$arg3)\n") if $self->{options}{poe_ikc_debug};
&App::sub_exit() if ($App::trace);
return;
}
@@ -746,7 +749,7 @@
sub poe_sigterm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log({level=>2},"POE: poe_sigterm (Caught signal $signame.
quitting.)\n");
+ $self->log({level=>2},"POE: poe_sigterm (Caught signal $signame.
quitting.)\n") if $self->{options}{poe_trace};
# How do I shut down the POE kernel now and exit?
# I think I need to shut down the last session and the kernel will exit.
@@ -760,7 +763,7 @@
sub poe_sigignore {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
- $self->log({level=>2},"POE: poe_sigignore (Caught signal $signame.
quitting.)\n");
+ $self->log({level=>2},"POE: poe_sigignore (Caught signal $signame.
quitting.)\n") if $self->{options}{poe_trace};
&App::sub_exit() if ($App::trace);
}
@@ -770,7 +773,7 @@
#print STDERR "NOTICE: STATE (poe_sigchld) invoked with ($pid, $status)
args\n";
my $exitval = $status >> 8;
my $sig = $status & 255;
- $self->log({level=>2},"POE: poe_sigchld (Child $pid finished
[exitval=$exitval,sig=$sig])\n");
+ $self->log({level=>2},"POE: poe_sigchld (Child $pid finished
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
$self->finish_pid($pid, $exitval, $sig);
$kernel->yield("poe_dispatch_pending_async_events");
&App::sub_exit() if ($App::trace);
@@ -779,7 +782,17 @@
sub poe_alarm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: poe_alarm (Dispatching pending events and
queueing scheduled events)\n");
+ $self->log({level=>2},"POE: poe_alarm (Dispatching pending events and
queueing scheduled events)\n") if $self->{options}{poe_trace};
+ my $main_service = $self->{main_service};
+ if ($self->{options}{poe_trace}) {
+ for my $async_event (@{$self->{pending_async_events}}) {
+ if ($main_service && $main_service->can("format_async_event")) {
+ my ($event, $callback_event) = @$async_event;
+ my $str = $main_service->format_async_event($event,
$callback_event);
+ $self->log({level=>2},"POE: poe_alarm : pending_async_events :
$str\n");
+ }
+ }
+ }
$kernel->yield("poe_dispatch_pending_async_events");
my $time = time();
my (@events);
@@ -805,7 +818,7 @@
sub poe_shutdown {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $session, $heap ) = @_[ OBJECT, KERNEL, SESSION, HEAP
];
- $self->log({level=>2},"POE: poe_shutdown\n");
+ $self->log({level=>2},"POE: poe_shutdown\n") if
$self->{options}{poe_trace};
# delete all wheels.
delete $heap->{wheel};
@@ -831,7 +844,7 @@
sub poe_dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: poe_dispatch_pending_async_events\n");
+ $self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if
$self->{options}{poe_trace};
my $events_occurred = $self->dispatch_pending_async_events();
$kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred >
0);
&App::sub_exit() if ($App::trace);
@@ -840,9 +853,9 @@
sub poe_event_loop_extension {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: poe_event_loop_extension\n");
+ $self->log({level=>2},"POE: poe_event_loop_extension\n") if
$self->{options}{poe_trace};
my $event_loop_extensions = $self->{event_loop_extensions};
- #$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #="
. ($#$event_loop_extensions+1) . ").\n");
+ #$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #="
. ($#$event_loop_extensions+1) . ").\n") if $self->{options}{poe_trace};
my $async_event_added = 0;
if ($event_loop_extensions && $#$event_loop_extensions > -1) {
my ($extension, $obj, $method, $args, $event_executed);
@@ -852,7 +865,7 @@
$event_executed = $obj->$method(@$args); # execute extension
$async_event_added = 1 if ($event_executed);
#if ($event_executed) {
- # $self->log({level=>2},"Event Loop extension:
${obj}->${method}(@$args) = $event_executed\n");
+ # $self->log({level=>2},"Event Loop extension:
${obj}->${method}(@$args) = $event_executed\n") if $self->{options}{poe_trace};
#}
}
}
@@ -879,7 +892,7 @@
else {
$event_str = "$event->{method}($args_str)";
}
- $self->log({level=>2},"POE: poe_run_event ($event_str)\n");
+ $self->log({level=>2},"POE: poe_run_event ($event_str)\n") if
$self->{options}{poe_trace};
$self->send_event($event);
&App::sub_exit() if ($App::trace);
}
@@ -889,7 +902,7 @@
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
my ($sender, $event, $callback_event) = @$args;
- $self->log({level=>2},"POE: poe_enqueue_async_event
($event->{name}.$event->{method})\n");
+ $self->log({level=>2},"POE: poe_enqueue_async_event
($event->{name}.$event->{method})\n") if $self->{options}{poe_trace};
my $runtime_event_token = $self->send_async_event($event, { method =>
"async_event_finished", args => [ $sender, $event, $callback_event ], });
$event->{event_token} = $runtime_event_token;
@@ -903,7 +916,7 @@
&App::sub_entry if ($App::trace);
my ($self, $kernel, $return_values) = @_[OBJECT, KERNEL, ARG0];
my ($runtime_event_token, $async_event) = @$return_values;
- $self->log({level=>2},"POE: poe_enqueue_async_event_finished
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token)\n");
+ $self->log({level=>2},"POE: poe_enqueue_async_event_finished
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token)\n")
if $self->{options}{poe_trace};
$self->{running_async_event}{$runtime_event_token} = $async_event;
&App::sub_exit() if ($App::trace);
}
@@ -936,7 +949,7 @@
if ($async_event) {
my ($event, $callback_event) = @$async_event;
- $self->log({level=>2},"POE: poe_remote_async_event_finished
($event->{name}.$event->{method} => $runtime_event_token)\n");
+ $self->log({level=>2},"POE: poe_remote_async_event_finished
($event->{name}.$event->{method} => $runtime_event_token)\n") if
$self->{options}{poe_trace};
delete $self->{running_async_event}{$runtime_event_token};
my $destination = $event->{destination} || "local";
@@ -949,7 +962,7 @@
}
}
else {
- $self->log({level=>2},"POE: poe_remote_async_event_finished
($runtime_event_token)\n");
+ $self->log({level=>2},"POE: poe_remote_async_event_finished
($runtime_event_token)\n") if $self->{options}{poe_trace};
}
&App::sub_exit() if ($App::trace);
@@ -958,7 +971,7 @@
sub poe_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- $self->log({level=>2},"POE: poe_server_state\n");
+ #$self->log({level=>2},"POE: poe_server_state\n") if
$self->{options}{poe_trace};
my $server_state = $self->state();
&App::sub_exit($server_state) if ($App::trace);
return $server_state;
@@ -967,7 +980,7 @@
sub poe_http_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL,
HEAP, ARG0, ARG1 ];
- $self->log({level=>2},"POE: poe_http_server_state\n");
+ $self->log({level=>2},"POE: poe_http_server_state\n") if
$self->{options}{poe_http_debug};
my $server_state = $kernel->call( $self->{poe_session_name},
'poe_server_state' );
# Build the response.
@@ -984,7 +997,7 @@
sub poe_http_test_run {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $request, $response ) = @_[ OBJECT, KERNEL,
HEAP, ARG0, ARG1 ];
- $self->log({level=>2},"POE: poe_http_test_run\n");
+ $self->log({level=>2},"POE: poe_http_test_run\n") if
$self->{options}{poe_http_debug};
my $event = {
service_type => "SessionObject",
name => "mvworkd",