Author: spadkins
Date: Fri Sep  7 12:42:17 2007
New Revision: 9922

Modified:
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm

Log:
updates to get cancel to work more correctly, X cancel still broken

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     Fri Sep  7 12:42:17 2007
@@ -76,7 +76,7 @@
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
 
-    $self->log({level=>2}, "CC: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n") if 
$self->{options}{poe_trace};
+    #$self->log({level=>2}, "CC: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n") if 
$self->{options}{poe_trace};
 
     my $destination = $event->{destination};
     if (! defined $destination) {
@@ -146,26 +146,30 @@
 #    $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token on the node
 sub _abort_running_async_event {
     &App::sub_entry if ($App::trace);
-    my ($self, $runtime_event_token) = @_;
-    my $async_event = $self->{running_async_event}{$runtime_event_token};
-    my ($event, $callback_event) = @$async_event;
-    if ($runtime_event_token =~ /^[0-9]+$/) {
-        kill(9, $runtime_event_token);
-    }
-    elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
-        my $node_host = $1;
-        my $node_port = $2;
+    my ($self, $runtime_event_token, $event, $callback_event) = @_;
+    #$self->log({level=>2}, "CC: _abort_running_async_event : 
runtime_event_token=[$runtime_event_token] : event=[$event] : 
callback_event=[$callback_event]\n");
+    #my $async_event = $self->{running_async_event}{$runtime_event_token};
+    if ($runtime_event_token && $event && $callback_event) {
+        #my ($event, $callback_event) = @$async_event;
+        if ($runtime_event_token =~ /^[0-9]+$/) {
+            kill(9, $runtime_event_token);
+        }
+        elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
+            my $node_host = $1;
+            my $node_port = $2;
 
-        my $remote_server_name = "poe_${node_host}_${node_port}";
-        my $remote_session_alias = $self->{poe_session_name};  # remote is 
same as local
-        my $remote_session_state = "poe_cancel_async_event";
+            my $remote_server_name = "poe_${node_host}_${node_port}";
+            my $remote_session_alias = $self->{poe_session_name};  # remote is 
same as local
+            my $remote_session_state = "poe_cancel_async_event";
 
-        my $kernel = $self->{poe_kernel};
-        $kernel->post("IKC", "post", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
-            [ $runtime_event_token ]);
-    }
-    else {
-        $self->log("ERROR: _abort_running_async_event() 
$event->{name}.$event->{method} : unparseable runtime event token 
[$runtime_event_token]\n");
+            my $kernel = $self->{poe_kernel};
+            #$self->log({level=>2},"CC: _abort_running_async_event : calling 
remote cancel for $runtime_event_token\n");
+            $kernel->post("IKC", "post", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
+                [ $runtime_event_token ]);
+        }
+        else {
+            $self->log("ERROR: _abort_running_async_event() 
$event->{name}.$event->{method} : unparseable runtime event token 
[$runtime_event_token]\n");
+        }
     }
     &App::sub_exit() if ($App::trace);
 }

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   Fri Sep  7 12:42:17 2007
@@ -49,8 +49,8 @@
     die "Node must have a controller host and port defined 
(\$context->{options}{controller_host} and {controller_port})"
         if (!$self->{controller_host} || !$self->{controller_port});
 
-    #push(@{$self->{poe_states}}, "foo", "bar");
-    #push(@{$self->{poe_ikc_published_states}}, "more", "states");
+    push(@{$self->{poe_states}}, "poe_cancel_async_event");
+    push(@{$self->{poe_ikc_published_states}}, "poe_cancel_async_event");
 
     $self->_init_poe($options);
 
@@ -222,7 +222,7 @@
     my ($self) = @_;
 
     my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
-    my $state = "Cluster Node: $self->{host}:$self->{port}  
procs[$self->{num_procs}/$self->{max_procs}:max]  
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
+    my $state = "Cluster Node: Node: $self->{host}:$self->{port}  
procs[$self->{num_procs}/$self->{max_procs}:max]  
async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
     $state .= "\n";
     $state .= $self->_state();
 
@@ -242,5 +242,52 @@
     return($state);
 }
 
+sub poe_cancel_async_event {
+    &App::sub_entry if ($App::trace);
+    my ( $self, $kernel, $heap, $arg0 ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
+    my ($runtime_event_token) = @$arg0;
+    $self->log({level=>2},"CN : poe_cancel_async_event : 
runtime_event_token=[$runtime_event_token]\n");
+    my $async_event = $self->{running_async_event}{$runtime_event_token};
+
+    ### Find if running
+    for my $pid (keys %{$self->{running_async_event}}) {
+        #$self->log({level=>2},"CN : poe_cancel_async_event : 
running_async_event : pid=[$pid]\n");
+        #my $ae = $self->{running_async_event}{$pid};
+        #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys 
%{$ae->[0]});
+        #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys 
%{$ae->[1]});
+        #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
+        #$self->log({level=>2},"CN : poe_cancel_async_event : ce=[$ce]\n");
+
+        my $event_token = $self->{running_async_event}{$pid}[0]{event_token};
+        if ($runtime_event_token eq $event_token) {
+            $self->log({level=>2},"CN : poe_cancel_async_event : 
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
+
+            ### Kill it
+            if ($pid =~ /^[0-9]+$/) {
+                kill(9, $pid);
+            }
+
+            ### Remove from pending
+            delete $self->{running_async_event}{$pid};
+
+            last;
+        }
+    }
+
+    ### Find if pending
+    for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
+        my $event_token = $self->{pending_async_events}[$i][0]{event_token};
+        $self->log({level=>2},"CN : poe_cancel_async_event : 
pending_async_events : event_token=[$event_token]\n");
+        if ($runtime_event_token eq $event_token) {
+            splice(@{$self->{pending_async_events}}, $i, 1);
+        }
+        #my $ae = $self->{pending_async_events}{$foo};
+        #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys 
%{$ae->[0]});
+        #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys 
%{$ae->[1]});
+        #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
+    }
+     &App::sub_exit() if ($App::trace);
+}
+
 1;
 

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        Fri Sep  7 12:42:17 2007
@@ -23,6 +23,8 @@
 use POE::Component::IKC::Server;
 use HTTP::Status qw/RC_OK/;
 use Socket qw(INADDR_ANY);
