Re: using a separate session to process multiple client inputs from Component::Server::TCP
[EMAIL PROTECTED] wrote: First let me say that I think POE is awesome! It's been a mind-blowing couple of weeks trying to get my brain around it. I am rewriting a Network Monitoring system where the clients periodically connect to my server and send things. Right now I am using a simple accept and forking IO::Socket::INET server. The problem is that we are at the mercy of the clients and sometimes the clients hang or take a long time while sending data. Add to this that many clients may end up connecting at nearly the same time and you have a recipe for out of memory/cannot fork conditions. Some of this is due to other design problems. But I am doing a complete rewrite of the whole thing so I will fix those as well. So on the server I know I want to do something more robust and POE seems to encapsulate that already so that's what I am going to use. Basically my server would be getting an XML document from the client. I'll only process the data from the client after it disconnects. No need to answer back to it (at least not yet). I would also like to batch up the responses and process them with POE::Wheel::Run (because processing the queue involves a lot of slow running parsing of some XML and entering it into a database). So I am building up a queue and periodically processing it. The next step would be to take the entire queue, hand it of to POE::Wheel::Run, zero the queue, and keep moving. In my mind this is a Poe Man's pre-forking server. Here (below) is a skeleton of what I have so far to demonstrate this (but w/o the POE::Wheel::Run). Here are my questions: 1) Am I on the right track to have the Server and the queue processing in separate sessions where the Server "post"s to the queue's session? 2) Is there any way to high-water mark the queue so that it is processed immediately if reaching that mark? Or should I just be happy with periodically processing the queue as I have done below? 3) Am I using delay_set below in the way it was intended to be used? 4) Can anyone point me to examples of timing-out a client connection gracefully after a certain period of time. 5) Am I crazy? Thanks for reading, Lance [snip Lance's code] Sounds like you are on the right track to me, but I am no expert. This is where I begin to wonder if what I am about to say is better off in a separate discussion, that is, why does everyone go directly to the inline states sessions? I have been wondering this for about 4 months (aka since I found out about the object states), I found the object states setup much much much easier to control and work with than the inline states. Possibly because most of the components are done inline or because the cookbook (in order to keep the examples simple) generally uses them? Back to the point I currently have an app in production, that handles files in a similar type of queue to yours. Elements are added to a queue from a directory watcher rather than a socket but the principle is the same. In my scheme I also have a series of queues in order to throttle the processing, aka a single file is processed in 4 steps, each taking a varying amount of time and cpu usage. Yielding 4 queues per direction (inbound and outbound). Each of the queues manipulates an object using the same 'process' method but the object gets blessed from one type of "Processor" to another depending on which queue it is in, this keeps my queue code generic, allows the processors to all act in the same manner (which makes writing each of them simple), while allowing different processing to occur at each stage. Finally, the process method is called within a POE::Wheel::Run as you describe to allow for multi-tasking. Doing all of this with inline states gave me headaches, when I switched to the object states it was so much clearer, the queue itself is just a standard object, things pop in and out of the queue as if it was static, but the implementation that makes it "run" is based on the fact that the queue object wraps a POE::Session, this also allows me to completely de-couple the POE events from the rest of the application, which to me is simpler and keeps my head from falling off. Ok so that is a great story and all but how does it help you? Well I am not completely allowed to post full code, so I have stripped down my process queue, removed names to protect the innocent, removed the logging and other crap and then put it up online (at least temporarily). About the only other thing to know is that I use a home grown Exception object to throw errors, and normally the modules wouldn't have top level module names. I don't know if the code will run out of the box as I may have made some mistakes when cutting it down, but it may give you ideas. Anyone interested in it, or have suggestions of how it can be improved I would love to hear them. If the POE group is at all interested in what I have done, I will take the discussion to the next level wher
using a separate session to process multiple client inputs from Component::Server::TCP
First let me say that I think POE is awesome! It's been a mind-blowing couple of weeks trying to get my brain around it. I am rewriting a Network Monitoring system where the clients periodically connect to my server and send things. Right now I am using a simple accept and forking IO::Socket::INET server. The problem is that we are at the mercy of the clients and sometimes the clients hang or take a long time while sending data. Add to this that many clients may end up connecting at nearly the same time and you have a recipe for out of memory/cannot fork conditions. Some of this is due to other design problems. But I am doing a complete rewrite of the whole thing so I will fix those as well. So on the server I know I want to do something more robust and POE seems to encapsulate that already so that's what I am going to use. Basically my server would be getting an XML document from the client. I'll only process the data from the client after it disconnects. No need to answer back to it (at least not yet). I would also like to batch up the responses and process them with POE::Wheel::Run (because processing the queue involves a lot of slow running parsing of some XML and entering it into a database). So I am building up a queue and periodically processing it. The next step would be to take the entire queue, hand it of to POE::Wheel::Run, zero the queue, and keep moving. In my mind this is a Poe Man's pre-forking server. Here (below) is a skeleton of what I have so far to demonstrate this (but w/o the POE::Wheel::Run). Here are my questions: 1) Am I on the right track to have the Server and the queue processing in separate sessions where the Server "post"s to the queue's session? 2) Is there any way to high-water mark the queue so that it is processed immediately if reaching that mark? Or should I just be happy with periodically processing the queue as I have done below? 3) Am I using delay_set below in the way it was intended to be used? 4) Can anyone point me to examples of timing-out a client connection gracefully after a certain period of time. 5) Am I crazy? Thanks for reading, Lance #!/usr/bin/perl # after http://poe.perl.org/?POE_Cookbook/TCP_Servers use warnings; use strict; use POE qw(Component::Server::TCP); #sub MAX_QUEUE_SIZE { 3 } sub QUEUE_FLUSH_TIME { 10 } # in seconds to wait before flushing the input queue POE::Component::Server::TCP->new( Alias => "server", Port => 11211, ClientInput => sub { my ( $session, $heap, $input ) = @_[ SESSION, HEAP, ARG0 ]; print "Session ", $session->ID(), " got input: $input\n"; $heap->{client_data} .= $input; $heap->{client}->put($input); }, ClientDisconnected => sub { my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ]; my $client_data = $heap->{client_data}; if ( defined( $client_data ) ) { $kernel->post( "queue_handler" => "add_client_data" => $client_data ); } else { print "server didn't get back any client_data\n"; } }, ); POE::Session->create( inline_states => { _start => sub { my ( $kernel, $heap ) = @_[ KERNEL, HEAP ]; $heap->{queue} = []; $kernel->alias_set("queue_handler"); $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME); }, add_client_data => sub { my ( $kernel, $heap, $client_data ) = @_[ KERNEL, HEAP, ARG0 ]; print "queue_handler session got some client_data: $client_data\n"; push( @{ $heap->{queue} }, $client_data ); # I would like to process the queue immediately if it reaches MAX_QUEUE_SIZE # but the following doesn't work. I know why but I don't know how to fix it. # Maybe I should just depend on periodic processing of the queue instead since # it's being done anyway #if ( ( scalar( @{ $heap->{queue} } ) >= MAX_QUEUE_SIZE ) # && ! $heap->{process_queue_posted} ) { # $kernel->yield("process_queue"); # $heap->{process_queue_posted} = 1; #} }, process_queue => sub { my ( $kernel, $heap ) = @_[ KERNEL, HEAP ]; print "time to process queue\n"; # This would eventually use a POE::Wheel::Run here to process the whole queue while( @{$heap->{queue}} ) { my $client_data = shift( @{$heap->{queue}} ); print "queue_handler session\'s process_queue event processed: $client_data\n"; } # see comments above in add_client_data #$heap->{process_queue_posted} = 0; $kernel->delay_set("process_queue", QUEUE_FLUSH_TIME); } } ); $poe_kernel->run(); exit 0;
Re: Connecting to my POE server using IO::Socket
Wait scratch that ... its the read: while(defined(<$sock>)) { print; } Where the hanging block is happening. And I'm still at a loss as to how to remedy this ... -john John Christian wrote: Hi, I've got a [seemingly] simple question: I'd like to connect to my POE::Wheel::SocketFactory based server with an IO::Socket::INET based script. I'm not doing anything out of the ordinary with my server or client sockets, but the statement in my IO::Socket based client: print $poe_sock "CMD\n"; Doesn't return at all (i.e. it just blocks forever)! Though, the server does indicate that it recieved CMD, and continues accepting other connections just fine. Interestingly, I'm able to telnet to the server and issue commands, which works fine. Also, I tried various "\r\n" combinations in the client just to be thorough, thinking that would be why telnet was succeeding ... but to no avail. Any ideas? -john
PoCo::Client::HTTP dies, can't find request id
I've got a script designed to do mass HTTP grabs, but after ~2000 requests it runs into sub poco_weeble_connect_error { my ($kernel, $heap, $operation, $errnum, $errstr, $wheel_id) = @_[KERNEL, HEAP, ARG0..ARG3]; DEBUG and warn "wheel $wheel_id encountered $operation error $errnum: $errstr\n"; # Drop the wheel and its cross-references. my $request_id = delete $heap->{wheel_to_request}->{$wheel_id}; die "expected a request ID, but there is none" unless defined $request_id; and dies. However, I can't figure out where the info's been lost. My code follows - #!/usr/local/bin/perl use strict; $| = 1; use POE qw/Component::Client::HTTP/; use HTTP::Request; use HTML::HeadParser; open IN, $ARGV[1]; system("mkdir out.$ARGV[1]"); POE::Component::Client::HTTP->spawn( Timeout => 5, Agent => 'Mozilla/4.0 (compatible; MSIE 5.5; Windows 98)', Alias => 'ua' ); sub handler_start { for (1..$ARGV[0]) { start_next_request( @_[ KERNEL ] ); } } sub start_next_request { my ($kernel) = @_; my $line = ; return unless ($line); chomp($line); my $req = new HTTP::Request('GET', "http://$line/";); $kernel->post( 'ua', 'request', 'response', $req ); } sub handler_response { my ($req_p, $res_p) = @_[ ARG0, ARG1 ]; my ($req, $res) = ($req_p->[0], $res_p->[0]); my $url = $req->url(); $url =~ m!http://(.*)/!; my $dom = $1; if ($res->is_success) { open OUT, ">out.$ARGV[1]/$dom"; print OUT $res->content; close OUT; } start_next_request( @_[ KERNEL ] ); } POE::Session->create( inline_states => { _start => \&handler_start, response => \&handler_response } ); $poe_kernel->run(); exit; -- Bring me my etherkiller; Oh clouds unfold! / Bring me the magic smoke of desire I shall not cease from mental fight / Nor shall my LART rest in my hand Till we have buried the bodies / Of all the lusers in all this land -- rpg, ASR[ My homepage is http://www.trout.me.uk/ ]