Author: spadkins
Date: Mon Dec  3 08:21:16 2007
New Revision: 10349

Modified:
   p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm

Log:
fixing the pile up during splitting

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        Mon Dec  3 
08:21:16 2007
@@ -17,6 +17,7 @@
 use Sys::Hostname;
 use Date::Format;
 use Date::Parse;
+use Time::HiRes qw(gettimeofday tv_interval);
 
 use POE;
 use POE::Component::Server::SimpleHTTP;
@@ -440,35 +441,39 @@
 sub dispatch_pending_async_events {
     &App::sub_entry if ($App::trace);
     my ($self, $max_events) = @_;
-    #$self->log({level=>2},"S: dispatch_pending_async_events : enter : 
max_events=[$max_events]\n") if $self->{options}{poe_trace};
+    my $t0 = [gettimeofday];
+    #$self->log({level=>2},"S: dispatch_pending_async_events : poe_state : \n" 
. $self->_state_poe() . "\n");
+    $self->log({level=>2},"S: dispatch_pending_async_events : enter : 
max_events=[$max_events]\n") if $self->{options}{poe_trace};
     #$self->log({level=>2},"S: dispatch_pending_async_events : 
pending_async_events : " . Dumper($self->{pending_async_events}) . "\n") if 
$self->{options}{poe_trace};
+
     $max_events ||= 9999;
     my $pending_async_events = $self->{pending_async_events};
-    my ($async_event, $assigned, $event, $in_process);
+    my ($async_event, $assigned, $event, $in_process, %unique_events);
     my $events_occurred = 0;
     my $i = 0;
     my $event_capacity_exists = 1;
     my $max_i = $#$pending_async_events;
     while ($i <= $max_i && $events_occurred < $max_events) {
-        #$self->log({level=>2},"S: dispatch_pending_async_events : 
i/max_i=[$i/$max_i] : pending_async_events=[$#$pending_async_events]\n") if 
$self->{options}{poe_trace};
         $async_event = $pending_async_events->[$i];
         $event = $async_event->[0];
         if ($event->{destination}) {
-            #$self->log({level=>2},"S: dispatch_pending_async_events : 
destination=[$event->{destination}]\n") if $self->{options}{poe_trace};
             $self->send_async_event_now(@$async_event);
             $events_occurred ++;
             splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]
             $max_i--;
+
+            $unique_events{"$event->{name} $event->{method}"}++;
         }
         elsif ($event_capacity_exists) {
             $assigned = $self->assign_event_destination($event);
-            #$self->log({level=>2},"S: dispatch_pending_async_events : 
assigned=[$assigned]\n") if $self->{options}{poe_trace};
             if ($assigned) {
                 $self->send_async_event_now(@$async_event);
                 $events_occurred ++;
                 # keep $i the same
                 splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]
                 $max_i--;
+
+                $unique_events{"$event->{name} $event->{method}"}++;
             }
             else {   # [undef] no servers are eligible for assignment
                 $event_capacity_exists = 0;   # there's no sense looking at 
the other pending async events
@@ -479,8 +484,15 @@
             $i++;   # look at the next one
         }
     }
+    my $sum_not_clear_pending_events = 0;
+    for my $key (keys %unique_events) {
+        if ($key ne "mvworkd _clear_pending_hotel_shop_requests") {
+            $sum_not_clear_pending_events += $unique_events{$key};
+        }
+    }
+
+    $self->log({level=>2},"S: dispatch_pending_async_events : exiting : 
events_occurred=[$events_occurred] 
events_not_clear_pending=[$sum_not_clear_pending_events] time=[" . 
sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if 
$self->{options}{poe_trace};
     &App::sub_exit($events_occurred) if ($App::trace);
-    #$self->log({level=>2},"S: dispatch_pending_async_events : exiting : 
events_occurred=[$events_occurred]\n") if $self->{options}{poe_trace};
     return($events_occurred);
 }
 
@@ -836,6 +848,42 @@
     return;
 }
 
+sub poe_yield {
+    &App::sub_entry if ($App::trace);
+    my ($self, $kernel, $state, $max_count) = @_;
+
+    $max_count ||= 1;
+    if (!defined($self->{poe_count}{$state})) {
+        $self->{poe_count}{$state} = 1;
+    }
+    else {
+        $self->{poe_count}{$state}++;
+    }    
+    if ($self->{poe_count}{$state} <= $max_count) {
+        $kernel->yield($state);
+    }
+
+    #$self->log({level=>2},"POE: poe_yield : poe_count : " . 
Dumper($self->{poe_count}) . "\n");
+    &App::sub_exit() if ($App::trace);
+    return;
+}
+
+sub poe_yield_acknowledged {
+    &App::sub_entry if ($App::trace);
+    my ($self, $state) = @_;
+
+    if ($self->{poe_count}{$state}) {
+        $self->{poe_count}{$state}--;
+    }    
+    else {
+        $self->{poe_count}{$state} = 0;
+    }
+
+    #$self->log({level=>2},"POE: poe_yield_acknowledged : poe_count : " . 
Dumper($self->{poe_count}) . "\n");
+    &App::sub_exit() if ($App::trace);
+    return;
+}
+
 sub poe_sigterm {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
@@ -867,13 +915,14 @@
     my $sig     = $status & 255;
     $self->log({level=>2},"POE: poe_sigchld (Child $pid finished 
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
     $self->finish_pid($pid, $exitval, $sig);
-    $kernel->yield("poe_dispatch_pending_async_events");
+    $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
     &App::sub_exit() if ($App::trace);
 }
 
 sub poe_alarm {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
+    #$self->log({level=>2},"S: poe_alarm : poe_state : \n" . 
$self->_state_poe() . "\n");
     $self->log({level=>2},"POE: poe_alarm (Dispatching pending events and 
queueing scheduled events)\n") if $self->{options}{poe_trace};
     my $main_service = $self->{main_service};
     if ($self->{options}{poe_trace}) {
@@ -881,11 +930,12 @@
             if ($main_service && $main_service->can("format_async_event")) {
                 my ($event, $callback_event) = @$async_event;
                 my $str = $main_service->format_async_event($event, 
$callback_event);
-                $self->log({level=>2},"POE: poe_alarm : pending_async_events : 
$str\n");
+                #$self->log({level=>2},"POE: poe_alarm : pending_async_events 
: $str\n");
             }
         }
     }
     $kernel->yield("poe_dispatch_pending_async_events");
+    $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
     my $time = time();
     my (@events);
     my $events_occurred = 0;
@@ -894,7 +944,7 @@
         $time_of_next_event = $self->get_current_events([EMAIL PROTECTED], 
$time);
         if ($#events > -1) {
             foreach my $event (@events) {
-                $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event, 
$event)\n");
+                #$self->log({level=>2},"POE: poe_alarm : yield(poe_run_event, 
$event)\n");
                 $kernel->yield("poe_run_event", $event);  # put on the POE run 
queue
                 $events_occurred++;
             }
@@ -944,8 +994,13 @@
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
     $self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if 
$self->{options}{poe_trace};
+
+    $self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
     my $events_occurred = $self->dispatch_pending_async_events();
-    $kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred > 
0);
+    ### These are currently causing our serious slowness during lots of 
splitting, fix the dogpile problem!
+    #$kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred 
> 0);
+    $self->poe_yield($kernel, "poe_dispatch_pending_async_events") if 
($events_occurred > 0);
+
     &App::sub_exit() if ($App::trace);
 }
 
@@ -969,7 +1024,8 @@
         }
     }
     if ($async_event_added) {
-        $kernel->yield("poe_dispatch_pending_async_events");
+        #$kernel->yield("poe_dispatch_pending_async_events");
+        $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
         $kernel->yield("poe_event_loop_extension");
     }
     else {

Reply via email to