Listing refs, fingerprinting and root scanning can all be
parallelized to reduce runtime on SMP systems.

We'll use DESTROY-based dependency management with
parallelization, as in LeiMirror, to handle ref listing and
fingerprinting before serializing Xapian DB access to check
against the existing fingerprint.

We'll also delay root listing until we get a fingerprint
mismatch to speed up no-op indexing.
---
 lib/PublicInbox/CodeSearchIdx.pm | 197 +++++++++++++++++++++----------
 1 file changed, 132 insertions(+), 65 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 218338da..a926886e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -26,7 +26,10 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(run_die);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::OnDestroy;
+our $LIVE; # pid => callback
+our $LIVE_JOBS;
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -106,26 +109,27 @@ sub progress {
        $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$) {
-       my ($self, $repo) = @_;
+sub store_repo ($$$) {
+       my ($self, $git, $repo) = @_;
        my $xdb = delete($repo->{shard})->idx_acquire;
        $xdb->begin_transaction;
+       for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
        if (defined $repo->{id}) {
                my $doc = $xdb->get_document($repo->{id}) //
-                       die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+                       die "$git->{git_dir} doc #$repo->{id} gone";
                add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
-               my %new = map { $_ => undef } @{$self->{roots}};
+               my %new = map { $_ => undef } @{$repo->{roots}};
                my $old = xap_terms('G', $doc);
                delete @new{keys %$old};
                $doc->add_boolean_term('G'.$_) for keys %new;
-               delete @$old{@{$self->{roots}}};
+               delete @$old{@{$repo->{roots}}};
                $doc->remove_term('G'.$_) for keys %$old;
                $doc->set_data($repo->{fp});
                $xdb->replace_document($repo->{id}, $doc);
        } else {
                my $new = $PublicInbox::Search::X{Document}->new;
                add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-               $new->add_boolean_term("P$self->{git}->{git_dir}");
+               $new->add_boolean_term("P$git->{git_dir}");
                $new->add_boolean_term('T'.'r');
                $new->add_boolean_term('G'.$_) for @{$repo->{roots}};
                $new->set_data($repo->{fp}); # \n delimited
@@ -201,75 +205,98 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
        @ids;
 }
 
-sub get_roots ($$) {
-       my ($self, $refs) = @_;
-       my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
-               undef, { 0 => $refs });
-       die "git rev-list \$?=$?" if $?;
-       sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-       chomp(@roots);
-       scalar(@roots) ? \@roots : undef;
+sub cidx_reap ($$) { # reap $LIVE jobs until fewer than $jobs remain
+       my ($self, $jobs) = @_;
+       # callbacks (and OnDestroy objects freed with $x) may spawn new jobs
+       while (keys(%$LIVE) && keys(%$LIVE) >= $jobs) {
+               my $pid = waitpid(-1, 0); # -1 (never undef) on failure
+               die "waitpid(-1): $!" if $pid < 0; # tracked jobs are lost
+               if (my $x = delete $LIVE->{$pid}) {
+                       warn "PID:$pid exited with \$?=$?\n" if $?;
+                       my $cb = shift @$x;
+                       $cb->(@$x) if $cb;
+               } else { warn "reaped unknown PID=$pid ($?)\n" }
+       }
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
-sub cidx_fp ($) {
-       my ($self) = @_;
+sub fp_start ($$$) { # spawn show-ref; fp_fini digests its output
+       my ($self, $git, $prep_repo) = @_;
+       return if !$LIVE; # premature exit
+       cidx_reap($self, $LIVE_JOBS); # reap until fewer than $LIVE_JOBS live
        open my $refs, '+>', undef or die "open: $!";
-       run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+       my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
                qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+       $git->{-repo}->{refs} = $refs; # also fed to rev-list --stdin later
+       $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ]; # keeps $prep_repo alive until reaped
+}
+
+sub fp_fini { # show-ref reaped: SHA-256 its output into {-repo}->{fp}
+       my ($self, $git, $prep_repo) = @_; # NOTE(review): show-ref $? is not checked here
+       my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        seek($refs, 0, SEEK_SET) or die "seek: $!";
        my $buf;
        my $dig = PublicInbox::SHA->new(256);
        while (read($refs, $buf, 65536)) { $dig->add($buf) }
-       sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-       ($dig->hexdigest, $refs);
+       $git->{-repo}->{fp} = $dig->hexdigest; # prep_repo compares this to the stored doc data
 }
 
-# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_git_dir ($) {
-       my ($self) = @_;
-       my $git_dir = $self->{git}->{git_dir};
-       my $ct = $self->{git}->qx([qw[for-each-ref
-               --sort=-committerdate --format=%(committerdate:raw) --count=1
+sub ct_start ($$$) { # spawn for-each-ref for the newest committer time
+       my ($self, $git, $prep_repo) = @_;
+       return if !$LIVE; # premature exit
+       cidx_reap($self, $LIVE_JOBS); # reap until fewer than $LIVE_JOBS live
+       my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+               --format=%(committerdate:raw) --count=1
                refs/heads/ refs/tags/]]);
-       my $repo = {};
-       @$repo{qw(fp refs)} = cidx_fp($self);
-       $repo->{roots} = get_roots($self, $repo->{refs});
-       if (!$repo->{roots} || !defined($ct)) {
-               warn "W: $git_dir has no root commits, skipping\n";
+       $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ]; # ct_fini reads $rd once reaped
+}
+
+sub ct_fini { # store the newest committer time, if any, for prep_repo
+       my ($self, $git, $rd, $prep_repo) = @_;
+       my $line = <$rd> // return; # EOF: nothing listed, prep_repo skips
+       my ($epoch) = split(/\s+/, $line, 2); # drop TZ + LF
+       $git->{-repo}->{ct} = $epoch + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) { # OnDestroy callback (see scan_git_dirs)
+       my ($self, $git) = @_;
+       return if !$LIVE; # premature exit
+       my $repo = $git->{-repo} // die 'BUG: no {-repo}';
+       my $git_dir = $git->{git_dir};
+       if (!defined($repo->{ct})) { # ct_fini read nothing
+               warn "W: $git_dir has no commits, skipping\n";
+               delete $git->{-repo};
                return;
        }
-       $ct =~ s/ .*\z//s; # drop TZ
-       $repo->{ct} = $ct + 0;
        my $n = git_dir_hash($git_dir) % $self->{nshard};
        my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
        delete @$shard{qw(lockfh lock_path)};
        local $shard->{xdb};
        my $xdb = $shard->idx_acquire;
        my @docids = docids_by_postlist($shard, 'P'.$git_dir);
-       my $docid = shift(@docids) // return $repo;
+       my $docid = shift(@docids) // return get_roots($self, $git); # not yet indexed
        if (@docids) {
                warn "BUG: $git_dir indexed multiple times, culling\n";
-               $xdb->begin_transaction;
-               for (@docids) { $xdb->delete_document($_) }
-               $xdb->commit_transaction;
+               $repo->{to_delete} = \@docids; # XXX needed? store_repo deletes these
        }
        my $doc = $xdb->get_document($docid) //
                die "BUG: no #$docid ($git_dir)";
        my $old_fp = $doc->get_data;
        if ($old_fp eq $repo->{fp}) { # no change
-               progress($self, 'unchanged');
+               progress($self, "$git_dir unchanged");
+               delete $git->{-repo}; # nothing to index
                return;
        }
        $repo->{id} = $docid;
-       $repo;
+       get_roots($self, $git); # fingerprint changed: list roots, then index_repo
 }
 
-sub partition_refs ($$) {
-       my ($self, $refs) = @_; # show-ref --heads --tags --hash output
-       my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
-                                       { 0 => $refs });
+sub partition_refs ($$$) {
+       my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+       sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+       my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
        close $refs or die "close: $!";
        local $self->{xdb};
        my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
@@ -292,22 +319,27 @@ sub partition_refs ($$) {
        close($fh);
        if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
                $self->{nchange} += $nchange;
-               progress($self, "$nchange commits");
+               progress($self, "$git->{git_dir}: $nchange commits");
                for my $fh (@shard_in) {
                        $fh->flush or die "flush: $!";
                        sysseek($fh, 0, SEEK_SET) or die "seek: $!";
                }
                return @shard_in;
        }
-       die "git-rev-list: \$?=$?\n";
+       die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_git_dir ($$) {
-       my ($self, $git_dir) = @_;
-       local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
-       my $repo = prep_git_dir($self) or return;
-       local $self->{current_info} = $git_dir;
-       my @shard_in = partition_refs($self, delete($repo->{refs}));
+sub index_repo {
+       my ($self, $git, $roots) = @_;
+       return if !$LIVE; # premature exit
+       my $repo = delete $git->{-repo} or return;
+       seek($roots, 0, SEEK_SET) or die "seek: $!";
+       chomp(my @roots = <$roots>);
+       close($roots) or die "close: $!";
+       @roots or return warn("E: $git->{git_dir} has no root commits\n");
+       $repo->{roots} = \@roots;
+       local $self->{current_info} = $git->{git_dir};
+       my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
        my %pids;
        my $fwd_kill = sub {
                my ($sig) = @_;
@@ -323,12 +355,13 @@ sub index_git_dir ($$) {
                my $pid = fork // die "fork: $!";
                if ($pid == 0) { # no RNG use, here
                        $0 = "code index [$n]";
+                       $self->{git} = $git;
                        $self->{shard} = $n;
                        $self->{current_info} = "$self->{current_info} [$n]";
                        delete @$self{qw(lockfh lock_path)};
                        my $in = $shard_in[$n];
                        @shard_in = ();
-                       $self->{roots} = delete $repo->{roots};
+                       $self->{roots} = \@roots;
                        undef $repo;
                        eval { shard_worker($self, $in, $sigset) };
                        warn "E: $@" if $@;
@@ -339,18 +372,41 @@ sub index_git_dir ($$) {
        }
        PublicInbox::DS::sig_setmask($sigset);
        @shard_in = ();
-       my $err;
+       my ($err, @todo);
        while (keys %pids) {
-               my $pid = waitpid(-1, 0) or last;
-               my $j = delete $pids{$pid} // "unknown PID:$pid";
-               next if $? == 0;
-               warn "PID:$pid $j exited with \$?=$?\n";
-               $err = 1;
+               my $pid = waitpid(-1, 0) // die "waitpid: $!";
+               if (my $j = delete $pids{$pid}) {
+                       next if $? == 0;
+                       warn "PID:$pid $j exited with \$?=$?\n";
+                       $err = 1;
+               } elsif (my $todo = delete $LIVE->{$pid}) {
+                       warn "PID:$pid exited with \$?=$?\n" if $?;
+                       push @todo, $todo;
+               } else {
+                       warn "reaped unknown PID=$pid ($?)\n";
+               }
        }
        die "subprocess(es) failed\n" if $err;
-       store_repo($self, $repo);
-       progress($self, 'done');
+       store_repo($self, $git, $repo);
+       progress($self, "$git->{git_dir}: done");
        # TODO: check fp afterwards?
+       while (my $x = shift @todo) {
+               my $cb = shift @$x;
+               $cb->(@$x) if $cb;
+       }
+}
+
+sub get_roots ($$) { # list root commits for index_repo to consume
+       my ($self, $git) = @_;
+       $LIVE or return; # premature exit
+       cidx_reap($self, $LIVE_JOBS); # may run other repos' callbacks
+       my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+       sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # rewind for --stdin
+       open(my $roots, '+>', undef) or die "open: $!"; # anonymous tmpfile
+       my @cmd = ('git', "--git-dir=$git->{git_dir}",
+                       qw(rev-list --stdin --max-parents=0));
+       my $pid = spawn(\@cmd, undef, { 0 => $refs, 1 => $roots });
+       $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -389,6 +445,21 @@ W: memory usage may be high for large indexing runs
 EOM
 }
 
+sub scan_git_dirs ($) { # start fp + ct jobs for each repo, then drain
+       my ($self) = @_;
+       local $LIVE_JOBS = $self->{-opt}->{jobs} //
+                       PublicInbox::IPC::detect_nproc() // 2;
+       local $LIVE = {}; # pid => [ \&callback, @args ]
+       for my $dir (@{$self->{git_dirs}}) { # named: callbacks run nested
+               my $git = PublicInbox::Git->new($dir);
+               my $guard = PublicInbox::OnDestroy->new($$, \&prep_repo,
+                                       $self, $git); # shared by both jobs
+               fp_start($self, $git, $guard);
+               ct_start($self, $git, $guard);
+       }
+       cidx_reap($self, 0); # wait for all jobs and their follow-ups
+}
+
 sub cidx_run {
        my ($self) = @_;
        cidx_init($self);
@@ -414,11 +485,7 @@ sub cidx_run {
        }
        local $self->{nchange} = 0;
        # do_prune($self) if $self->{-opt}->{prune}; TODO
-       if ($self->{-opt}->{scan} // 1) {
-               for my $gd (@{$self->{git_dirs}}) {
-                       index_git_dir($self, $gd);
-               }
-       }
+       scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
        $self->lock_release(!!$self->{nchange});
 }
 

Reply via email to