Looks like some of the `loop` changes introduced `-Wreturn-type` compiler warnings:
../../../3rdparty/libprocess/src/io.cpp: In lambda function:
../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches end of non-void function [-Wreturn-type]
}();
^
mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control reaches end of non-void function [-Wreturn-type]
},
^
../../../3rdparty/libprocess/src/http.cpp: In lambda function:
../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control reaches end of non-void function [-Wreturn-type]
}()
^
On Sun, Jan 8, 2017 at 7:27 PM, <[email protected]> wrote:
> Used `loop` in implementation of io::read and io::write.
>
> Review: https://reviews.apache.org/r/54841
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198
>
> Branch: refs/heads/master
> Commit: 2d28c198a5c09308825b2771483b70ac42839d16
> Parents: 9984449
> Author: Benjamin Hindman <[email protected]>
> Authored: Mon Dec 5 10:48:56 2016 -0800
> Committer: Benjamin Hindman <[email protected]>
> Committed: Sun Jan 8 19:27:02 2017 -0800
>
> ----------------------------------------------------------------------
> 3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++----------------------
> 1 file changed, 114 insertions(+), 175 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/3rdparty/libprocess/src/io.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
> index d0b3ba1..8aa3576 100644
> --- a/3rdparty/libprocess/src/io.cpp
> +++ b/3rdparty/libprocess/src/io.cpp
> @@ -42,132 +42,114 @@ enum ReadFlags
> };
>
>
> -void read(
> - int fd,
> - void* data,
> - size_t size,
> - ReadFlags flags,
> - const std::shared_ptr<Promise<size_t>>& promise,
> - const Future<short>& future)
> +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags = NONE)
> {
> - // Ignore this function if the read operation has been discarded.
> - if (promise->future().hasDiscard()) {
> - CHECK(!future.isPending());
> - promise->discard();
> - return;
> - }
> -
> + // TODO(benh): Let the system calls do what ever they're supposed to
> + // rather than return 0 here?
> if (size == 0) {
> - promise->set(0);
> - return;
> + return 0;
> }
>
> - if (future.isDiscarded()) {
> - promise->fail("Failed to poll: discarded future");
> - } else if (future.isFailed()) {
> - promise->fail(future.failure());
> - } else {
> - ssize_t length;
> - if (flags == NONE) {
> - length = os::read(fd, data, size);
> - } else { // PEEK.
> - // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK and the
> - // error will be propagted out.
> - // NOTE: We cast to `char*` here because the function prototypes on
> - // Windows use `char*` instead of `void*`.
> - length = net::recv(fd, (char*) data, size, MSG_PEEK);
> - }
> -
> + return loop(
> + None(),
> + [=]() -> Future<Option<size_t>> {
> + // Because the file descriptor is non-blocking, we call
> + // read()/recv() immediately. If no data is available than
> + // we'll call `poll` and block. We also observed that for some
> + // combination of libev and Linux kernel versions, the poll
> + // would block for non-deterministically long periods of
> + // time. This may be fixed in a newer version of libev (we use
> + // 3.8 at the time of writing this comment).
> + ssize_t length = [=]() {
> + switch (flags) {
> + case PEEK:
> + // In case `fd` is not a socket os::recv() will fail
> + // with ENOTSOCK and the error will be returned.
> + //
> + // NOTE: We cast to `char*` here because the function
> + // prototypes on Windows use `char*` instead of `void*`.
> + return net::recv(fd, (char*) data, size, MSG_PEEK);
> + case NONE:
> + return os::read(fd, data, size);
> + }
> + }();
> +
> + if (length < 0) {
> #ifdef __WINDOWS__
> - int error = WSAGetLastError();
> + int error = WSAGetLastError();
> #else
> - int error = errno;
> + int error = errno;
> #endif // __WINDOWS__
>
> - if (length < 0) {
> - if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
> - // Restart the read operation.
> - Future<short> future =
> - io::poll(fd, process::io::READ).onAny(
> - lambda::bind(&internal::read,
> - fd,
> - data,
> - size,
> - flags,
> - promise,
> - lambda::_1));
> -
> - // Stop polling if a discard occurs on our future.
> - promise->future().onDiscard(
> - lambda::bind(&process::internal::discard<short>,
> - WeakFuture<short>(future)));
> - } else {
> - // Error occurred.
> - promise->fail(os::strerror(errno));
> - }
> - } else {
> - promise->set(length);
> - }
> - }
> + if (!net::is_restartable_error(error) &&
> + !net::is_retryable_error(error)) {
> + // TODO(benh): Confirm that `os::strerror` does the right
> + // thing for `error` on Windows.
> + return Failure(os::strerror(error));
> + }
> +
> + return None();
> + }
> +
> + return length;
> + },
> + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
> + // Restart/retry if we don't yet have a result.
> + if (length.isNone()) {
> + return io::poll(fd, io::READ)
> + .then([](short event) -> ControlFlow<size_t> {
> + CHECK_EQ(io::READ, event);
> + return Continue();
> + });
> + }
> + return Break(length.get());
> + });
> }
>
>
> -void write(
> - int fd,
> - const void* data,
> - size_t size,
> - const std::shared_ptr<Promise<size_t>>& promise,
> - const Future<short>& future)
> +Future<size_t> write(int fd, const void* data, size_t size)
> {
> - // Ignore this function if the write operation has been discarded.
> - if (promise->future().hasDiscard()) {
> - promise->discard();
> - return;
> - }
> -
> + // TODO(benh): Let the system calls do what ever they're supposed to
> + // rather than return 0 here?
> if (size == 0) {
> - promise->set(0);
> - return;
> + return 0;
> }
>
> - if (future.isDiscarded()) {
> - promise->fail("Failed to poll: discarded future");
> - } else if (future.isFailed()) {
> - promise->fail(future.failure());
> - } else {
> - ssize_t length = os::write(fd, data, size);
> + return loop(
> + None(),
> + [=]() -> Future<Option<size_t>> {
> + ssize_t length = os::write(fd, data, size);
>
> + if (length < 0) {
> #ifdef __WINDOWS__
> - int error = WSAGetLastError();
> + int error = WSAGetLastError();
> #else
> - int error = errno;
> + int error = errno;
> #endif // __WINDOWS__
>
> - if (length < 0) {
> - if (net::is_restartable_error(error) || net::is_retryable_error(error)) {
> - // Restart the write operation.
> - Future<short> future =
> - io::poll(fd, process::io::WRITE).onAny(
> - lambda::bind(&internal::write,
> - fd,
> - data,
> - size,
> - promise,
> - lambda::_1));
> -
> - // Stop polling if a discard occurs on our future.
> - promise->future().onDiscard(
> - lambda::bind(&process::internal::discard<short>,
> - WeakFuture<short>(future)));
> - } else {
> - // Error occurred.
> - promise->fail(os::strerror(errno));
> - }
> - } else {
> - // TODO(benh): Retry if 'length' is 0?
> - promise->set(length);
> - }
> - }
> + if (!net::is_restartable_error(error) &&
> + !net::is_retryable_error(error)) {
> + // TODO(benh): Confirm that `os::strerror` does the right
> + // thing for `error` on Windows.
> + return Failure(os::strerror(error));
> + }
> +
> + return None();
> + }
> +
> + return length;
> + },
> + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
> + // Restart/retry if we don't yet have a result.
> + if (length.isNone()) {
> + return io::poll(fd, io::WRITE)
> + .then([](short event) -> ControlFlow<size_t> {
> + CHECK_EQ(io::WRITE, event);
> + return Continue();
> + });
> + }
> + return Break(length.get());
> + });
> }
>
> } // namespace internal {
> @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size)
> {
> process::initialize();
>
> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
> // Check the file descriptor.
> Try<bool> nonblock = os::isNonblock(fd);
> if (nonblock.isError()) {
> // The file descriptor is not valid (e.g., has been closed).
> - promise->fail(
> - "Failed to check if file descriptor was non-blocking: " +
> - nonblock.error());
> - return promise->future();
> + return Failure("Failed to check if file descriptor was non-blocking: " +
> + nonblock.error());
> } else if (!nonblock.get()) {
> // The file descriptor is not non-blocking.
> - promise->fail("Expected a non-blocking file descriptor");
> - return promise->future();
> + return Failure("Expected a non-blocking file descriptor");
> }
>
> - // Because the file descriptor is non-blocking, we call read()
> - // immediately. The read may in turn call poll if necessary,
> - // avoiding unnecessary polling. We also observed that for some
> - // combination of libev and Linux kernel versions, the poll would
> - // block for non-deterministically long periods of time. This may be
> - // fixed in a newer version of libev (we use 3.8 at the time of
> - // writing this comment).
> - internal::read(fd, data, size, internal::NONE, promise, io::READ);
> -
> - return promise->future();
> + return internal::read(fd, data, size);
> }
>
>
> @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data, size_t size)
> {
> process::initialize();
>
> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
> // Check the file descriptor.
> Try<bool> nonblock = os::isNonblock(fd);
> if (nonblock.isError()) {
> // The file descriptor is not valid (e.g., has been closed).
> - promise->fail(
> + return Failure(
> "Failed to check if file descriptor was non-blocking: " +
> nonblock.error());
> - return promise->future();
> } else if (!nonblock.get()) {
> // The file descriptor is not non-blocking.
> - promise->fail("Expected a non-blocking file descriptor");
> - return promise->future();
> + return Failure("Expected a non-blocking file descriptor");
> }
>
> - // Because the file descriptor is non-blocking, we call write()
> - // immediately. The write may in turn call poll if necessary,
> - // avoiding unnecessary polling. We also observed that for some
> - // combination of libev and Linux kernel versions, the poll would
> - // block for non-deterministically long periods of time. This may be
> - // fixed in a newer version of libev (we use 3.8 at the time of
> - // writing this comment).
> - internal::write(fd, data, size, promise, io::WRITE);
> -
> - return promise->future();
> + return internal::write(fd, data, size);
> }
>
>
> @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t size, size_t limit)
> nonblock.error());
> }
>
> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
> -
> - // Because the file descriptor is non-blocking, we call read()
> - // immediately. The read may in turn call poll if necessary,
> - // avoiding unnecessary polling. We also observed that for some
> - // combination of libev and Linux kernel versions, the poll would
> - // block for non-deterministically long periods of time. This may be
> - // fixed in a newer version of libev (we use 3.8 at the time of
> - // writing this comment).
> - internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
> -
> - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
> - promise->future().onAny([fd]() { os::close(fd); });
> -
> - return promise->future();
> + return internal::read(fd, data, limit, internal::PEEK)
> + .onAny([fd]() {
> + os::close(fd);
> + });
> }
>
>
> namespace internal {
>
> -Future<string> _read(
> - int fd,
> - const std::shared_ptr<string>& buffer,
> - const boost::shared_array<char>& data,
> - size_t length)
> -{
> - return io::read(fd, data.get(), length)
> - .then([=](size_t size) -> Future<string> {
> - if (size == 0) { // EOF.
> - return string(*buffer);
> - }
> - buffer->append(data.get(), size);
> - return _read(fd, buffer, data, length);
> - });
> -}
> -
> -
> Future<Nothing> splice(
> int from,
> int to,
> @@ -392,9 +319,21 @@ Future<string> read(int fd)
> std::shared_ptr<string> buffer(new string());
> boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
>
> - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
> - return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
> - .onAny([fd]() { os::close(fd); });
> + return loop(
> + None(),
> + [=]() {
> + return io::read(fd, data.get(), BUFFERED_READ_SIZE);
> + },
> + [=](size_t length) -> ControlFlow<string> {
> + if (length == 0) { // EOF.
> + return Break(std::move(*buffer));
> + }
> + buffer->append(data.get(), length);
> + return Continue();
> + })
> + .onAny([fd]() {
> + os::close(fd);
> + });
> }
>
>
>
>