Author: spadkins
Date: Fri Sep 7 12:42:17 2007
New Revision: 9922
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
updates to get cancel to work more correctly, X cancel still broken
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
(original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Fri Sep
7 12:42:17 2007
@@ -76,7 +76,7 @@
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- $self->log({level=>2}, "CC: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n") if
$self->{options}{poe_trace};
+ #$self->log({level=>2}, "CC: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n") if
$self->{options}{poe_trace};
my $destination = $event->{destination};
if (! defined $destination) {
@@ -146,26 +146,30 @@
# $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token
on the node
sub _abort_running_async_event {
&App::sub_entry if ($App::trace);
- my ($self, $runtime_event_token) = @_;
- my $async_event = $self->{running_async_event}{$runtime_event_token};
- my ($event, $callback_event) = @$async_event;
- if ($runtime_event_token =~ /^[0-9]+$/) {
- kill(9, $runtime_event_token);
- }
- elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
- my $node_host = $1;
- my $node_port = $2;
+ my ($self, $runtime_event_token, $event, $callback_event) = @_;
+ #$self->log({level=>2}, "CC: _abort_running_async_event :
runtime_event_token=[$runtime_event_token] : event=[$event] :
callback_event=[$callback_event]\n");
+ #my $async_event = $self->{running_async_event}{$runtime_event_token};
+ if ($runtime_event_token && $event && $callback_event) {
+ #my ($event, $callback_event) = @$async_event;
+ if ($runtime_event_token =~ /^[0-9]+$/) {
+ kill(9, $runtime_event_token);
+ }
+ elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
+ my $node_host = $1;
+ my $node_port = $2;
- my $remote_server_name = "poe_${node_host}_${node_port}";
- my $remote_session_alias = $self->{poe_session_name}; # remote is
same as local
- my $remote_session_state = "poe_cancel_async_event";
+ my $remote_server_name = "poe_${node_host}_${node_port}";
+ my $remote_session_alias = $self->{poe_session_name}; # remote is
same as local
+ my $remote_session_state = "poe_cancel_async_event";
- my $kernel = $self->{poe_kernel};
- $kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
- [ $runtime_event_token ]);
- }
- else {
- $self->log("ERROR: _abort_running_async_event()
$event->{name}.$event->{method} : unparseable runtime event token
[$runtime_event_token]\n");
+ my $kernel = $self->{poe_kernel};
+ #$self->log({level=>2},"CC: _abort_running_async_event : calling
remote cancel for $runtime_event_token\n");
+ $kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
+ [ $runtime_event_token ]);
+ }
+ else {
+ $self->log("ERROR: _abort_running_async_event()
$event->{name}.$event->{method} : unparseable runtime event token
[$runtime_event_token]\n");
+ }
}
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Fri Sep 7
12:42:17 2007
@@ -49,8 +49,8 @@
die "Node must have a controller host and port defined
(\$context->{options}{controller_host} and {controller_port})"
if (!$self->{controller_host} || !$self->{controller_port});
- #push(@{$self->{poe_states}}, "foo", "bar");
- #push(@{$self->{poe_ikc_published_states}}, "more", "states");
+ push(@{$self->{poe_states}}, "poe_cancel_async_event");
+ push(@{$self->{poe_ikc_published_states}}, "poe_cancel_async_event");
$self->_init_poe($options);
@@ -222,7 +222,7 @@
my ($self) = @_;
my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
- my $state = "Cluster Node: $self->{host}:$self->{port}
procs[$self->{num_procs}/$self->{max_procs}:max]
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
+ my $state = "Cluster Node: Node: $self->{host}:$self->{port}
procs[$self->{num_procs}/$self->{max_procs}:max]
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
$state .= "\n";
$state .= $self->_state();
@@ -242,5 +242,52 @@
return($state);
}
+sub poe_cancel_async_event {
+ &App::sub_entry if ($App::trace);
+ my ( $self, $kernel, $heap, $arg0 ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
+ my ($runtime_event_token) = @$arg0;
+ $self->log({level=>2},"CN : poe_cancel_async_event :
runtime_event_token=[$runtime_event_token]\n");
+ my $async_event = $self->{running_async_event}{$runtime_event_token};
+
+ ### Find if running
+ for my $pid (keys %{$self->{running_async_event}}) {
+ #$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : pid=[$pid]\n");
+ #my $ae = $self->{running_async_event}{$pid};
+ #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
+ #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
+ #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
+ #$self->log({level=>2},"CN : poe_cancel_async_event : ce=[$ce]\n");
+
+ my $event_token = $self->{running_async_event}{$pid}[0]{event_token};
+ if ($runtime_event_token eq $event_token) {
+ $self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
+
+ ### Kill it
+ if ($pid =~ /^[0-9]+$/) {
+ kill(9, $pid);
+ }
+
+ ### Remove from pending
+ delete $self->{running_async_event}{$pid};
+
+ last;
+ }
+ }
+
+ ### Find if pending
+ for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
+ my $event_token = $self->{pending_async_events}[$i][0]{event_token};
+ $self->log({level=>2},"CN : poe_cancel_async_event :
pending_async_events : event_token=[$event_token]\n");
+ if ($runtime_event_token eq $event_token) {
+ splice(@{$self->{pending_async_events}}, $i, 1);
+ }
+ #my $ae = $self->{pending_async_events}{$foo};
+ #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
+ #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
+ #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
+ }
+ &App::sub_exit() if ($App::trace);
+}
+
1;
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Fri Sep 7
12:42:17 2007
@@ -23,6 +23,8 @@
use POE::Component::IKC::Server;
use HTTP::Status qw/RC_OK/;
use Socket qw(INADDR_ANY);
+use Data::Dumper;
+$Data::Dumper::Terse = 1;
sub _init {
&App::sub_entry if ($App::trace);
@@ -416,7 +418,7 @@
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- $self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
+ #$self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
if ($event->{destination} eq "in_process") {
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
@@ -549,7 +551,8 @@
my ($runtime_event_token_found, $async_event);
foreach my $runtime_event_token (keys %$running_async_event) {
$async_event = $running_async_event->{$runtime_event_token};
- if ($async_event->[0]{event_token} eq $event_token) {
+ ### async_event = [ event, callback_event ]
+ if ($async_event->[1]{event_token} eq $event_token) {
$runtime_event_token_found = $runtime_event_token;
last;
}
@@ -591,6 +594,9 @@
my $pending_async_events = $self->{pending_async_events};
my ($async_event);
my $aborted = 0;
+
+ #$self->log({level=>2}, "S: abort_async_event : event_token=[$event_token]
: pending_async_events: ", Dumper($pending_async_events));
+
# first look for it in the pending list
for (my $i = 0; $i <= $#$pending_async_events; $i++) {
$async_event = $pending_async_events->[$i];
@@ -614,10 +620,12 @@
sub abort_running_async_event {
&App::sub_entry if ($App::trace);
my ($self, $runtime_event_token) = @_;
+ #$self->log({level=>2}, "S: abort_running_async_event :
runtime_event_token=[$runtime_event_token]\n");
my $running_async_event = $self->{running_async_event};
my $pending_async_events = $self->{pending_async_events};
my $async_event = $running_async_event->{$runtime_event_token};
if ($async_event) {
+ #$self->log({level=>2}, "S: abort_running_async_event :
async_event=[$async_event]\n");
$self->{num_async_events}--;
delete $self->{running_async_event}{$runtime_event_token};
unshift(@$pending_async_events, $async_event);