`git log -p' can take several seconds to generate its initial output. SMP systems can be processing prunes during this delay, so let DS do a one-shot notification for us while prune is running. On Linux, we'll also use the biggest pipe possible so git can do more CPU-intensive work to generate diffs while our Perl processes are indexing and likely hitting I/O wait. --- MANIFEST | 1 + lib/PublicInbox/CidxLogP.pm | 29 ++++++++++++++++++ lib/PublicInbox/CodeSearchIdx.pm | 51 +++++++++++++++++++++----------- 3 files changed, 63 insertions(+), 18 deletions(-) create mode 100644 lib/PublicInbox/CidxLogP.pm
diff --git a/MANIFEST b/MANIFEST index 3c421645..a0e64c6a 100644 --- a/MANIFEST +++ b/MANIFEST @@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm lib/PublicInbox/AltId.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm +lib/PublicInbox/CidxLogP.pm lib/PublicInbox/CmdIPC4.pm lib/PublicInbox/CodeSearch.pm lib/PublicInbox/CodeSearchIdx.pm diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm new file mode 100644 index 00000000..7877d5ac --- /dev/null +++ b/lib/PublicInbox/CidxLogP.pm @@ -0,0 +1,29 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# +# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx. +# The initial output from `git log -p' can take a while to generate, +# CodeSearchIdx can process prune work while it's happening. Once +# `git log -p' starts generating output, it should be able to keep +# up with Xapian indexing, so we still rely on blocking reads to simplify +# cidx_read_log_p +package PublicInbox::CidxLogP; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); + +sub new { + my ($cls, $rd, $cidx, $git, $roots) = @_; + my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls; + fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes + $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); +} + +sub event_step { + my ($self) = @_; + my $rd = $self->{sock} // return warn('BUG?: no {sock}'); + $self->close; # PublicInbox::DS::close, deferred, so $sock is usable + delete($self->{cidx})->cidx_read_log_p($self, $rd); +} + +1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 035fab3e..215e337f 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val); use PublicInbox::Config qw(glob2re); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; +use 
PublicInbox::CidxLogP; use Socket qw(MSG_EOR); use Carp (); our ( @@ -216,20 +217,41 @@ EOM $len; } -# sharded reader for `git log --pretty=format: --stdin' +sub cidx_reap_log { # awaitpid cb + my ($pid, $self, $op_p) = @_; + if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || + ($? & 127) == POSIX::SIGPIPE))) { + send($op_p, "shard_done $self->{shard}", MSG_EOR); + } else { + warn "E: git @LOG_STDIN: \$?=$?\n"; + $self->{xdb}->cancel_transaction; + } +} + sub shard_index { # via wq_io_do in IDX_SHARDS - my ($self, $git, $n, $roots) = @_; - local $self->{current_info} = "$git->{git_dir} [$n]"; - local $self->{roots} = $roots; + my ($self, $git, $roots) = @_; + my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; + my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in }); + close $in or die "close: $!"; + awaitpid($pid, \&cidx_reap_log, $self, $op_p); + PublicInbox::CidxLogP->new($rd, $self, $git, $roots); + # CidxLogP->event_step will call cidx_read_log_p once there's input +} + +# sharded reader for `git log --pretty=format: --stdin' +sub cidx_read_log_p { + my ($self, $log_p, $rd) = @_; + my $git = delete $log_p->{git} // die 'BUG: no {git}'; + local $self->{current_info} = "$git->{git_dir} [$self->{shard}]"; + local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}'; + local $MAX_SIZE = $self->{-opt}->{max_size}; # local-ized in parent before fork $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; - my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); - close $in or die "close: $!"; my $nr = 0; # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4 @@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS my $len; my $cmt = {}; local $/ = $FS; - my $buf = <$rd> // return close($rd); # leading $FS + my $buf = <$rd> // return; # leading $FS $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; $self->begin_txn_lazy; while (!$DO_QUIT && 
defined($buf = <$rd>)) { @@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); } if (($TXN_BYTES -= $len) <= 0) { - cidx_ckpoint($self, "[$n] $nr"); + cidx_ckpoint($self, "[$self->{shard}] $nr"); $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES; } update_commit($self, $cmt); ++$nr; - cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0; + cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0; $/ = $FS; } - close($rd); - if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || - ($? & 127) == POSIX::SIGPIPE))) { - send($op_p, "shard_done $n", MSG_EOR); - } else { - warn "E: git @LOG_STDIN: \$?=$?\n"; - $self->{xdb}->cancel_transaction; - } + # return and wait for cidx_reap_log } sub shard_done { # called via PktOp on shard_index completion @@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb $c->{ops}->{shard_done} = [ $self ]; $IDX_SHARDS[$n]->wq_io_do('shard_index', [ $shard_in[$n], $p->{op_p} ], - $git, $n, \@roots); + $git, \@roots); $consumers->{$n} = $c; } @shard_in = ();