Re: using a separate session to process multiple client inputs from Component::Server::TCP

2004-03-24 Thread Wiggins d'Anconia
[EMAIL PROTECTED] wrote:
First let me say that I think POE is awesome! It's been a mind-blowing 
couple of weeks trying to get my brain around it.

I am rewriting a Network Monitoring system where the clients periodically 
connect to my server and send things. Right now I am using a simple accept 
and forking IO::Socket::INET server. The problem is that we are at the 
mercy of the clients and sometimes the clients hang or take a long time 
while sending data. Add to this that many clients may end up connecting at 
nearly the same time and you have a recipe for out of memory/cannot fork 
conditions. Some of this is due to other design problems. But I am doing a 
complete rewrite of the whole thing so I will fix those as well. So on the 
server I know I want to do something more robust and POE seems to 
encapsulate that already so that's what I am going to use.

Basically my server would be getting an XML document from the client. I'll 
only process the data from the client after it disconnects. No need to 
answer back to it (at least not yet). I would also like to batch up the 
responses and process them with POE::Wheel::Run (because processing the 
queue involves a lot of slow running parsing of some XML and entering it 
into a database). So I am building up a queue and periodically processing 
it. The next step would be to take the entire queue, hand it of to 
POE::Wheel::Run, zero the queue, and keep moving. In my mind this is a Poe 
Man's pre-forking server. Here (below) is a skeleton of what I have so far 
to demonstrate this (but w/o the POE::Wheel::Run). Here are my questions:

1) Am I on the right track to have the Server and the queue processing in 
separate sessions where the Server "post"s to the queue's session?
2) Is there any way to high-water mark the queue so that it is processed 
immediately if reaching that mark? Or should I just be happy with 
periodically processing the queue as I have done below?
3) Am I using delay_set below in the way it was intended to be used?
4) Can anyone point me to examples of timing-out a client connection 
gracefully after a certain period of time.
5) Am I crazy?

