Repository: mesos
Updated Branches:
  refs/heads/1.5.x aedbcfd5d -> 9343f4200


Reduced likelihood of a stack overflow in libprocess socket send path.

Currently, the socket send path is implemented as an asynchronous
loop built from callbacks: `send` registers `_send` as a callback,
and `_send` calls `send` again. Without `process::loop`, this pattern
is prone to a stack overflow when every asynchronous call completes
synchronously, because each callback then runs nested inside the
previous one. This is possible with sockets if the socket is always
ready for writing. Users have reported the crash in both MESOS-8594
and MESOS-8834, so the stack overflow is encountered in practice.

This patch updates the send path to leverage `process::loop`, which
is supposed to prevent stack overflows in asynchronous loops. However,
it is still possible for `process::loop` to stack overflow due to
MESOS-8852. In practice, I expect that even without MESOS-8852 fixed,
users won't see any stack overflows in the send path.

Review: https://reviews.apache.org/r/66863


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d6b0b1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d6b0b1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d6b0b1c

Branch: refs/heads/1.5.x
Commit: 9d6b0b1cf5b045e67d05ba04aa91b49d56b31ad5
Parents: aedbcfd
Author: Benjamin Mahler <bmah...@apache.org>
Authored: Sat Apr 28 18:28:39 2018 -0700
Committer: Benjamin Mahler <bmah...@apache.org>
Committed: Mon Apr 30 17:53:31 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 115 ++++++++++++++++---------------
 1 file changed, 59 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d6b0b1c/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 53969a7..631119c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -73,6 +73,7 @@
 #include <process/id.hpp>
 #include <process/io.hpp>
 #include <process/logging.hpp>
+#include <process/loop.hpp>
 #include <process/mime.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -1699,71 +1700,73 @@ void SocketManager::unproxy(const Socket& socket)
 
 namespace internal {
 
-void _send(
-    const Future<size_t>& result,
-    Socket socket,
-    Encoder* encoder,
-    size_t size);
-
+Future<Nothing> _send(Encoder* encoder, Socket socket);
 
 void send(Encoder* encoder, Socket socket)
 {
-  switch (encoder->kind()) {
-    case Encoder::DATA: {
-      size_t size;
-      const char* data = static_cast<DataEncoder*>(encoder)->next(&size);
-      socket.send(data, size)
-        .onAny(lambda::bind(
-            &internal::_send,
-            lambda::_1,
-            socket,
-            encoder,
-            size));
-      break;
-    }
-    case Encoder::FILE: {
-      off_t offset;
-      size_t size;
-      int_fd fd = static_cast<FileEncoder*>(encoder)->next(&offset, &size);
-      socket.sendfile(fd, offset, size)
-        .onAny(lambda::bind(
-            &internal::_send,
-            lambda::_1,
-            socket,
-            encoder,
-            size));
-      break;
-    }
-  }
+  _send(encoder, socket)
+    .then([socket] {
+      // Continue sending until this socket has no more
+      // queued outgoing messages.
+      return process::loop(
+          None(),
+          [=] { return socket_manager->next(socket); },
+          [=](Encoder* encoder) -> Future<ControlFlow<Nothing>> {
+            if (encoder == nullptr) {
+              return Break();
+            }
+
+            return _send(encoder, socket)
+              .then([]() -> ControlFlow<Nothing> { return Continue(); });
+        });
+    });
 }
 
 
-void _send(
-    const Future<size_t>& length,
-    Socket socket,
-    Encoder* encoder,
-    size_t size)
+Future<Nothing> _send(Encoder* encoder, Socket socket)
 {
-  if (length.isDiscarded() || length.isFailed()) {
-    socket_manager->close(socket);
-    delete encoder;
-  } else {
-    // Update the encoder with the amount sent.
-    encoder->backup(size - length.get());
+  // Loop until all of the data in the provided encoder is sent.
+  return process::loop(
+      None(),
+      [=] {
+        size_t size;
+        Future<size_t> send;
 
-    // See if there is any more of the message to send.
-    if (encoder->remaining() == 0) {
-      delete encoder;
+        switch (encoder->kind()) {
+          case Encoder::DATA: {
+            const char* data =
+              static_cast<DataEncoder*>(encoder)->next(&size);
+            send = socket.send(data, size);
+            break;
+          }
+          case Encoder::FILE: {
+            off_t offset;
+            int_fd fd =
+              static_cast<FileEncoder*>(encoder)->next(&offset, &size);
+            send = socket.sendfile(fd, offset, size);
+            break;
+          }
+        }
 
-      // Check for more stuff to send on socket.
-      Encoder* next = socket_manager->next(socket);
-      if (next != nullptr) {
-        send(next, socket);
-      }
-    } else {
-      send(encoder, socket);
-    }
-  }
+        return send
+          .then([=](size_t sent) {
+            // Update the encoder with the amount sent.
+            encoder->backup(size - sent);
+            return Nothing();
+          })
+          .recover([=](const Future<Nothing>& f) {
+            socket_manager->close(socket);
+            delete encoder;
+            return f; // Break the loop by propagating the "failure".
+          });
+      },
+      [=](Nothing) -> ControlFlow<Nothing> {
+        if (encoder->remaining() == 0) {
+          delete encoder;
+          return Break();
+        }
+        return Continue();
+      });
 }
 
 } // namespace internal {

Reply via email to