Pushed a temporary fix in:
```
commit b492d4458c97b5e025e96ca229bc87a6e2500b40
Author: Jan Schlicht <[email protected]>
Date: Mon Jan 9 15:39:21 2017 -0800
Removed unsupported `friend` declaration.
The `friend` class declaration of nested, templated classes will raise
an error when compiling with Clang. This is a temporary fix, resolved by
making the constructor of `ControlFlow` public.
Review: https://reviews.apache.org/r/55339/
```
On Mon, Jan 9, 2017 at 2:52 PM, Benjamin Mahler <[email protected]> wrote:
> Looks like there are some warnings for some of the loop changes:
>
> ../../../3rdparty/libprocess/src/io.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/io.cpp:75:9: warning: control reaches
> end of non-void function [-Wreturn-type]
> }();
> ^
> mv -f .deps/libprocess_la-io.Tpo .deps/libprocess_la-io.Plo
> mv -f .deps/libprocess_la-metrics.Tpo .deps/libprocess_la-metrics.Plo
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1414:7: warning: control
> reaches end of non-void function [-Wreturn-type]
> },
> ^
> ../../../3rdparty/libprocess/src/http.cpp: In lambda function:
> ../../../3rdparty/libprocess/src/http.cpp:1646:13: warning: control
> reaches end of non-void function [-Wreturn-type]
> }()
> ^
>
> On Sun, Jan 8, 2017 at 7:27 PM, <[email protected]> wrote:
>
>> Used `loop` in implementation of io::read and io::write.
>>
>> Review: https://reviews.apache.org/r/54841
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d28c198
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d28c198
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d28c198
>>
>> Branch: refs/heads/master
>> Commit: 2d28c198a5c09308825b2771483b70ac42839d16
>> Parents: 9984449
>> Author: Benjamin Hindman <[email protected]>
>> Authored: Mon Dec 5 10:48:56 2016 -0800
>> Committer: Benjamin Hindman <[email protected]>
>> Committed: Sun Jan 8 19:27:02 2017 -0800
>>
>> ----------------------------------------------------------------------
>> 3rdparty/libprocess/src/io.cpp | 289 ++++++++++++++----------------------
>> 1 file changed, 114 insertions(+), 175 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/2d28c198/3rdparty/libprocess/src/io.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/io.cpp
>> b/3rdparty/libprocess/src/io.cpp
>> index d0b3ba1..8aa3576 100644
>> --- a/3rdparty/libprocess/src/io.cpp
>> +++ b/3rdparty/libprocess/src/io.cpp
>> @@ -42,132 +42,114 @@ enum ReadFlags
>> };
>>
>>
>> -void read(
>> - int fd,
>> - void* data,
>> - size_t size,
>> - ReadFlags flags,
>> - const std::shared_ptr<Promise<size_t>>& promise,
>> - const Future<short>& future)
>> +Future<size_t> read(int fd, void* data, size_t size, ReadFlags flags =
>> NONE)
>> {
>> - // Ignore this function if the read operation has been discarded.
>> - if (promise->future().hasDiscard()) {
>> - CHECK(!future.isPending());
>> - promise->discard();
>> - return;
>> - }
>> -
>> + // TODO(benh): Let the system calls do what ever they're supposed to
>> + // rather than return 0 here?
>> if (size == 0) {
>> - promise->set(0);
>> - return;
>> + return 0;
>> }
>>
>> - if (future.isDiscarded()) {
>> - promise->fail("Failed to poll: discarded future");
>> - } else if (future.isFailed()) {
>> - promise->fail(future.failure());
>> - } else {
>> - ssize_t length;
>> - if (flags == NONE) {
>> - length = os::read(fd, data, size);
>> - } else { // PEEK.
>> - // In case 'fd' is not a socket ::recv() will fail with ENOTSOCK
>> and the
>> - // error will be propagted out.
>> - // NOTE: We cast to `char*` here because the function prototypes on
>> - // Windows use `char*` instead of `void*`.
>> - length = net::recv(fd, (char*) data, size, MSG_PEEK);
>> - }
>> -
>> + return loop(
>> + None(),
>> + [=]() -> Future<Option<size_t>> {
>> + // Because the file descriptor is non-blocking, we call
>> + // read()/recv() immediately. If no data is available than
>> + // we'll call `poll` and block. We also observed that for some
>> + // combination of libev and Linux kernel versions, the poll
>> + // would block for non-deterministically long periods of
>> + // time. This may be fixed in a newer version of libev (we use
>> + // 3.8 at the time of writing this comment).
>> + ssize_t length = [=]() {
>> + switch (flags) {
>> + case PEEK:
>> + // In case `fd` is not a socket os::recv() will fail
>> + // with ENOTSOCK and the error will be returned.
>> + //
>> + // NOTE: We cast to `char*` here because the function
>> + // prototypes on Windows use `char*` instead of `void*`.
>> + return net::recv(fd, (char*) data, size, MSG_PEEK);
>> + case NONE:
>> + return os::read(fd, data, size);
>> + }
>> + }();
>> +
>> + if (length < 0) {
>> #ifdef __WINDOWS__
>> - int error = WSAGetLastError();
>> + int error = WSAGetLastError();
>> #else
>> - int error = errno;
>> + int error = errno;
>> #endif // __WINDOWS__
>>
>> - if (length < 0) {
>> - if (net::is_restartable_error(error) ||
>> net::is_retryable_error(error)) {
>> - // Restart the read operation.
>> - Future<short> future =
>> - io::poll(fd, process::io::READ).onAny(
>> - lambda::bind(&internal::read,
>> - fd,
>> - data,
>> - size,
>> - flags,
>> - promise,
>> - lambda::_1));
>> -
>> - // Stop polling if a discard occurs on our future.
>> - promise->future().onDiscard(
>> - lambda::bind(&process::internal::discard<short>,
>> - WeakFuture<short>(future)));
>> - } else {
>> - // Error occurred.
>> - promise->fail(os::strerror(errno));
>> - }
>> - } else {
>> - promise->set(length);
>> - }
>> - }
>> + if (!net::is_restartable_error(error) &&
>> + !net::is_retryable_error(error)) {
>> + // TODO(benh): Confirm that `os::strerror` does the right
>> + // thing for `error` on Windows.
>> + return Failure(os::strerror(error));
>> + }
>> +
>> + return None();
>> + }
>> +
>> + return length;
>> + },
>> + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> + // Restart/retry if we don't yet have a result.
>> + if (length.isNone()) {
>> + return io::poll(fd, io::READ)
>> + .then([](short event) -> ControlFlow<size_t> {
>> + CHECK_EQ(io::READ, event);
>> + return Continue();
>> + });
>> + }
>> + return Break(length.get());
>> + });
>> }
>>
>>
>> -void write(
>> - int fd,
>> - const void* data,
>> - size_t size,
>> - const std::shared_ptr<Promise<size_t>>& promise,
>> - const Future<short>& future)
>> +Future<size_t> write(int fd, const void* data, size_t size)
>> {
>> - // Ignore this function if the write operation has been discarded.
>> - if (promise->future().hasDiscard()) {
>> - promise->discard();
>> - return;
>> - }
>> -
>> + // TODO(benh): Let the system calls do what ever they're supposed to
>> + // rather than return 0 here?
>> if (size == 0) {
>> - promise->set(0);
>> - return;
>> + return 0;
>> }
>>
>> - if (future.isDiscarded()) {
>> - promise->fail("Failed to poll: discarded future");
>> - } else if (future.isFailed()) {
>> - promise->fail(future.failure());
>> - } else {
>> - ssize_t length = os::write(fd, data, size);
>> + return loop(
>> + None(),
>> + [=]() -> Future<Option<size_t>> {
>> + ssize_t length = os::write(fd, data, size);
>>
>> + if (length < 0) {
>> #ifdef __WINDOWS__
>> - int error = WSAGetLastError();
>> + int error = WSAGetLastError();
>> #else
>> - int error = errno;
>> + int error = errno;
>> #endif // __WINDOWS__
>>
>> - if (length < 0) {
>> - if (net::is_restartable_error(error) ||
>> net::is_retryable_error(error)) {
>> - // Restart the write operation.
>> - Future<short> future =
>> - io::poll(fd, process::io::WRITE).onAny(
>> - lambda::bind(&internal::write,
>> - fd,
>> - data,
>> - size,
>> - promise,
>> - lambda::_1));
>> -
>> - // Stop polling if a discard occurs on our future.
>> - promise->future().onDiscard(
>> - lambda::bind(&process::internal::discard<short>,
>> - WeakFuture<short>(future)));
>> - } else {
>> - // Error occurred.
>> - promise->fail(os::strerror(errno));
>> - }
>> - } else {
>> - // TODO(benh): Retry if 'length' is 0?
>> - promise->set(length);
>> - }
>> - }
>> + if (!net::is_restartable_error(error) &&
>> + !net::is_retryable_error(error)) {
>> + // TODO(benh): Confirm that `os::strerror` does the right
>> + // thing for `error` on Windows.
>> + return Failure(os::strerror(error));
>> + }
>> +
>> + return None();
>> + }
>> +
>> + return length;
>> + },
>> + [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
>> + // Restart/retry if we don't yet have a result.
>> + if (length.isNone()) {
>> + return io::poll(fd, io::WRITE)
>> + .then([](short event) -> ControlFlow<size_t> {
>> + CHECK_EQ(io::WRITE, event);
>> + return Continue();
>> + });
>> + }
>> + return Break(length.get());
>> + });
>> }
>>
>> } // namespace internal {
>> @@ -177,32 +159,18 @@ Future<size_t> read(int fd, void* data, size_t size)
>> {
>> process::initialize();
>>
>> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>> // Check the file descriptor.
>> Try<bool> nonblock = os::isNonblock(fd);
>> if (nonblock.isError()) {
>> // The file descriptor is not valid (e.g., has been closed).
>> - promise->fail(
>> - "Failed to check if file descriptor was non-blocking: " +
>> - nonblock.error());
>> - return promise->future();
>> + return Failure("Failed to check if file descriptor was non-blocking:
>> " +
>> + nonblock.error());
>> } else if (!nonblock.get()) {
>> // The file descriptor is not non-blocking.
>> - promise->fail("Expected a non-blocking file descriptor");
>> - return promise->future();
>> + return Failure("Expected a non-blocking file descriptor");
>> }
>>
>> - // Because the file descriptor is non-blocking, we call read()
>> - // immediately. The read may in turn call poll if necessary,
>> - // avoiding unnecessary polling. We also observed that for some
>> - // combination of libev and Linux kernel versions, the poll would
>> - // block for non-deterministically long periods of time. This may be
>> - // fixed in a newer version of libev (we use 3.8 at the time of
>> - // writing this comment).
>> - internal::read(fd, data, size, internal::NONE, promise, io::READ);
>> -
>> - return promise->future();
>> + return internal::read(fd, data, size);
>> }
>>
>>
>> @@ -210,32 +178,19 @@ Future<size_t> write(int fd, const void* data,
>> size_t size)
>> {
>> process::initialize();
>>
>> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>> // Check the file descriptor.
>> Try<bool> nonblock = os::isNonblock(fd);
>> if (nonblock.isError()) {
>> // The file descriptor is not valid (e.g., has been closed).
>> - promise->fail(
>> + return Failure(
>> "Failed to check if file descriptor was non-blocking: " +
>> nonblock.error());
>> - return promise->future();
>> } else if (!nonblock.get()) {
>> // The file descriptor is not non-blocking.
>> - promise->fail("Expected a non-blocking file descriptor");
>> - return promise->future();
>> + return Failure("Expected a non-blocking file descriptor");
>> }
>>
>> - // Because the file descriptor is non-blocking, we call write()
>> - // immediately. The write may in turn call poll if necessary,
>> - // avoiding unnecessary polling. We also observed that for some
>> - // combination of libev and Linux kernel versions, the poll would
>> - // block for non-deterministically long periods of time. This may be
>> - // fixed in a newer version of libev (we use 3.8 at the time of
>> - // writing this comment).
>> - internal::write(fd, data, size, promise, io::WRITE);
>> -
>> - return promise->future();
>> + return internal::write(fd, data, size);
>> }
>>
>>
>> @@ -280,43 +235,15 @@ Future<size_t> peek(int fd, void* data, size_t
>> size, size_t limit)
>> nonblock.error());
>> }
>>
>> - std::shared_ptr<Promise<size_t>> promise(new Promise<size_t>());
>> -
>> - // Because the file descriptor is non-blocking, we call read()
>> - // immediately. The read may in turn call poll if necessary,
>> - // avoiding unnecessary polling. We also observed that for some
>> - // combination of libev and Linux kernel versions, the poll would
>> - // block for non-deterministically long periods of time. This may be
>> - // fixed in a newer version of libev (we use 3.8 at the time of
>> - // writing this comment).
>> - internal::read(fd, data, limit, internal::PEEK, promise, io::READ);
>> -
>> - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> - promise->future().onAny([fd]() { os::close(fd); });
>> -
>> - return promise->future();
>> + return internal::read(fd, data, limit, internal::PEEK)
>> + .onAny([fd]() {
>> + os::close(fd);
>> + });
>> }
>>
>>
>> namespace internal {
>>
>> -Future<string> _read(
>> - int fd,
>> - const std::shared_ptr<string>& buffer,
>> - const boost::shared_array<char>& data,
>> - size_t length)
>> -{
>> - return io::read(fd, data.get(), length)
>> - .then([=](size_t size) -> Future<string> {
>> - if (size == 0) { // EOF.
>> - return string(*buffer);
>> - }
>> - buffer->append(data.get(), size);
>> - return _read(fd, buffer, data, length);
>> - });
>> -}
>> -
>> -
>> Future<Nothing> splice(
>> int from,
>> int to,
>> @@ -392,9 +319,21 @@ Future<string> read(int fd)
>> std::shared_ptr<string> buffer(new string());
>> boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
>>
>> - // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows.
>> - return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
>> - .onAny([fd]() { os::close(fd); });
>> + return loop(
>> + None(),
>> + [=]() {
>> + return io::read(fd, data.get(), BUFFERED_READ_SIZE);
>> + },
>> + [=](size_t length) -> ControlFlow<string> {
>> + if (length == 0) { // EOF.
>> + return Break(std::move(*buffer));
>> + }
>> + buffer->append(data.get(), length);
>> + return Continue();
>> + })
>> + .onAny([fd]() {
>> + os::close(fd);
>> + });
>> }
>>
>>
>>
>>
>