This fixes shutdown handling when shard_index() isn't running
and ensures we can shut down the process more quickly.
---
 lib/PublicInbox/CodeSearchIdx.pm | 55 ++++++++++++++++++++------------
 1 file changed, 34 insertions(+), 21 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f0b506da..4f91e0b6 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do
        # local-ized in parent before fork
        $TXN_BYTES = $batch_bytes;
        local $self->{git} = $git; # for patchid
+       return if $DO_QUIT;
        my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
        close $in or die "close: $!";
        my $nr = 0;
@@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do
        my $len;
        my $cmt = {};
        local $/ = $FS;
-       my $buf = <$rd> // return; # leading $FS
+       my $buf = <$rd> // return close($rd); # leading $FS
        $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
        $self->begin_txn_lazy;
-       while (defined($buf = <$rd>)) {
+       while (!$DO_QUIT && defined($buf = <$rd>)) {
                chomp($buf);
                $/ = "\n";
                $len = length($buf);
@@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do
                        $TXN_BYTES = $batch_bytes - $len;
                }
                add_commit($self, $cmt);
-               last if $DO_QUIT;
                ++$nr;
                if ($TXN_BYTES <= 0) {
                        cidx_ckpoint($self, "[$n] $nr");
@@ -298,6 +298,7 @@ sub run_todo ($) {
 
 sub need_reap { # post_loop_do
        my (undef, $jobs) = @_;
+       return if !$LIVE || $DO_QUIT;
        scalar(keys(%$LIVE)) > $jobs;
 }
 
@@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback
 sub partition_refs ($$$) {
        my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
        sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-       my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+       my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
        close $refs or die "close: $!";
        my ($seen, $nchange) = (0, 0);
        my @shard_in = map {
@@ -421,7 +422,7 @@ sub partition_refs ($$$) {
                $fh;
        } @RDONLY_SHARDS;
 
-       while (defined(my $cmt = <$fh>)) {
+       while (defined(my $cmt = <$rfh>)) {
                chomp $cmt;
                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
                if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
@@ -431,8 +432,13 @@ sub partition_refs ($$$) {
                        ++$nchange;
                        $seen = 0;
                }
+               if ($DO_QUIT) {
+                       close($rfh);
+                       return ();
+               }
        }
-       close($fh);
+       close($rfh);
+       return () if $DO_QUIT;
        if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
                $self->{nchange} += $nchange;
                progress($self, "$git->{git_dir}: $nchange commits");
@@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do
 
 sub consumers_open { # post_loop_do
        my (undef, $consumers) = @_;
+       return if $DO_QUIT;
        scalar(grep { $_->{sock} } values %$consumers);
 }
 
+sub wait_consumers ($$$) {
+       my ($self, $git, $consumers) = @_;
+       local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+       die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
 sub commit_used_shards ($$$) {
        my ($self, $git, $consumers) = @_;
        local $self->{-shard_ok} = {};
@@ -466,15 +481,12 @@ sub commit_used_shards ($$$) {
                $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
                $consumers->{$n} = $c;
        }
-       local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
-       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
-       die "E: $git->{git_dir} $n shards failed" if $n;
+       wait_consumers($self, $git, $consumers);
 }
 
 sub index_repo { # cidx_await cb
        my ($self, $git, $roots) = @_;
-       return if $git->{-cidx_err};
+       return if $git->{-cidx_err} || $DO_QUIT;
        my $repo = delete $git->{-repo} or return;
        seek($roots, 0, SEEK_SET) or die "seek: $!";
        chomp(my @roots = <$roots>);
@@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb
        local $self->{current_info} = $git->{git_dir};
        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
        local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-       my %CONSUMERS;
+       my $consumers = {};
        for my $n (0..$#shard_in) {
                -s $shard_in[$n] or next;
+               last if $DO_QUIT;
                my ($c, $p) = PublicInbox::PktOp->pair;
                $c->{ops}->{shard_done} = [ $self ];
                $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                        [ $shard_in[$n], $p->{op_p} ],
                                        $git, $n, \@roots);
-               $CONSUMERS{$n} = $c;
+               $consumers->{$n} = $c;
        }
        @shard_in = ();
-       local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
-       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
-       die "E: $git->{git_dir} $n shards failed" if $n;
+       wait_consumers($self, $git, $consumers);
        if ($DO_QUIT) {
-               commit_used_shards($self, $git, \%CONSUMERS);
+               commit_used_shards($self, $git, $consumers);
                progress($self, "$git->{git_dir}: done");
                return;
        }
        $repo->{git_dir} = $git->{git_dir};
        my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
        if ($id > 0) {
-               $CONSUMERS{$repo->{shard_n}} = undef;
-               commit_used_shards($self, $git, \%CONSUMERS);
+               $consumers->{$repo->{shard_n}} = undef;
+               commit_used_shards($self, $git, $consumers);
                progress($self, "$git->{git_dir}: done");
                return run_todo($self);
        }
@@ -585,6 +595,7 @@ sub scan_git_dirs ($) {
                                                        $self, $git);
                fp_start($self, $git, $prep_repo);
                ct_start($self, $git, $prep_repo);
+               last if $DO_QUIT;
        }
        cidx_reap($self, 0);
 }
@@ -674,8 +685,10 @@ sub ipc_atfork_child {
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
        my ($pid, $shard, $self) = @_;
+       my $quit_req = delete($shard->{-cidx_quit});
+       return if $DO_QUIT || !$LIVE;
        if ($? == 0) { # success
-               delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+               $quit_req // warn 'BUG: {-cidx_quit} unset';
                return;
        }
        warn "PID:$pid $shard->{shard} exited with \$?=$?\n";

Reply via email to