First let me say that I think POE is awesome! It's been a mind-blowing 
couple of weeks trying to get my brain around it.

I am rewriting a Network Monitoring system where the clients periodically 
connect to my server and send things. Right now I am using a simple accept 
and forking IO::Socket::INET server. The problem is that we are at the 
mercy of the clients and sometimes the clients hang or take a long time 
while sending data. Add to this that many clients may end up connecting at 
nearly the same time and you have a recipe for out of memory/cannot fork 
conditions. Some of this is due to other design problems. But I am doing a 
complete rewrite of the whole thing so I will fix those as well. So on the 
server I know I want to do something more robust and POE seems to 
encapsulate that already so that's what I am going to use.

Basically my server would be getting an XML document from the client. I'll 
only process the data from the client after it disconnects. No need to 
answer back to it (at least not yet). I would also like to batch up the 
responses and process them with POE::Wheel::Run (because processing the 
queue involves a lot of slow running parsing of some XML and entering it 
into a database). So I am building up a queue and periodically processing 
it. The next step would be to take the entire queue, hand it of to 
POE::Wheel::Run, zero the queue, and keep moving. In my mind this is a Poe 
Man's pre-forking server. Here (below) is a skeleton of what I have so far 
to demonstrate this (but w/o the POE::Wheel::Run). Here are my questions:

1) Am I on the right track to have the Server and the queue processing in 
separate sessions where the Server "post"s to the queue's session?
2) Is there any way to high-water mark the queue so that it is processed 
immediately if reaching that mark? Or should I just be happy with 
periodically processing the queue as I have done below?
3) Am I using delay_set below in the way it was intended to be used?
4) Can anyone point me to examples of timing-out a client connection 
gracefully after a certain period of time.
5) Am I crazy?

Thanks for reading,
Lance

#!/usr/bin/perl

# after http://poe.perl.org/?POE_Cookbook/TCP_Servers

use warnings;
use strict;

use POE qw(Component::Server::TCP);

#sub MAX_QUEUE_SIZE { 3 }
sub QUEUE_FLUSH_TIME { 10 } # in seconds to wait before flushing the input 
queue

POE::Component::Server::TCP->new(
   Alias => "server",
   Port => 11211,
   ClientInput => sub {
      my ( $session, $heap, $input ) = @_[ SESSION, HEAP, ARG0 ];
      print "Session ", $session->ID(), " got input: $input\n";
      $heap->{client_data} .= $input;
      $heap->{client}->put($input);
   },
   ClientDisconnected => sub {
      my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];
      my $client_data = $heap->{client_data};
      if ( defined( $client_data ) ) {
         $kernel->post( "queue_handler" => "add_client_data" => 
$client_data );
      } else {
         print "server didn't get back any client_data\n";
      }
   },
);

POE::Session->create(
   inline_states => {
      _start   => sub {
         my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
         $heap->{queue} = [];
         $kernel->alias_set("queue_handler");
         $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME);
      },
      add_client_data => sub {
         my ( $kernel, $heap, $client_data ) = @_[ KERNEL, HEAP, ARG0 ];
         print "queue_handler session got some client_data: 
$client_data\n";
         push( @{ $heap->{queue} }, $client_data );

         # I would like to process the queue immediately if it reaches 
MAX_QUEUE_SIZE
         # but the following doesn't work. I know why but I don't know how 
to fix it.
         # Maybe I should just depend on periodic processing of the queue 
instead since
         # it's being done anyway
         #if ( ( scalar( @{ $heap->{queue} } ) >= MAX_QUEUE_SIZE )
         #  && ! $heap->{process_queue_posted} ) {
         #  $kernel->yield("process_queue");
         #  $heap->{process_queue_posted} = 1;
         #}
      },
      process_queue => sub {
         my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
         print "time to process queue\n";
         # This would eventually use a POE::Wheel::Run here to process the 
whole queue
         while( @{$heap->{queue}} ) {
            my $client_data = shift( @{$heap->{queue}} );
            print "queue_handler session\'s process_queue event processed: 
$client_data\n";
         }
         # see comments above in add_client_data
         #$heap->{process_queue_posted} = 0;
         $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME);
      }
   }
);

$poe_kernel->run();
exit 0;

Reply via email to