Increasing/decreasing the worker count will be useful in some
situations.
---
 lib/PublicInbox/IPC.pm | 99 ++++++++++++++++++++++++++++++------------
 t/ipc.t                |  9 ++++
 2 files changed, 81 insertions(+), 27 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 0c5205c1..5bca3627 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use Socket qw(AF_UNIX MSG_EOR);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
@@ -112,7 +112,7 @@ sub ipc_worker_spawn {
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
-		PublicInbox::DS::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($sigset);
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { ipc_worker_loop($self, $r_req, $w_res) };
 		die "worker $ident PID:$$ died: $@\n" if $@;
@@ -229,12 +229,13 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub wq_worker_loop ($$) {
-	my ($self, $s2) = @_;
+sub wq_worker_loop ($) {
+	my ($self) = @_;
 	my $buf;
 	my $len = $self->{wq_req_len} // (4096 * 33);
 	my ($rec, $sub, @args);
-	while (1) {
+	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+	until ($self->{-wq_quit}) {
 		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
 		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
@@ -258,7 +259,7 @@ sub wq_worker_loop ($$) {
 
 sub wq_do { # always async
 	my ($self, $sub, $in, $out, $err, @args) = @_;
-	if (my $s1 = $self->{-wq_seq}) { # run in worker
+	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		$_ = fileno($_) for ($in, $out, $err);
 		$send_cmd->($s1, $in, $out, $err,
 				freeze([$sub, @args]), MSG_EOR);
@@ -270,42 +271,86 @@ sub wq_do { # always async
 	}
 }
 
+sub _wq_worker_start ($$) { # forks one worker; child never returns
+	my ($self, $oldset) = @_;
+	my $pid = fork // die "fork: $!";
+	if ($pid == 0) {
+		eval { PublicInbox::DS->Reset };
+		close(delete $self->{-wq_s1});
+		delete @$self{qw(-wq_workers -wq_quit)}; # slice: drop both keys
+		my $quit = sub { $self->{-wq_quit} = 1 };
+		$SIG{$_} = $quit for (qw(TERM INT QUIT));
+		$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
+		local $0 = $self->{-wq_ident};
+		PublicInbox::DS::sig_setmask($oldset);
+		my $on_destroy = $self->ipc_atfork_child;
+		eval { wq_worker_loop($self) };
+		die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
+		exit;
+	} else {
+		$self->{-wq_workers}->{$pid} = \undef;
+	}
+}
+
 # starts workqueue workers if Sereal or Storable is installed
 sub wq_workers_start {
 	my ($self, $ident, $nr_workers, $oldset) = @_;
 	($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
-	return if $self->{-wq_seq}; # idempotent
+	return if $self->{-wq_s1}; # idempotent
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
-	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->ipc_atfork_parent;
 	$nr_workers //= 4;
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
-	for my $i (0..($nr_workers - 1)) {
-		defined(my $pid = fork) or die "fork: $!";
-		if ($pid == 0) {
-			eval { PublicInbox::DS->Reset };
-			$s1 = undef;
-			$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
-			local $0 = $ident."[$i]";
-			PublicInbox::DS::sig_setmask($oldset);
-			my $on_destroy = $self->ipc_atfork_child;
-			eval { wq_worker_loop($self, $s2) };
-			die "worker $ident PID:$$ died: $@\n" if $@;
-			exit;
-		} else {
-			$self->{-wq_workers}->{$pid} = $i;
-		}
-	}
+	$self->{-wq_ident} = $ident;
+	$self->{-wq_s1} = $s1;
+	$self->{-wq_s2} = $s2;
+	_wq_worker_start($self, $sigset) for (1..$nr_workers);
 	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
-	$s2 = undef;
-	$self->{-wq_seq} = $s1;
 	$self->{-wq_ppid} = $$;
 }
 
+sub wq_worker_incr { # SIGTTIN handler
+	my ($self, $oldset) = @_;
+	$self->{-wq_s2} or return;
+	$self->ipc_atfork_parent;
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
+	_wq_worker_start($self, $sigset);
+	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
+}
+
+sub wq_exit { # wakes up wq_worker_decr_wait
+	send($_[0]->{-wq_s2}, $$, MSG_EOR) // die "$$ send: $!";
+	exit;
+}
+
+sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
+	my ($self) = @_;
+	my $workers = $self->{-wq_workers} or return;
+	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
+	$self->wq_do('wq_exit', $s2, $s2, $s2);
+	$self->{-wq_exit_pending}++;
+	# caller must call wq_worker_decr_wait in main loop
+}
+
+sub wq_worker_decr_wait { # reaps the worker which ran wq_exit
+	my ($self, $timeout) = @_;
+	return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+	my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
+	vec(my $rin = '', fileno($s1), 1) = 1;
+	select(my $rout = $rin, undef, undef, $timeout) or
+		croak 'timed out waiting for wq_exit';
+	recv($s1, my $pid, 64, 0) // croak "recv: $!";
+	my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
+	delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
+	$self->{-wq_exit_pending}--;
+	dwaitpid($pid, \&ipc_worker_reap, $self);
+}
+
 sub wq_close {
 	my ($self) = @_;
-	delete $self->{-wq_seq} or return;
+	delete @$self{qw(-wq_s1 -wq_s2)} or return;
 	my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
 	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
 	return if $ppid != $$; # can't reap siblings or parents
diff --git a/t/ipc.t b/t/ipc.t
index f09f76ef..51e347c6 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -168,4 +168,13 @@ like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
 like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
 is($warn[2], $warn[1], 'worker did not die');
 
+$SIG{__WARN__} = 'DEFAULT';
+is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
+is(scalar(keys %{$ipc->{-wq_workers}}), 1, '1 worker started');
+$ipc->wq_worker_incr;
+is(scalar(keys %{$ipc->{-wq_workers}}), 2, 'worker count bumped');
+$ipc->wq_worker_decr;
+$ipc->wq_worker_decr_wait(10);
+is(scalar(keys %{$ipc->{-wq_workers}}), 1, 'worker count lowered');
+
 done_testing;
-- 
unsubscribe: one-click, see List-Unsubscribe header
archive: https://public-inbox.org/meta/