This avoids forking new shard processes for each repo we scan.
However, we still can't avoid a large (otherwise excessive) number
of Xapian commits, since we need to ensure the `seen()' sub can
find already-indexed commits and avoid excessive work.
---
 lib/PublicInbox/CodeSearchIdx.pm | 374 ++++++++++++++++++++-----------
 1 file changed, 240 insertions(+), 134 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 02c9ed84..13fe1c28 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -14,9 +14,11 @@
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
-use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
 use PublicInbox::Eml;
-use PublicInbox::DS ();
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
 use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Admin;
 use POSIX qw(WNOHANG SEEK_SET);
@@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
-our $LIVE; # pid => callback
-our $LIVE_JOBS;
-our @XDB_SHARDS_FLAT;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+       $LIVE, # pid => cmd
+       $DEFER, # [ [ cb, @args ], ... ]
+       $LIVE_JOBS, # integer
+       $MY_SIG, # like %SIG
+       $SIGSET,
+       @RDONLY_SHARDS, # Xapian::Database
+       @IDX_SHARDS # clones of self
+);
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -110,14 +120,14 @@ sub progress {
        $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$$) {
-       my ($self, $git, $repo) = @_;
-       my $xdb = delete($repo->{shard})->idx_acquire;
-       $xdb->begin_transaction;
+sub store_repo { # wq_do - returns docid
+       my ($self, $repo) = @_;
+       $self->begin_txn_lazy;
+       my $xdb = $self->{xdb};
        for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
-       if (defined $repo->{id}) {
-               my $doc = $xdb->get_document($repo->{id}) //
-                       die "$git->{git_dir} doc #$repo->{id} gone";
+       if (defined $repo->{docid}) {
+               my $doc = $xdb->get_document($repo->{docid}) //
+                       die "$repo->{git_dir} doc #$repo->{docid} gone";
                add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
                my %new = map { $_ => undef } @{$repo->{roots}};
                my $old = xap_terms('G', $doc);
@@ -126,34 +136,38 @@ sub store_repo ($$$) {
                delete @$old{@{$repo->{roots}}};
                $doc->remove_term('G'.$_) for keys %$old;
                $doc->set_data($repo->{fp});
-               $xdb->replace_document($repo->{id}, $doc);
+               $xdb->replace_document($repo->{docid}, $doc);
+               $repo->{docid}
        } else {
                my $new = $PublicInbox::Search::X{Document}->new;
                add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-               $new->add_boolean_term("P$git->{git_dir}");
+               $new->add_boolean_term("P$repo->{git_dir}");
                $new->add_boolean_term('T'.'r');
                $new->add_boolean_term('G'.$_) for @{$repo->{roots}};
                $new->set_data($repo->{fp}); # \n delimited
                $xdb->add_document($new);
        }
-       $xdb->commit_transaction;
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_worker ($$$) {
-       my ($self, $r, $sigset) = @_;
+sub shard_index { # via wq_io_do
+       my ($self, $git, $n, $roots) = @_;
+       local $self->{current_info} = "$git->{git_dir} [$n]";
        my ($quit, $cmt);
+       local $self->{roots} = $roots;
+       my $in = delete($self->{0}) // die 'BUG: no {0} input';
+       my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
        my $batch_bytes = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
        my $max = $batch_bytes;
-       $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-       $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
-       PublicInbox::DS::sig_setmask($sigset);
-
-       # the parent process of this shard process writes directly to
-       # the stdin of `git log', we consume git log's stdout:
-       my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
-       close $r or die "close: $!";
+       my $set_quit = sub { $quit = shift };
+       local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+       local $SIG{QUIT} = $set_quit;
+       local $SIG{TERM} = $set_quit;
+       local $SIG{INT} = $set_quit;
+       local $self->{git} = $git; # for patchid
+       my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+       close $in or die "close: $!";
        my $nr = 0;
 
        # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -162,8 +176,7 @@ sub shard_worker ($$$) {
        local $/ = $FS;
        my $buf = <$rd> // return; # leading $FS
        $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
