I have a web crawler using POE::Component::Client::HTTP that has been working fine to suck down non-small binary files (pdf, ps, tar.gz, etc.). I tried to eke out a little more performance by using IO::AIO, but now about 5% of the downloads are corrupt. They are the correct size as indicated by the Content-Length header, but the contents are invalid data, so I'm guessing that the response data is sometimes being written out of order.
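
In case the "right size, wrong bytes" combination sounds odd: with offset-based writes, a write past the current end of file just extends the file and leaves a hole of NUL bytes, so the result can be exactly Content-Length bytes long even if an earlier chunk never landed where it should. A toy illustration (nothing to do with the crawler code, just the effect):

use Fcntl qw(O_WRONLY O_CREAT O_TRUNC SEEK_SET);

sysopen my $fh, 'demo.bin', O_WRONLY | O_CREAT | O_TRUNC, 0644 or die $!;
sysseek  $fh, 5, SEEK_SET;     # pretend the first 5-byte chunk went astray
syswrite $fh, 'world';         # a later chunk lands at its proper offset
close $fh;
printf "%d\n", -s 'demo.bin';  # prints 10 -- the "right" size, bytes 0..4 are NUL
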
I know Perlbal does something similar with IO::AIO, so I doubt that the aio threads themselves are messing up the order. Can anybody see a heisenbug in my code below? Thanks.
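
For context, the spawn/session setup isn't shown below. The IO::AIO callbacks are driven from inside POE via the usual poll_fileno bridge; a rough sketch of that wiring (names like 'aio_poll' and the 4096 chunk size are placeholders, not my exact code):

use strict;
use warnings;
use POE;
use POE::Component::Client::HTTP;
use IO::AIO;
use Fcntl qw(O_WRONLY O_CREAT O_TRUNC);

# Streaming mode so ua_got_response sees the body a chunk at a time.
POE::Component::Client::HTTP->spawn( Alias => 'ua', Streaming => 4096 );

# (POE::Session->create with this _start plus the states below is omitted.)
sub _start {
    my $kernel = $_[KERNEL];

    # Hand IO::AIO's result pipe to POE so the aio_* completion callbacks
    # run inside the event loop, not in the aio threads.
    open my $aio_fh, '<&=', IO::AIO::poll_fileno()
        or die "can't dup IO::AIO's poll fd: $!";
    $kernel->select_read( $aio_fh, 'aio_poll' );
}

# Drain finished aio requests; this is what fires the aio_* callbacks.
sub aio_poll { IO::AIO::poll_cb() }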

sub ua_make_request {
    my ($kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ];

    return if $heap->{is_shutdown};

    if ( @{ $heap->{ids} } < $MAX_PARALLEL * 2 ) {
        $kernel->yield('dbi_get_urls');
    }

    my $url= shift @{ $heap->{urls} };

    # Check later.
    unless ($url) {
        $kernel->delay_set( ua_make_request => 60 );
        return;
    }

    # construct HTTP::Request and set $file and $path
    ...

    aio_mkdir( $path, 0777, sub {
        my $ret = $_[0];
        if ( $ret and not $!{EEXIST} ) {
            printf STDERR "%s: $!\n", $path;
            return $kernel->post( $session, ua_cleanup_request => $req, $ret );
        }

        my $file = sprintf "%s.tmp", $heap->{req}{$req}{file};
        aio_open( $file, O_WRONLY|O_CREAT|O_TRUNC, 0644, sub {
            unless ( $heap->{req}{$req}{fh} = $_[0] ) {
                printf STDERR "%s: $!\n", $file;
                return $kernel->post( $session, ua_cleanup_request => $req, -1 );
            }
            $kernel->post(
                ua => request => ua_got_response => $req => "$req", undef,
                $proxy->proxy
            );
        });
    });
}

sub ua_got_response {
    my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    my ($req, $res, $data) = ( $_[ARG0]->[0], @{ $_[ARG1] } );

    # Might have previously cancelled the request.
    my $href = $heap->{req}{$req} or return;

    if ($res->is_error) {
        ...
    }
    elsif (defined $data) {
        my ($fh, $file, $got) = @$href{qw(fh file got)};
        my $len = do { use bytes; length $data };

        $href->{is_writing} = 1;

        # Only need to do these tests on the initial response.
        if ($got == 0) {
            ...
        }

        aio_write( $fh, $got, $len, $data, 0, sub {
            my $put = $_[0];

            if ($put < 0) {
                printf STDERR "Failed to write to %s: $!\n", $file;
                $kernel->call( $session, ua_cleanup_request => $req, -1 );
            }
            elsif ($put != $len) {
                printf STDERR "Only wrote %d / %d bytes for %s\n",
$put, $len,
                    $file;
                $kernel->call( $session, ua_cleanup_request => $req,
-1 );
            }
            else {
                $href->{got} += $len;
                $href->{is_writing} = 0;
                $href->{response} = $res;

                # Call after writing each chunk to avoid a race condition when
                # the last chunk is scheduled to be written, but the last
                # empty response is received before the writing is finished.
                $kernel->yield( ua_finish_response => $res );
            }
        });
    }
    else {
        $href->{is_done} = 1;
        $kernel->yield( ua_finish_response => $res );
    }
}
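
For anyone not familiar with IO::AIO, the write above takes an explicit file offset as its second argument -- per the IO::AIO docs:

aio_write $fh, $offset, $length, $data, $dataoffset, $callback->($retval);

so each chunk is supposed to land at the per-request $got offset, and $href->{got} only advances once that write's callback has run.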

sub ua_finish_response {
    my ($kernel, $heap, $res) = @_[ KERNEL, HEAP, ARG0 ];
    my ($req, $ret) = ( $res->request, -1 );
    my $href = $heap->{req}{$req};

    return if not $href->{is_done} or $href->{is_writing};

    # Prevent race condition.
    $href->{is_writing} = 1;

    if ($res->is_error) {
        ...
    }
    else {
        # rename the file
    }
}
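
The elided "rename the file" branch just moves the .tmp file created in ua_make_request into place; it boils down to something like this (a sketch, not the exact code I run):

# Inside the else branch above: move "$file.tmp" over the final name
# and hand the result off to the cleanup state.  Sketch only.
my $file = $href->{file};
close $href->{fh};
aio_rename( "$file.tmp", $file, sub {
    printf STDERR "rename %s.tmp: $!\n", $file if $_[0] < 0;
    $kernel->yield( ua_cleanup_request => $req, $_[0] );
});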
