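While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.  To that end, the
parent now forwards SIGINT, SIGTERM and SIGQUIT to the shard
workers so they stop after the commit currently being indexed,
and the shards used so far are committed before quitting.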
---
 lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
        $LIVE_JOBS, # integer
        $MY_SIG, # like %SIG
        $SIGSET,
+       $TXN_BYTES, # number of bytes in current shard transaction
+       $DO_QUIT, # signal number
        @RDONLY_SHARDS, # Xapian::Database
        @IDX_SHARDS # clones of self
 );
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
 sub shard_index { # via wq_io_do
        my ($self, $git, $n, $roots) = @_;
        local $self->{current_info} = "$git->{git_dir} [$n]";
-       my ($quit, $cmt);
+       my $cmt;
        local $self->{roots} = $roots;
        my $in = delete($self->{0}) // die 'BUG: no {0} input';
        my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
        my $batch_bytes = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
-       my $max = $batch_bytes;
-       my $set_quit = sub { $quit = shift };
-       local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-       local $SIG{QUIT} = $set_quit;
-       local $SIG{TERM} = $set_quit;
-       local $SIG{INT} = $set_quit;
+       # local-ized in parent before fork
+       $TXN_BYTES = $batch_bytes;
        local $self->{git} = $git; # for patchid
        my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
        close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
        $self->begin_txn_lazy;
        while (defined($buf = <$rd>)) {
                chomp($buf);
-               $max -= length($buf);
+               $TXN_BYTES -= length($buf);
                @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                $/ = "\n";
                add_commit($self, $cmt);
-               last if $quit; # likely SIGPIPE
+               last if $DO_QUIT;
                ++$nr;
-               if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+               if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
                        progress($self, "[$n] $nr");
                        $self->{xdb}->commit_transaction;
-                       $max = $batch_bytes;
+                       $TXN_BYTES = $batch_bytes;
                        $self->{xdb}->begin_transaction;
                }
                $/ = $FS;
        }
        close($rd);
-       if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+       if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+                               ($? & 127) == POSIX::SIGPIPE))) {
                send($op_p, "shard_done $n", MSG_EOR);
        } else {
                warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
 sub cidx_reap ($$) {
        my ($self, $jobs) = @_;
        while (run_todo($self)) {}
-       local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+       local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
        while (need_reap(undef, $jobs)) {
                PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
        }
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
 
 sub cidx_await_cb { # awaitpid cb
        my ($pid, $cb, $self, $git, @args) = @_;
-       return if !$LIVE; # premature shutdown
+       return if !$LIVE || $DO_QUIT;
        my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
        PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
        if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
-       return if !$LIVE; # premature exit
+       return if !$LIVE || $DO_QUIT;
        cidx_reap($self, $LIVE_JOBS);
        open my $refs, '+>', undef or die "open: $!";
        my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
-       return if !$LIVE; # premature exit
+       return if !$LIVE || $DO_QUIT;
        cidx_reap($self, $LIVE_JOBS);
        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
                qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE || $git->{-cidx_err}; # premature exit
+       return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
        if (!defined($repo->{ct})) {
                warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
        die "E: $git->{git_dir} $n shards failed" if $n;
+       if ($DO_QUIT) {
+               commit_used_shards($self, $git, \%CONSUMERS);
+               progress($self, "$git->{git_dir}: done");
+               return;
+       }
        $repo->{git_dir} = $git->{git_dir};
        my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
        if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE; # premature exit
+       return if !$LIVE || $DO_QUIT;
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        sysseek($refs, 0, SEEK_SET) or die "seek: $!";
        open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
        @$dirs = grep { !$uniq{$_}++ } @$dirs;
 }
 
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
 sub cidx_init ($) {
        my ($self) = @_;
        my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
        }
        $self->lock_acquire;
        my @shards;
+       local $TXN_BYTES;
        for my $n (0..($self->{nshard} - 1)) {
                my $shard = bless { %$self, shard => $n }, ref($self);
                delete @$shard{qw(lockfh lock_path)};
                $shard->idx_acquire;
                $shard->idx_release;
-               $shard->wq_workers_start("shard[$n]", 1, undef, {
+               $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
                        siblings => \@shards, # for ipc_atfork_child
                }, \&shard_done_wait, $self);
                push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
        scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+       $DO_QUIT = $_[0];
+       kill_shards(@_);
+       warn "# SIG$_[0] received, quitting...\n";
+}
+
 sub cidx_run { # main entry point
        my ($self) = @_;
        local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
        my $restore = PublicInbox::OnDestroy->new($$,
                \&PublicInbox::DS::sig_setmask, $SIGSET);
        local $LIVE = {};
+       local $DO_QUIT;
        local @IDX_SHARDS = cidx_init($self);
        local $self->{current_info} = '';
-       my $cb = $SIG{__WARN__} || \&CORE::warn;
        local $MY_SIG = {
                CHLD => \&PublicInbox::DS::enqueue_reap,
-               INT => sub { exit },
+               USR1 => \&kill_shards,
        };
+       $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+       my $cb = $SIG{__WARN__} || \&CORE::warn;
        local $SIG{__WARN__} = sub {
                my $m = shift @_;
                $self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
 sub ipc_atfork_child {
        my ($self) = @_;
        $self->SUPER::ipc_atfork_child;
+       $SIG{USR1} = \&shard_usr1;
+       $SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
        my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
        $_->wq_close for @$x;
+       undef;
 }
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
        my ($pid, $shard, $self) = @_;
-       delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
-       return unless $?;
+       if ($? == 0) { # success
+               delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+               return;
+       }
        warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
        ++$self->{shard_err} if defined($self->{shard_err});
 }

Reply via email to