+use Data::Dumper;
+$Data::Dumper::Terse = 1;
 
 sub _init {
     &App::sub_entry if ($App::trace);
@@ -416,7 +418,7 @@
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
 
-    $self->log({level=>2}, "Server: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n");
+    #$self->log({level=>2}, "Server: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n");
 
     if ($event->{destination} eq "in_process") {
         my $event_token = $self->send_async_event_in_process($event, 
$callback_event);
@@ -549,7 +551,8 @@
     my ($runtime_event_token_found, $async_event);
     foreach my $runtime_event_token (keys %$running_async_event) {
         $async_event = $running_async_event->{$runtime_event_token};
-        if ($async_event->[0]{event_token} eq $event_token) {
+        ### async_event = [ event, callback_event ]
+        if ($async_event->[1]{event_token} eq $event_token) {
             $runtime_event_token_found = $runtime_event_token;
             last;
         }
@@ -591,6 +594,9 @@
     my $pending_async_events = $self->{pending_async_events};
     my ($async_event);
     my $aborted = 0;
+
+    #$self->log({level=>2}, "S: abort_async_event : event_token=[$event_token] 
: pending_async_events: ", Dumper($pending_async_events));
+
     # first look for it in the pending list
     for (my $i = 0; $i <= $#$pending_async_events; $i++) {
         $async_event = $pending_async_events->[$i];
@@ -614,10 +620,12 @@
 sub abort_running_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $runtime_event_token) = @_;
+    #$self->log({level=>2}, "S: abort_running_async_event : 
runtime_event_token=[$runtime_event_token]\n");
     my $running_async_event  = $self->{running_async_event};
     my $pending_async_events = $self->{pending_async_events};
     my $async_event = $running_async_event->{$runtime_event_token};
     if ($async_event) {
+        #$self->log({level=>2}, "S: abort_running_async_event : 
async_event=[$async_event]\n");
         $self->{num_async_events}--;
         delete $self->{running_async_event}{$runtime_event_token};
         unshift(@$pending_async_events, $async_event);

Reply via email to