Using fewer sockets simplifies completion checks, too.
---
 lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++----------------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 14342683..05007afd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -504,36 +504,35 @@ sub partition_refs ($$$) {
 }
 
 sub shard_commit { # via wq_io_do
-       my ($self, $n) = @_;
+       my ($self) = @_;
        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
        $self->commit_txn_lazy;
-       send($op_p, "shard_done $n", MSG_EOR);
+       send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub consumers_open { # post_loop_do
-       my (undef, $consumers) = @_;
-       return if $DO_QUIT;
-       scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+       my (undef, $c) = @_; # $c is PublicInbox::PktOp
+       $DO_QUIT ? undef : defined($c->{sock});
 }
 
-sub wait_consumers ($$$) {
-       my ($self, $git, $consumers) = @_;
-       local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+       my ($self, $git, $active, $c) = @_;
+       local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
        die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
 }
 
-sub commit_used_shards ($$$) {
-       my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+       my ($self, $git, $active) = @_;
        local $self->{-shard_ok} = {};
-       for my $n (keys %$consumers) {
-               my ($c, $p) = PublicInbox::PktOp->pair;
-               $c->{ops}->{shard_done} = [ $self ];
-               $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
-               $consumers->{$n} = $c;
+       my ($c, $p) = PublicInbox::PktOp->pair;
+       $c->{ops}->{shard_done} = [ $self ];
+       for my $n (keys %$active) {
+               $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
        }
-       wait_consumers($self, $git, $consumers);
+       undef $p;
+       wait_active($self, $git, $active, $c);
 }
 
 sub index_repo { # cidx_await cb
@@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb
        $repo->{roots} = \@roots;
        local $self->{current_info} = $git->{git_dir};
        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-       local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-       my $consumers = {};
+       local $self->{-shard_ok} = {};
+       my $active = {};
+       my ($c, $p) = PublicInbox::PktOp->pair;
+       $c->{ops}->{shard_done} = [ $self ];
        for my $n (0..$#shard_in) {
                -s $shard_in[$n] or next;
                last if $DO_QUIT;
-               my ($c, $p) = PublicInbox::PktOp->pair;
-               $c->{ops}->{shard_done} = [ $self ];
                $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                        [ $shard_in[$n], $p->{op_p} ],
                                        $git, \@roots);
-               $consumers->{$n} = $c;
+               $active->{$n} = undef;
        }
+       undef $p;
        @shard_in = ();
-       wait_consumers($self, $git, $consumers);
+       wait_active($self, $git, $active, $c);
        if ($DO_QUIT) {
-               commit_used_shards($self, $git, $consumers);
+               commit_active_shards($self, $git, $active);
                progress($self, "$git->{git_dir}: done");
                return;
        }
        $repo->{git_dir} = $git->{git_dir};
        my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
        if ($id > 0) {
-               $consumers->{$repo->{shard_n}} = undef;
-               commit_used_shards($self, $git, $consumers);
+               $active->{$repo->{shard_n}} = undef;
+               commit_active_shards($self, $git, $active);
                progress($self, "$git->{git_dir}: done");
                return run_deferred();
        }

Reply via email to