This ensures script/lei $send_cmd usage is EINTR-safe (since
I prefer to avoid loading PublicInbox::IPC for startup time).
Overall, it saves us some code, too.
---
 lib/PublicInbox/CmdIPC4.pm       | 24 +++++++++++++------
 lib/PublicInbox/IPC.pm           | 26 ++++----------------
 lib/PublicInbox/LEI.pm           |  6 ++---
 lib/PublicInbox/LeiSelfSocket.pm |  3 ++-
 lib/PublicInbox/Spawn.pm         | 41 ++++++++++++++++++--------------
 lib/PublicInbox/Syscall.pm       | 21 ++++++++--------
 lib/PublicInbox/XapClient.pm     |  2 +-
 lib/PublicInbox/XapHelper.pm     |  2 +-
 script/lei                       |  5 +---
 t/cmd_ipc.t                      | 12 ++++++----
 t/xap_helper.t                   |  4 ++--
 11 files changed, 72 insertions(+), 74 deletions(-)

diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 4bc4c729..2f102ec6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -7,6 +7,16 @@
 package PublicInbox::CmdIPC4;
 use v5.12;
 use Socket qw(SOL_SOCKET SCM_RIGHTS);
+
+sub sendmsg_retry ($) { # true if a failed sendmsg should be retried; $_[0] is the caller's try counter
+       return 1 if $!{EINTR}; # interrupted: retry immediately, not counted as a try
+       return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS}); # any other errno: no retry
+       return if ++$_[0] >= 50; # bumps the caller's counter via @_ aliasing; give up at 50
+       warn "# sleeping on sendmsg: $! (#$_[0])\n";
+       select(undef, undef, undef, 0.1); # sleep 0.1s before the caller retries
+       1; # tell the caller to loop again
+}
+
 BEGIN { eval {
 require Socket::MsgHdr; # XS
 no warnings 'once';
@@ -20,21 +30,21 @@ no warnings 'once';
        my $try = 0;
        do {
                $s = Socket::MsgHdr::sendmsg($sock, $mh, $flags);
-       } while (!defined($s) &&
-                       ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-                       (++$try < 50) &&
-                       warn "# sleeping on sendmsg: $! (#$try)\n" &&
-                       select(undef, undef, undef, 0.1) == 0);
+       } while (!defined($s) && sendmsg_retry($try));
        $s;
 };
 
 *recv_cmd4 = sub ($$$) {
        my ($s, undef, $len) = @_; # $_[1] = destination buffer
        my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
-       my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // do {
+       my $r;
+       do {
+               $r = Socket::MsgHdr::recvmsg($s, $mh, 0);
+       } while (!defined($r) && $!{EINTR});
+       if (!defined($r)) {
                $_[1] = '';
                return (undef);
-       };
+       }
        $_[1] = $mh->buf;
        return () if $r == 0;
        my (undef, undef, $data) = $mh->cmsghdr;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9b4b1508..839281b2 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -204,27 +204,9 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub send_cmd ($$$$) {
-       my ($s, $fds, $buf, $fl) = @_;
-       while (1) {
-               my $n = $send_cmd->($s, $fds, $buf, $fl);
-               next if !defined($n) && $!{EINTR};
-               return $n;
-       }
-}
-
-sub recv_cmd ($$$) {
-       my ($s, undef, $len) = @_; # $_[1] is $buf
-       while (1) {
-               my @fds = $recv_cmd->($s, $_[1], $len);
-               next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR};
-               return @fds;
-       }
-}
-
 sub recv_and_run {
        my ($self, $s2, $len, $full_stream) = @_;
-       my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+       my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
        return if scalar(@fds) && !defined($fds[0]);
        my $n = length($buf) or return 0;
        my $nfd = 0;
@@ -291,11 +273,11 @@ sub stream_in_full ($$$) {
        my ($s1, $fds, $buf) = @_;
        socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
                croak "socketpair: $!";
-       my $n = send_cmd($s1, [ fileno($r) ],
+       my $n = $send_cmd->($s1, [ fileno($r) ],
                        ipc_freeze(['do_sock_stream', length($buf)]),
                        0) // croak "sendmsg: $!";
        undef $r;
-       $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
+       $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
        while ($n < length($buf)) {
                my $x = syswrite($w, $buf, length($buf) - $n, $n);
                if (!defined($n)) {
@@ -315,7 +297,7 @@ sub wq_io_do { # always async
                if (length($buf) > $MY_MAX_ARG_STRLEN) {
                        stream_in_full($s1, $fds, $buf);
                } else {
-                       my $n = send_cmd $s1, $fds, $buf, 0;
+                       my $n = $send_cmd->($s1, $fds, $buf, 0);
                        return if defined($n); # likely
                        $!{ETOOMANYREFS} and
                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e300f0a4..f8bcd43d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1041,7 +1041,7 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
        my ($self, $io, $cmd, $env) = @_;
-       PublicInbox::IPC::send_cmd(
+       $PublicInbox::IPC::send_cmd->(
                        $self->{sock} // die('lei client gone'),
                        [ map { fileno($_) } @$io ],
                        exec_buf($cmd, $env), 0) //
@@ -1139,7 +1139,7 @@ sub accept_dispatch { # Listener {post_accept} callback
        select($rvec, undef, undef, 60) or
                return send($sock, 'timed out waiting to recv FDs', 0);
        # (4096 * 33) >MAX_ARG_STRLEN
-       my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
+       my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
                return; # EOF
        if (!defined($fds[0])) {
                warn(my $msg = "recv_cmd failed: $!");
@@ -1178,7 +1178,7 @@ sub event_step {
        local %ENV = %{$self->{env}};
        local $current_lei = $self;
        eval {
-               my @fds = PublicInbox::IPC::recv_cmd(
+               my @fds = $PublicInbox::IPC::recv_cmd->(
                        $self->{sock} // return, my $buf, 4096);
                if (scalar(@fds) == 1 && !defined($fds[0])) {
                        return if $! == EAGAIN;
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
index b8745252..0e15bc7c 100644
--- a/lib/PublicInbox/LeiSelfSocket.pm
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -21,7 +21,8 @@ sub new {
 
 sub event_step {
        my ($self) = @_;
-       my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33);
+       my ($buf, @fds);
+       @fds = $PublicInbox::IPC::recv_cmd->($self->{sock}, $buf, 4096 * 33);
        if (scalar(@fds) == 1 && !defined($fds[0])) {
                return if $!{EAGAIN};
                die "recvmsg: $!" unless $!{ECONNRESET};
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index bb2abe28..4c7e0f80 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -173,19 +173,20 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
        return (int)pid;
 }
 
-static int sleep_wait(unsigned *tries, int err)
+static int sendmsg_retry(unsigned *tries)
 {
        const struct timespec req = { 0, 100000000 }; /* 100ms */
+       int err = errno;
        switch (err) {
+       case EINTR: PERL_ASYNC_CHECK(); return 1;
        case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
-               if (++*tries < 50) {
-                       fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
-                               strerror(err), *tries);
-                       nanosleep(&req, NULL);
-                       return 1;
-               }
-       default:
-               return 0;
+               if (++*tries >= 50) return 0;
+               fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+                       strerror(err), *tries);
+               nanosleep(&req, NULL);
+               PERL_ASYNC_CHECK();
+               return 1;
+       default: return 0;
        }
 }
 
@@ -237,7 +238,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
        }
        do {
                sent = sendmsg(PerlIO_fileno(s), &msg, flags);
-       } while (sent < 0 && sleep_wait(&tries, errno));
+       } while (sent < 0 && sendmsg_retry(&tries));
        return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
@@ -259,20 +260,24 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
        msg.msg_control = &cmsg.hdr;
        msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
-       i = recvmsg(PerlIO_fileno(s), &msg, 0);
+       for (;;) {
+               i = recvmsg(PerlIO_fileno(s), &msg, 0);
+               if (i >= 0 || errno != EINTR) break;
+               PERL_ASYNC_CHECK();
+       }
        if (i >= 0) {
                SvCUR_set(buf, i);
+               if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                               cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                       size_t len = cmsg.hdr.cmsg_len;
+                       int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                       for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+                               Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+               }
        } else {
                Inline_Stack_Push(&PL_sv_undef);
                SvCUR_set(buf, 0);
        }
-       if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-                       cmsg.hdr.cmsg_type == SCM_RIGHTS) {
-               size_t len = cmsg.hdr.cmsg_len;
-               int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-               for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-                       Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
-       }
        Inline_Stack_Done;
 }
 #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 4cf45d0f..e83beb6a 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -394,6 +394,8 @@ use constant msg_controllen => CMSG_SPACE(10 * SIZEOF_int) + 16; # 10 FDs
 
 if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) {
 no warnings 'once';
+require PublicInbox::CmdIPC4;
+
 *send_cmd4 = sub ($$$$) {
        my ($sock, $fds, undef, $flags) = @_;
        my $iov = pack('P'.TMPL_size_t,
@@ -418,16 +420,12 @@ no warnings 'once';
                        $cmsghdr, # msg_control
                        $msg_controllen,
                        0); # msg_flags
-       my $sent;
+       my $s;
        my $try = 0;
        do {
-               $sent = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
-       } while ($sent < 0 &&
-                       ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-                       (++$try < 50) &&
-                       warn "# sleeping on sendmsg: $! (#$try)\n" &&
-                       select(undef, undef, undef, 0.1) == 0);
-       $sent >= 0 ? $sent : undef;
+               $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
+       } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try));
+       $s >= 0 ? $s : undef;
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -446,8 +444,11 @@ no warnings 'once';
                        $cmsghdr, # msg_control
                        msg_controllen,
                        0); # msg_flags
-       my $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
-       if ($r < 0) { # $! is set
+       my $r;
+       do {
+               $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
+       } while ($r < 0 && $!{EINTR});
+       if ($r < 0) {
                $_[1] = '';
                return (undef);
        }
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index f6c09c3b..9e2d71a0 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -21,7 +21,7 @@ sub mkreq {
        }
        my @fds = map fileno($_), @$ios;
        my $buf = join("\0", @arg, '');
-       $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) //
+       $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) //
                die "send_cmd: $!";
        $n == length($buf) or die "send_cmd: $n != ".length($buf);
        $r;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c98708e3..ae907766 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -177,7 +177,7 @@ sub recv_loop {
        my $in = \*STDIN;
        while (!defined($parent_pid) || getppid == $parent_pid) {
                PublicInbox::DS::sig_setmask($workerset);
-               my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33);
+               my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
                scalar(@fds) or exit(66); # EX_NOINPUT
                die "recvmsg: $!" if !defined($fds[0]);
                PublicInbox::DS::block_signals();
diff --git a/script/lei b/script/lei
index 1d90be0a..087afc33 100755
--- a/script/lei
+++ b/script/lei
@@ -116,10 +116,7 @@ $SIG{CONT} = sub { send($sock, 'CONT', 0) };
 my $x_it_code = 0;
 while (1) {
        my (@fds) = $recv_cmd->($sock, my $buf, 4096 * 33);
-       if (scalar(@fds) == 1 && !defined($fds[0])) {
-               next if $!{EINTR};
-               die "recvmsg: $!";
-       }
+       die "recvmsg: $!" if scalar(@fds) == 1 && !defined($fds[0]);
        last if $buf eq '';
        if ($buf =~ /\Aexec (.+)\z/) {
                $exec_cmd->(\@fds, split(/\0/, $1));
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index e5d22aab..ccf4ca31 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -59,18 +59,20 @@ my $do_test = sub { SKIP: {
                        if ($pid == 0) {
                                # need to loop since Perl signals are racy
                                # (the interpreter doesn't self-pipe)
-                               CORE::kill('ALRM', $tgt) while (tick(0.05));
+                               my $n = 3;
+                               while (tick(0.01 * $n) && --$n) {
+                                       kill('ALRM', $tgt)
+                               }
+                               close $s1;
                                POSIX::_exit(1);
                        }
+                       close $s1;
                        @fds = $recv->($s2, $buf, length($src) + 1);
-                       ok($!{EINTR}, "EINTR set by ($desc)");
-                       kill('KILL', $pid);
                        waitpid($pid, 0);
-                       is_deeply(\@fds, [ undef ], "EINTR $desc");
+                       is_deeply(\@fds, [], "EINTR->EOF $desc");
                        ok($alrm, 'SIGALRM hit');
                }
 
-               close $s1;
                @fds = $recv->($s2, $buf, length($src) + 1);
                is_deeply(\@fds, [], "no FDs on EOF $desc");
                is($buf, '', "buffer cleared on EOF ($desc)");
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 2303301d..27742cad 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -52,8 +52,8 @@ my $doreq = sub {
        my $buf = join("\0", @arg, '');
        my @fds = fileno($y);
        push @fds, fileno($err) if $err;
-       my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0);
-       $n // xbail "send: $!";
+       my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) //
+               xbail "send: $!";
        my $exp = length($buf);
        $exp == $n or xbail "req @arg sent short ($n != $exp)";
        $x;

Reply via email to