Thanks for reading,
Lance
[snip Lance's code]

Sounds like you are on the right track to me, but I am no expert.  This 
is where I begin to wonder if what I am about to say is better off in a 
separate discussion, that is, why does everyone go directly to the 
inline states sessions?  I have been wondering this for about 4 months 
(aka since I found out about the object states), I found the object 
states setup much much much easier to control and work with than the 
inline states.  Possibly because most of the components are done inline 
or because the cookbook (in order to keep the examples simple) generally 
uses them?  Back to the point

I currently have an app in production, that handles files in a similar 
type of queue to yours.  Elements are added to a queue from a directory 
watcher rather than a socket but the principle is the same. In my scheme 
I also have a series of queues in order to throttle the processing, aka 
a single file is processed in 4 steps, each taking a varying amount of 
time and cpu usage.  Yielding 4 queues per direction (inbound and 
outbound).  Each of the queues manipulates an object using the same 
'process' method but the object gets blessed from one type of 
"Processor" to another depending on which queue it is in, this keeps my 
queue code generic, allows the processors to all act in the same manner 
(which makes writing each of them simple), while allowing different 
processing to occur at each stage. Finally, the process method is called 
within a POE::Wheel::Run as you describe to allow for multi-tasking. 
Doing all of this with inline states gave me headaches, when I switched 
to the object states it was so much clearer, the queue itself is just a 
standard object, things pop in and out of the queue as if it was static, 
but the implementation that makes it "run" is based on the fact that the 
queue object wraps a POE::Session, this also allows me to completely 
de-couple the POE events from the rest of the application, which to me 
is simpler and keeps my head from falling off. Ok so that is a great 
story and all but how does it help you?

Well I am not completely allowed to post full code, so I have stripped 
down my process queue, removed names to protect the innocent, removed 
the logging and other crap and then put it up online (at least 
temporarily).  About the only other thing to know is that I use a home 
grown Exception object to throw errors, and normally the modules 
wouldn't have top level module names.  I don't know if the code will run 
out of the box as I may have made some mistakes when cutting it down, 
but it may give you ideas.  Anyone interested in it, or have suggestions 
of how it can be improved I would love to hear them.  If the POE group 
is at all interested in what I have done, I will take the discussion to 
the next level wher

using a separate session to process multiple client inputs from Component::Server::TCP

2004-03-24 Thread Lance_Braswell
First let me say that I think POE is awesome! It's been a mind-blowing 
couple of weeks trying to get my brain around it.

I am rewriting a Network Monitoring system where the clients periodically 
connect to my server and send things. Right now I am using a simple accept 
and forking IO::Socket::INET server. The problem is that we are at the 
mercy of the clients and sometimes the clients hang or take a long time 
while sending data. Add to this that many clients may end up connecting at 
nearly the same time and you have a recipe for out of memory/cannot fork 
conditions. Some of this is due to other design problems. But I am doing a 
complete rewrite of the whole thing so I will fix those as well. So on the 
server I know I want to do something more robust and POE seems to 
encapsulate that already so that's what I am going to use.

Basically my server would be getting an XML document from the client. I'll 
only process the data from the client after it disconnects. No need to 
answer back to it (at least not yet). I would also like to batch up the 
responses and process them with POE::Wheel::Run (because processing the 
queue involves a lot of slow running parsing of some XML and entering it 
into a database). So I am building up a queue and periodically processing 
it. The next step would be to take the entire queue, hand it of to 
POE::Wheel::Run, zero the queue, and keep moving. In my mind this is a Poe 
Man's pre-forking server. Here (below) is a skeleton of what I have so far 
to demonstrate this (but w/o the POE::Wheel::Run). Here are my questions:

1) Am I on the right track to have the Server and the queue processing in 
separate sessions where the Server "post"s to the queue's session?
2) Is there any way to high-water mark the queue so that it is processed 
immediately if reaching that mark? Or should I just be happy with 
periodically processing the queue as I have done below?
3) Am I using delay_set below in the way it was intended to be used?
4) Can anyone point me to examples of timing-out a client connection 
gracefully after a certain period of time.
5) Am I crazy?

Thanks for reading,
Lance

#!/usr/bin/perl

# after http://poe.perl.org/?POE_Cookbook/TCP_Servers

use warnings;
use strict;

use POE qw(Component::Server::TCP);

#sub MAX_QUEUE_SIZE { 3 }
sub QUEUE_FLUSH_TIME { 10 } # in seconds to wait before flushing the input 
queue

POE::Component::Server::TCP->new(
   Alias => "server",
   Port => 11211,
   ClientInput => sub {
  my ( $session, $heap, $input ) = @_[ SESSION, HEAP, ARG0 ];
  print "Session ", $session->ID(), " got input: $input\n";
  $heap->{client_data} .= $input;
  $heap->{client}->put($input);
   },
   ClientDisconnected => sub {
  my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];
  my $client_data = $heap->{client_data};
  if ( defined( $client_data ) ) {
 $kernel->post( "queue_handler" => "add_client_data" => 
$client_data );
  } else {
 print "server didn't get back any client_data\n";
  }
   },
);

POE::Session->create(
   inline_states => {
  _start   => sub {
 my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
 $heap->{queue} = [];
 $kernel->alias_set("queue_handler");
 $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME);
  },
  add_client_data => sub {
 my ( $kernel, $heap, $client_data ) = @_[ KERNEL, HEAP, ARG0 ];
 print "queue_handler session got some client_data: 
$client_data\n";
 push( @{ $heap->{queue} }, $client_data );

 # I would like to process the queue immediately if it reaches 
MAX_QUEUE_SIZE
 # but the following doesn't work. I know why but I don't know how 
to fix it.
 # Maybe I should just depend on periodic processing of the queue 
instead since
 # it's being done anyway
 #if ( ( scalar( @{ $heap->{queue} } ) >= MAX_QUEUE_SIZE )
 #  && ! $heap->{process_queue_posted} ) {
 #  $kernel->yield("process_queue");
 #  $heap->{process_queue_posted} = 1;
 #}
  },
  process_queue => sub {
 my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
 print "time to process queue\n";
 # This would eventually use a POE::Wheel::Run here to process the 
whole queue
 while( @{$heap->{queue}} ) {
my $client_data = shift( @{$heap->{queue}} );
print "queue_handler session\'s process_queue event processed: 
$client_data\n";
 }
 # see comments above in add_client_data
 #$heap->{process_queue_posted} = 0;
 $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME);
  }
   }
);

