I'm not sure how this went undetected for so long, but EINTR
must be checked for when working with blocking sockets.  EINTR
shouldn't happen for non-blocking sockets, but it's easier to
just use the new wrapper in most of those places.

I don't know what I was smoking when I left out EINTR checks :x
---
 lib/PublicInbox/IPC.pm           | 35 +++++++++++++++++++++++++-------
 lib/PublicInbox/LEI.pm           | 33 +++++++++++++-----------------
 lib/PublicInbox/LeiSelfSocket.pm |  5 ++---
 3 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 548a72eb..7fa656d0 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -216,9 +216,27 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
+sub send_cmd ($$$$) { # calls the $send_cmd coderef, retrying on EINTR
+       my ($s, $fds, $buf, $fl) = @_;
+       while (1) {
+               my $n = $send_cmd->($s, $fds, $buf, $fl);
+               next if !defined($n) && $!{EINTR}; # undef + EINTR: call again
+               return $n; # success or any other failure goes to the caller
+       }
+}
+
+sub recv_cmd ($$$) { # calls the $recv_cmd coderef, retrying on EINTR
+       my ($s, undef, $len) = @_; # $_[1] is $buf
+       while (1) {
+               my @fds = $recv_cmd->($s, $_[1], $len); # $_[1] aliases caller's buffer
+               next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR}; # lone undef + EINTR: call again
+               return @fds; # returned unchanged in every other case
+       }
+}
+
 sub recv_and_run {
        my ($self, $s2, $len, $full_stream) = @_;
-       my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+       my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
        return if scalar(@fds) && !defined($fds[0]);
        my $n = length($buf) or return 0;
        my $nfd = 0;
@@ -278,15 +296,18 @@ sub stream_in_full ($$$) {
        my ($s1, $fds, $buf) = @_;
        socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
                croak "socketpair: $!";
-       my $n = $send_cmd->($s1, [ fileno($r) ],
+       my $n = send_cmd($s1, [ fileno($r) ],
                        ipc_freeze(['do_sock_stream', length($buf)]),
                        MSG_EOR) // croak "sendmsg: $!";
        undef $r;
-       $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+       $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
        while ($n < length($buf)) {
-               my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-                               croak "syswrite: $!";
-               croak "syswrite wrote 0 bytes" if $x == 0;
+               my $x = syswrite($w, $buf, length($buf) - $n, $n);
+               if (!defined($x)) { # check syswrite's result; $n is always defined here
+                       next if $!{EINTR}; # interrupted before writing anything: retry
+                       croak "syswrite: $!";
+               }
+               $x or croak "syswrite wrote 0 bytes";
                $n += $x;
        }
 }
@@ -299,7 +320,7 @@ sub wq_io_do { # always async
                if (length($buf) > $MY_MAX_ARG_STRLEN) {
                        stream_in_full($s1, $fds, $buf);
                } else {
-                       my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+                       my $n = send_cmd $s1, $fds, $buf, MSG_EOR;
                        return if defined($n); # likely
                        $!{ETOOMANYREFS} and
                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b83de91d..ff2db1ff 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -23,13 +23,14 @@ use PublicInbox::Lock;
 use PublicInbox::Eml;
 use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
+use PublicInbox::IPC;
 use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path ();
 use File::Spec;
+use Carp ();
 use Sys::Syslog qw(openlog syslog closelog);
 our $quit = \&CORE::exit;
-our ($current_lei, $errors_log, $listener, $oldset, $dir_idle,
-       $recv_cmd, $send_cmd);
+our ($current_lei, $errors_log, $listener, $oldset, $dir_idle);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
@@ -1013,9 +1014,11 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
        my ($self, $io, $cmd, $env) = @_;
-       my $sock = $self->{sock} // die 'lei client gone';
-       my $fds = [ map { fileno($_) } @$io ];
-       $send_cmd->($sock, $fds, exec_buf($cmd, $env), MSG_EOR);
+       PublicInbox::IPC::send_cmd(
+                       $self->{sock} // die('lei client gone'),
+                       [ map { fileno($_) } @$io ],
+                       exec_buf($cmd, $env), MSG_EOR) //
+               Carp::croak("sendmsg: $!");
 }
 
 sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
@@ -1109,7 +1112,8 @@ sub accept_dispatch { # Listener {post_accept} callback
        select($rvec, undef, undef, 60) or
                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
        # (4096 * 33) >MAX_ARG_STRLEN
-       my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF
+       my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
+               return; # EOF
        if (!defined($fds[0])) {
                warn(my $msg = "recv_cmd failed: $!");
                return send($sock, $msg, MSG_EOR);
@@ -1147,7 +1151,8 @@ sub event_step {
        local %ENV = %{$self->{env}};
        local $current_lei = $self;
        eval {
-               my @fds = $recv_cmd->($self->{sock} // return, my $buf, 4096);
+               my @fds = PublicInbox::IPC::recv_cmd(
+                       $self->{sock} // return, my $buf, 4096);
                if (scalar(@fds) == 1 && !defined($fds[0])) {
                        return if $! == EAGAIN;
                        die "recvmsg: $!" if $! != ECONNRESET;
@@ -1273,18 +1278,8 @@ sub lazy_start {
        my @st = stat($path) or die "stat($path): $!";
        my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
        local $oldset = PublicInbox::DS::block_signals();
-       if ($narg == 5) {
-               $send_cmd = PublicInbox::Spawn->can('send_cmd4');
-               $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
-                       require PublicInbox::CmdIPC4;
-                       $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
-                       PublicInbox::CmdIPC4->can('recv_cmd4');
-               } // do {
-                       $send_cmd = PublicInbox::Syscall->can('send_cmd4');
-                       PublicInbox::Syscall->can('recv_cmd4');
-               };
-       }
-       $recv_cmd or die <<"";
+       die "incompatible narg=$narg" if $narg != 5;
+       $PublicInbox::IPC::send_cmd or die <<"";
 (Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg);
 
        require PublicInbox::Listener;
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
index 860020cb..690cda3f 100644
--- a/lib/PublicInbox/LeiSelfSocket.pm
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -12,20 +12,19 @@ use Data::Dumper;
 $Data::Dumper::Useqq = 1; # should've been the Perl default :P
 use PublicInbox::Syscall qw(EPOLLIN);
 use PublicInbox::Spawn;
-my $recv_cmd;
+use PublicInbox::IPC;
 
 sub new {
        my ($cls, $r) = @_;
        my $self = bless { sock => $r }, $cls;
        $r->blocking(0);
        no warnings 'once';
-       $recv_cmd = $PublicInbox::LEI::recv_cmd;
        $self->SUPER::new($r, EPOLLIN);
 }
 
 sub event_step {
        my ($self) = @_;
-       my (@fds) = $recv_cmd->($self->{sock}, my $buf, 4096 * 33);
+       my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33);
        if (scalar(@fds) == 1 && !defined($fds[0])) {
                return if $!{EAGAIN};
                die "recvmsg: $!" unless $!{ECONNRESET};

Reply via email to