`git log -p' can take several seconds to generate its initial output.
On SMP systems we can process prunes during this delay, so let
DS give us a one-shot notification while the prune is running.  On
Linux, we'll also request a larger pipe (1 MiB) so git can do
more CPU-intensive work to generate diffs while our Perl
processes are indexing and likely hitting I/O wait.
---
 MANIFEST                         |  1 +
 lib/PublicInbox/CidxLogP.pm      | 29 ++++++++++++++++++
 lib/PublicInbox/CodeSearchIdx.pm | 51 +++++++++++++++++++++-----------
 3 files changed, 63 insertions(+), 18 deletions(-)
 create mode 100644 lib/PublicInbox/CidxLogP.pm

diff --git a/MANIFEST b/MANIFEST
index 3c421645..a0e64c6a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
new file mode 100644
index 00000000..7877d5ac
--- /dev/null
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate, so
+# CodeSearchIdx can process prune work while waiting for it.  Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p.
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new { # watch the `git log -p' pipe $rd for its first readiness event
+       my ($cls, $rd, $cidx, $git, $roots) = @_;
+       my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls; # consumed by event_step/cidx_read_log_p
+       fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes: 1031 is Linux F_SETPIPE_SZ (1 MiB); best-effort, result unchecked
+       $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); # one-shot: event_step fires once, then reads block
+}
+
+sub event_step { # PublicInbox::DS callback once the pipe is ready to read
+       my ($self) = @_;
+       my $rd = $self->{sock} // return warn('BUG?: no {sock}'); # grab the pipe before close
+       $self->close; # PublicInbox::DS::close, deferred, so $rd stays usable
+       delete($self->{cidx})->cidx_read_log_p($self, $rd); # hand off to blocking reads in CodeSearchIdx
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 035fab3e..215e337f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config qw(glob2re);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
 use Socket qw(MSG_EOR);
 use Carp ();
 our (
@@ -216,20 +217,41 @@ EOM
        $len;
 }
 
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb for the `git log -p' spawned in shard_index
+       my ($pid, $self, $op_p) = @_;
+       if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+                               ($? & 127) == POSIX::SIGPIPE))) { # clean exit, or expected signal death while quitting
+               send($op_p, "shard_done $self->{shard}", MSG_EOR); # NOTE(review): if git exits before CidxLogP's one-shot event runs (output fits in the pipe), this may precede cidx_read_log_p indexing - confirm DS ordering
+       } else {
+               warn "E: git @LOG_STDIN: \$?=$?\n";
+               $self->{xdb}->cancel_transaction; # NOTE(review): {xdb} may be unset if cidx_read_log_p returned before begin_txn_lazy - confirm
+       }
+}
+
 sub shard_index { # via wq_io_do in IDX_SHARDS
-       my ($self, $git, $n, $roots) = @_;
-       local $self->{current_info} = "$git->{git_dir} [$n]";
-       local $self->{roots} = $roots;
+       my ($self, $git, $roots) = @_; # shard number now comes from $self->{shard}
+
        my $in = delete($self->{0}) // die 'BUG: no {0} input';
        my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+       my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in }); # list context: pipe and pid
+       close $in or die "close: $!"; # $in was passed to git as stdin
+       awaitpid($pid, \&cidx_reap_log, $self, $op_p); # cidx_reap_log sends shard_done or cancels the txn
+       PublicInbox::CidxLogP->new($rd, $self, $git, $roots); # no blocking read here; returns to the event loop
+       # CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+       my ($self, $log_p, $rd) = @_;
+       my $git = delete $log_p->{git} // die 'BUG: no {git}';
+       local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+       local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
        local $MAX_SIZE = $self->{-opt}->{max_size};
        # local-ized in parent before fork
        $TXN_BYTES = $BATCH_BYTES;
        local $self->{git} = $git; # for patchid
        return if $DO_QUIT;
-       my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
-       close $in or die "close: $!";
        my $nr = 0;
 
        # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
        my $len;
        my $cmt = {};
        local $/ = $FS;
-       my $buf = <$rd> // return close($rd); # leading $FS
+       my $buf = <$rd> // return; # leading $FS
        $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
        $self->begin_txn_lazy;
        while (!$DO_QUIT && defined($buf = <$rd>)) {
@@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
                        @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                }
                if (($TXN_BYTES -= $len) <= 0) {
-                       cidx_ckpoint($self, "[$n] $nr");
+                       cidx_ckpoint($self, "[$self->{shard}] $nr");
                        $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
                }
                update_commit($self, $cmt);
                ++$nr;
-               cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+               cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
                $/ = $FS;
        }
-       close($rd);
-       if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
-                               ($? & 127) == POSIX::SIGPIPE))) {
-               send($op_p, "shard_done $n", MSG_EOR);
-       } else {
-               warn "E: git @LOG_STDIN: \$?=$?\n";
-               $self->{xdb}->cancel_transaction;
-       }
+       # return and wait for cidx_reap_log
 }
 
 sub shard_done { # called via PktOp on shard_index completion
@@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb
                $c->{ops}->{shard_done} = [ $self ];
                $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                        [ $shard_in[$n], $p->{op_p} ],
-                                       $git, $n, \@roots);
+                                       $git, \@roots);
                $consumers->{$n} = $c;
        }
        @shard_in = ();

Reply via email to