$poe_kernel->run();
exit 0;


Re: Connecting to my POE server using IO::Socket

2004-03-24 Thread John Christian
Wait scratch that ... its the read:

while(defined(<$sock>)) { print; }

Where the hanging block is happening.
And I'm still at a loss as to how to remedy this ...
-john

John Christian wrote:

Hi, I've got a [seemingly] simple question:
I'd like to connect to my POE::Wheel::SocketFactory based server with 
an IO::Socket::INET based script.

I'm not doing anything out of the ordinary with my server or client 
sockets, but the statement in my IO::Socket based client:

print $poe_sock "CMD\n";

Doesn't return at all (i.e. it just blocks forever)!  Though, the 
server does indicate that it recieved CMD, and continues accepting 
other connections just fine.
Interestingly, I'm able to telnet to the server and issue commands, 
which works fine.  Also, I tried various "\r\n" combinations in the 
client just to be thorough, thinking that would be why telnet was 
succeeding ... but to no avail.

Any ideas?

-john




PoCo::Client::HTTP dies, can't find request id

2004-03-24 Thread Matthew Trout
I've got a script designed to do mass HTTP grabs, but after ~2000 requests it
runs into 

sub poco_weeble_connect_error {
  my ($kernel, $heap, $operation, $errnum, $errstr, $wheel_id) =
@_[KERNEL, HEAP, ARG0..ARG3];

  DEBUG and
warn "wheel $wheel_id encountered $operation error $errnum: $errstr\n";

  # Drop the wheel and its cross-references.
  my $request_id = delete $heap->{wheel_to_request}->{$wheel_id};
  die "expected a request ID, but there is none" unless defined $request_id;

and dies. However, I can't figure out where the info's been lost. My code
follows -

#!/usr/local/bin/perl

use strict;
$| = 1;

use POE qw/Component::Client::HTTP/;
use HTTP::Request;
use HTML::HeadParser;

open IN, $ARGV[1];

system("mkdir out.$ARGV[1]");

POE::Component::Client::HTTP->spawn(
Timeout => 5,
Agent => 'Mozilla/4.0 (compatible; MSIE 5.5; Windows 98)',
Alias => 'ua'
);

sub handler_start {
for (1..$ARGV[0]) {
start_next_request( @_[ KERNEL ] );
}
}

sub start_next_request {
my ($kernel) = @_;
my $line = ;
return unless ($line);
chomp($line);
my $req = new HTTP::Request('GET', "http://$line/";);
$kernel->post( 'ua', 'request', 'response', $req );
}

sub handler_response {
my ($req_p, $res_p) = @_[ ARG0, ARG1 ];
my ($req, $res) = ($req_p->[0], $res_p->[0]);
my $url = $req->url();
$url =~ m!http://(.*)/!;
my $dom = $1;
if ($res->is_success) {
  open OUT, ">out.$ARGV[1]/$dom";
  print OUT $res->content;
  close OUT;
}
start_next_request( @_[ KERNEL ] );
}

POE::Session->create(
inline_states => {
_start => \&handler_start,
response => \&handler_response
}
);

$poe_kernel->run();

exit;

-- 
Bring me my etherkiller; Oh clouds unfold! / Bring me the magic smoke of desire
I shall not cease from mental fight / Nor shall my LART rest in my hand
Till we have buried the bodies / Of all the lusers in all this land
  -- rpg, ASR[ My homepage is http://www.trout.me.uk/ ]