-       my $xdb = $self->idx_acquire;
-       $xdb->begin_transaction;
+       $self->begin_txn_lazy;
        while (defined($buf = <$rd>)) {
                chomp($buf);
                $max -= length($buf);
@@ -174,24 +187,40 @@ sub shard_worker ($$$) {
                ++$nr;
                if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
                        progress($self, $nr);
-                       $xdb->commit_transaction;
+                       $self->{xdb}->commit_transaction;
                        $max = $batch_bytes;
-                       $xdb->begin_transaction;
+                       $self->{xdb}->begin_transaction;
                }
                $/ = $FS;
        }
        close($rd);
        if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
-               $xdb->commit_transaction;
+               send($op_p, "shard_done $n", MSG_EOR);
        } else {
                warn "E: git @LOG_STDIN: \$?=$?\n";
-               $xdb->cancel_transaction;
+               $self->{xdb}->cancel_transaction;
        }
 }
 
+sub shard_done { # called via PktOp on shard_index completion
+       my ($self, $n) = @_;
+       $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
 sub seen ($$) {
        my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
-       $xdb->postlist_begin($q) != $xdb->postlist_end($q)
+       for (1..100) {
+               my $ret = eval {
+                       $xdb->postlist_begin($q) != $xdb->postlist_end($q);
+               };
+               return $ret unless $@;
+               if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+                       $xdb->reopen;
+               } else {
+                       Carp::croak($@);
+               }
+       }
+       Carp::croak('too many Xapian DB modifications in progress');
 }
 
 # used to select the shard for a GIT_DIR
@@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
        @ids;
 }
 
