i have a web crawler using POE::Component::Client::HTTP that has been working fine to suck down non-small binary files (pdf, ps, tar.gz, etc.). i tried to eek out a little more performance by using IO::AIO, but now about 5% of the downloads are corrupt. they are the correct size as indicated by the Content-Length header, but the contents are invalid data. so i'm guessing that the response data is sometimes being written out of order. i know PerlBal does something similar with IO::AIO, so i'm doubting that aio threads are messing the order. can anybody see a heisenbug in my code below? thanks.
sub ua_make_request { my ($kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ]; return if $heap->{is_shutdown}; if ( @{ $heap->{ids} } < $MAX_PARALLEL * 2 ) { $kernel->yield('dbi_get_urls'); } my $url= shift @{ $heap->{urls} }; # Check later. unless ($url) { $kernel->delay_set( ua_make_request => 60 ); return; } # construct HTTP::Request and set $file and $path ... aio_mkdir( $path, 0777, sub { my $ret = $_[0]; if ( $ret and not $!{EEXIST} ) { printf STDERR "%s: $!\n", $path; return $kernel->post( $session, ua_cleanup_request => $req, $ret ); } my $file = sprintf "%s.tmp", $heap->{req}{$req}{file}; aio_open( $file, O_WRONLY|O_CREAT|O_TRUNC, 0644, sub { unless ( $heap->{req}{$req}{fh} = $_[0] ) { printf STDERR "%s: $!\n", $file; return $kernel->post( $session, ua_cleanup_request => $req, -1 ); } $kernel->post( ua => request => ua_got_response => $req => "$req", undef, $proxy->proxy ); }); }); } sub ua_got_response { my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION]; my ($req, $res, $data) = ( $_[ARG0]->[0], @{ $_[ARG1] } ); # Might have previously cancelled the request. my $href = $heap->{req}{$req} or return; if ($res->is_error) { ... } elsif (defined $data) { my ($fh, $file, $got) = @$href{qw(fh file got)}; my $len = do { use bytes; length $data }; $href->{is_writing} = 1; # Only need to do these tests on the initial response. if ($got == 0) { ... } aio_write( $fh, $got, $len, $data, 0, sub { my $put = $_[0]; if ($put < 0) { printf STDERR "Failed to write to %s: $!\n", $file; $kernel->call( $session, ua_cleanup_request => $req, -1 ); } elsif ($put != $len) { printf STDERR "Only wrote %d / %d bytes for %s\n", $put, $len, $file; $kernel->call( $session, ua_cleanup_request => $req, -1 ); } else { $href->{got} += $len; $href->{is_writing} = 0; $href->{response} = $res; # Call after writing each chunk to avoid a race condition when # the last chunk is scheduled to be written, but the last empty # response is received before the writing is finished. $kernel->yield( ua_finish_response => $res ); } }); } else { $href->{is_done} = 1; $kernel->yield( ua_finish_response => $res ); } } sub ua_finish_response { my ($kernel, $heap, $res) = @_[ KERNEL, HEAP, ARG0 ]; my ($req, $ret) = ( $res->request, -1 ); my $href = $heap->{req}{$req}; return if not $href->{is_done} or $href->{is_writing}; # Prevent race condition. $href->{is_writing} = 1; if ($res->is_error) { ... } else { # rename the file } }