+sub run_todo ($) {
+       my ($self) = @_;
+       my $n;
+       while (defined(my $x = shift(@{$self->{todo} // []}))) {
+               my $cb = shift @$x;
+               $cb->(@$x);
+               ++$n;
+       }
+       $n;
+}
+
 sub cidx_reap ($$) {
        my ($self, $jobs) = @_;
-       while (keys(%$LIVE) >= $jobs) {
-               my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
-               last if $pid < 0;
-               if (my $x = delete $LIVE->{$pid}) {
-                       my $cb = shift @$x;
-                       $cb->(@$x) if $cb;
-               } else {
-                       warn "reaped unknown PID=$pid ($?)\n";
-               }
+       while (run_todo($self)) {}
+       my $cb = sub { keys(%$LIVE) > $jobs };
+       PublicInbox::DS->SetPostLoopCallback($cb);
+       PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+       while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+       my ($pid, $cb, $self, $git, @args) = @_;
+       return if !$LIVE; # premature shutdown
+       my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+       PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+       if ($?) {
+               $git->{-cidx_err} = 1;
+               return warn("@$cmd error: \$?=$?\n");
        }
+       push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+       my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+       $LIVE->{$pid} = $cmd;
+       awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
@@ -227,13 +280,14 @@ sub fp_start ($$$) {
        return if !$LIVE; # premature exit
        cidx_reap($self, $LIVE_JOBS);
        open my $refs, '+>', undef or die "open: $!";
-       my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-               qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+       my $cmd = ['git', "--git-dir=$git->{git_dir}",
+               qw(show-ref --heads --tags --hash)];
+       my $pid = spawn($cmd, undef, { 1 => $refs });
        $git->{-repo}->{refs} = $refs;
-       $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+       cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
 }
 
-sub fp_fini {
+sub fp_fini { # cidx_await cb
        my ($self, $git, $prep_repo) = @_;
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        seek($refs, 0, SEEK_SET) or die "seek: $!";
@@ -247,13 +301,15 @@ sub ct_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
        return if !$LIVE; # premature exit
        cidx_reap($self, $LIVE_JOBS);
-       my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+       my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+               qw[for-each-ref --sort=-committerdate
                --format=%(committerdate:raw) --count=1
-               refs/heads/ refs/tags/]]);
-       $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+               refs/heads/ refs/tags/] ];
+       my ($rd, $pid) = popen_rd($cmd);
+       cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
 }
 
-sub ct_fini {
+sub ct_fini { # cidx_await cb
        my ($self, $git, $rd, $prep_repo) = @_;
        defined(my $ct = <$rd>) or return;
        $ct =~ s/\s+.*\z//s; # drop TZ + LF
@@ -263,34 +319,38 @@ sub ct_fini {
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE; # premature exit
+       return if !$LIVE || $git->{-cidx_err}; # premature exit
        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
-       my $git_dir = $git->{git_dir};
        if (!defined($repo->{ct})) {
-               warn "W: $git_dir has no commits, skipping\n";
+               warn "W: $git->{git_dir} has no commits, skipping\n";
                delete $git->{-repo};
                return;
        }
-       my $n = git_dir_hash($git_dir) % $self->{nshard};
-       my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+       my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+       my $shard = bless { %$self, shard => $n }, ref($self);
+       $repo->{shard_n} = $n;
        delete @$shard{qw(lockfh lock_path)};
-       my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
-       $xdb->reopen;
-       my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
+       local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+       $shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+       my ($shard, $self, $git) = @_;
+       my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
        my $docid = shift(@docids) // return get_roots($self, $git);
-       if (@docids) {
-               warn "BUG: $git_dir indexed multiple times, culling\n";
-               $repo->{to_delete} = \@docids; # XXX needed?
-       }
-       my $doc = $xdb->get_document($docid) //
-               die "BUG: no #$docid ($git_dir)";
+       my $doc = $shard->{xdb}->get_document($docid) //
+                       die "BUG: no #$docid ($git->{git_dir})";
        my $old_fp = $doc->get_data;
-       if ($old_fp eq $repo->{fp}) { # no change
-               progress($self, "$git_dir unchanged");
+       if ($old_fp eq $git->{-repo}->{fp}) { # no change
+               progress($self, "$git->{git_dir} unchanged");
                delete $git->{-repo};
                return;
        }
-       $repo->{id} = $docid;
+       $git->{-repo}->{docid} = $docid;
+       if (@docids) {
+               warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+               $git->{-repo}->{to_delete} = \@docids; # XXX needed?
+       }
        get_roots($self, $git);
 }
 
@@ -304,12 +364,12 @@ sub partition_refs ($$$) {
                $_->reopen;
                open my $fh, '+>', undef or die "open: $!";
                $fh;
-       } @XDB_SHARDS_FLAT;
+       } @RDONLY_SHARDS;
 
        while (defined(my $cmt = <$fh>)) {
                chomp $cmt;
-               my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
-               if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
+               my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+               if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
                        last if ++$seen > $SEEN_MAX;
                } else {
                        say { $shard_in[$n] } $cmt or die "say: $!";
@@ -330,9 +390,33 @@ sub partition_refs ($$$) {
        die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_repo {
+sub shard_commit { # via wq_io_do
+       my ($self, $n) = @_;
+       my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+       $self->commit_txn_lazy;
+       send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub commit_used_shards ($$$) {
+       my ($self, $git, $consumers) = @_;
+       local $self->{-shard_ok} = {};
+       for my $n (keys %$consumers) {
+               my ($c, $p) = PublicInbox::PktOp->pair;
+               $c->{ops}->{shard_done} = [ $self ];
+               $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+               $consumers->{$n} = $c;
+       }
+       PublicInbox::DS->SetPostLoopCallback(sub {
+               scalar(grep { $_->{sock} } values %$consumers);
+       });
+       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+       die "E: $git->{git_dir} $n shards failed" if $n;
+}
+
+sub index_repo { # cidx_await cb
        my ($self, $git, $roots) = @_;
-       return if !$LIVE; # premature exit
+       return if $git->{-cidx_err};
        my $repo = delete $git->{-repo} or return;
        seek($roots, 0, SEEK_SET) or die "seek: $!";
        chomp(my @roots = <$roots>);
@@ -341,73 +425,45 @@ sub index_repo {
        $repo->{roots} = \@roots;
        local $self->{current_info} = $git->{git_dir};
        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-       my %pids;
-       my $fwd_kill = sub {
-               my ($sig) = @_;
-               kill($sig, $_) for keys %pids;
-       };
-       local $SIG{USR1} = $fwd_kill;
-       local $SIG{QUIT} = $fwd_kill;
-       local $SIG{INT} = $fwd_kill;
-       local $SIG{TERM} = $fwd_kill;
-       my $sigset = PublicInbox::DS::block_signals();
-       for (my $n = 0; $n <= $#shard_in; $n++) {
+       local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+       my %CONSUMERS;
+       for my $n (0..$#shard_in) {
                -s $shard_in[$n] or next;
-               my $pid = fork // die "fork: $!";
-               if ($pid == 0) { # no RNG use, here
-                       $0 = "code index [$n]";
-                       $self->{git} = $git;
-                       $self->{shard} = $n;
-                       $self->{current_info} = "$self->{current_info} [$n]";
-                       delete @$self{qw(lockfh lock_path)};
-                       my $in = $shard_in[$n];
-                       @shard_in = ();
-                       $self->{roots} = \@roots;
-                       undef $repo;
-                       eval { shard_worker($self, $in, $sigset) };
-                       warn "E: $@" if $@;
-                       POSIX::_exit($@ ? 1 : 0);
-               } else {
-                       $pids{$pid} = "code index [$n]";
-               }
+               my ($c, $p) = PublicInbox::PktOp->pair;
+               $c->{ops}->{shard_done} = [ $self ];
+               $IDX_SHARDS[$n]->wq_io_do('shard_index',
+                                       [ $shard_in[$n], $p->{op_p} ],
+                                       $git, $n, \@roots);
+               $CONSUMERS{$n} = $c;
        }
-       PublicInbox::DS::sig_setmask($sigset);
        @shard_in = ();
-       my ($err, @todo);
-       while (keys %pids) {
-               my $pid = waitpid(-1, 0) // die "waitpid: $!";
-               if (my $j = delete $pids{$pid}) {
-                       next if $? == 0;
-                       warn "PID:$pid $j exited with \$?=$?\n";
-                       $err = 1;
-               } elsif (my $todo = delete $LIVE->{$pid}) {
-                       warn "PID:$pid exited with \$?=$?\n" if $?;
-                       push @todo, $todo;
-               } else {
-                       warn "reaped unknown PID=$pid ($?)\n";
-               }
-       }
-       die "subprocess(es) failed\n" if $err;
-       store_repo($self, $git, $repo);
-       progress($self, "$git->{git_dir}: done");
-       # TODO: check fp afterwards?
-       while (my $x = shift @todo) {
-               my $cb = shift @$x;
-               $cb->(@$x) if $cb;
+       PublicInbox::DS->SetPostLoopCallback(sub {
+               scalar(grep { $_->{sock} } values %CONSUMERS);
+       });
+       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
+       die "E: $git->{git_dir} $n shards failed" if $n;
+       $repo->{git_dir} = $git->{git_dir};
+       my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+       if ($id > 0) {
+               $CONSUMERS{$repo->{shard_n}} = undef;
+               commit_used_shards($self, $git, \%CONSUMERS);
+               progress($self, "$git->{git_dir}: done");
+               return run_todo($self);
        }
+       die "E: store_repo $git->{git_dir}: id=$id";
 }
 
 sub get_roots ($$) {
        my ($self, $git) = @_;
        return if !$LIVE; # premature exit
-       cidx_reap($self, $LIVE_JOBS);
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        sysseek($refs, 0, SEEK_SET) or die "seek: $!";
        open my $roots, '+>', undef or die "open: $!";
-       my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-                       qw(rev-list --stdin --max-parents=0)],
-                       undef, { 0 => $refs, 1 => $roots });
-       $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
+       my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+                       qw(rev-list --stdin --max-parents=0) ];
+       my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+       cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -434,9 +490,17 @@ sub cidx_init ($) {
                warn "# creating $dir\n" if !$self->{-opt}->{quiet};
                File::Path::mkpath($dir);
        }
+       $self->lock_acquire;
+       my @shards;
        for my $n (0..($self->{nshard} - 1)) {
                my $shard = bless { %$self, shard => $n }, ref($self);
+               delete @$shard{qw(lockfh lock_path)};
                $shard->idx_acquire;
+               $shard->idx_release;
+               $shard->wq_workers_start("shard[$n]", 1, undef, {
+                       siblings => \@shards, # for ipc_atfork_child
+               }, \&shard_done_wait, $self);
+               push @shards, $shard;
        }
        # this warning needs to happen after idx_acquire
        state $once;
@@ -444,14 +508,11 @@ sub cidx_init ($) {
 W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
 W: memory usage may be high for large indexing runs
 EOM
+       @shards;
 }
 
 sub scan_git_dirs ($) {
        my ($self) = @_;
-       local $LIVE_JOBS = $self->{-opt}->{jobs} //
-                       PublicInbox::IPC::detect_nproc() // 2;
-       local $LIVE = {};
-       local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
        for (@{$self->{git_dirs}}) {
                my $git = PublicInbox::Git->new($_);
                my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
@@ -462,18 +523,31 @@ sub scan_git_dirs ($) {
        cidx_reap($self, 0);
 }
 
-sub cidx_run {
+sub shards_active { # PostLoopCallback
+       scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+sub cidx_run { # main entry point
        my ($self) = @_;
-       cidx_init($self);
+       local $self->{todo} = [];
+       local $DEFER = $self->{todo};
+       local $SIGSET = PublicInbox::DS::block_signals();
+       my $restore = PublicInbox::OnDestroy->new($$,
+               \&PublicInbox::DS::sig_setmask, $SIGSET);
+       local $LIVE = {};
+       local @IDX_SHARDS = cidx_init($self);
        local $self->{current_info} = '';
        my $cb = $SIG{__WARN__} || \&CORE::warn;
+       local $MY_SIG = {
+               CHLD => \&PublicInbox::DS::enqueue_reap,
+               INT => sub { exit },
+       };
        local $SIG{__WARN__} = sub {
                my $m = shift @_;
                $self->{current_info} eq '' or
                        $m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
                $cb->($m, @_);
        };
-       $self->lock_acquire;
        load_existing($self);
        my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
        if (@nc) {
@@ -486,9 +560,41 @@ sub cidx_run {
                warn "E: canonicalized and attempting to continue\n";
        }
        local $self->{nchange} = 0;
+       local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+                       PublicInbox::IPC::detect_nproc() || 2;
+       local @RDONLY_SHARDS = $self->xdb_shards_flat;
+
        # do_prune($self) if $self->{-opt}->{prune}; TODO
        scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+       for my $s (@IDX_SHARDS) {
+               $s->{-cidx_quit} = 1;
+               $s->wq_close;
+       }
+
+       PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+       PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
        $self->lock_release(!!$self->{nchange});
 }
 
+sub ipc_atfork_child {
+       my ($self) = @_;
+       $self->SUPER::ipc_atfork_child;
+       my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+       $_->wq_close for @$x;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+       my ($pid, $shard, $self) = @_;
+       delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+       return unless $?;
+       warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+       ++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO
+       my ($self, $cb, @arg) = @_;
+       $cb->(@arg);
+}
+
 1;

Reply via email to