Hey, sorry for the delayed response, it's been a really busy time.

And TBH I'm not sure I really have an answer.

The Windows async bindings are definitely very very different from the Unix
ones, since Windows uses overlapped I/O and Unix uses readiness I/O and,
well, they work totally differently. KJ tries to abstract above both but it
doesn't surprise me at all if the behavior is slightly different,
especially in error and cancellation cases.

Unfortunately the Windows code doesn't get anywhere near as much usage as
the Unix code. Like, extremely so... Cloudflare Workers handles millions of
HTTP requests per second using KJ on Linux but the only time I ever run the
Windows code is literally in Cap'n Proto's CI tests. So while I can say the
Unix code is extremely battle-tested, the Windows code could easily have
lots of edge case bugs.

If you think something is behaving wrong and can narrow it down to a
self-contained unit test, I could try debugging.

-Kenton

On Thu, Jul 23, 2020 at 10:25 AM Vitali Lovich <vlov...@gmail.com> wrote:

> The paf.promise.fork I think was copy-paste error trying to follow what
> TwoPartyServer is doing.
>
> So here's something interesting. When I close one of the remote peers to
> KJ (remote peer for stream2 in this example), none of the error handlers
> even run. If I joinPromises then on Windows the shutdown is never
> processed. On Android there's a kj::Disconnected exception raised which is
> a weird platform difference (either the Windows backend has a bug or it's
> just an inevitable OS difference that can't be abstracted properly).
>
> Here's the KJ loop:
>
> kj::Canceler stillRunning;
>
> auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
>             rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
>
> try {
> stillRunning.wrap(listener->accept().then([&](kj::Own<kj::AsyncIoStream>&&
> stream2) mutable {
>   kj::Vector<kj::Promise<void>> pumped;
>   pumped.add(stream1->pumpTo(stream2)
>     .ignoreResult()
>     .then(
>       [] { std::cerr << "stream1 finished pumping" << std::endl; },
>       [] (kj::Exception&& e) { std::cerr << "stream1 pumping error" <<
> std::endl; }));
>   pumped.add(stream2->pumpTo(stream1)
>     .ignoreResult()
>     .then(
>       [] { std::cerr << "stream2 finished pumping" << std::endl; },
>       [] (kj::Exception&& e) { std::cerr << "stream2 pumping error" <<
> std::endl; }));
>   return kj::joinPromises(pumped.releaseAsArray())
>     .attach(kj::mv(stream1), kj::mv(stream2));
> })).wait(waitScope);
> } catch (const kj::Exception& e) {
>   std::cerr << "Bridge failed: " << kj::str(e) << std::endl;
> }
>
> My test code (which runs on the main thread whereas the kj code runs in
> its own dedicated thread) connects 2 blocking sockets manually using
> regular blocking BSD sockets to each socket (i.e. the peers for stream1 and
> stream2). I then send 1 MB via the other endpoint of stream2. Then I
> shutdown that socket with SD_BOTH. Then I recv the 1 MB with MSG_WAITALL on
> the other side of stream1. That seems to work OK. Then I try to recv a
> second time on the expectation that one of the pumps would have detected
> some problem somewhere but I run into the situation outlined initially.
>
> Am I doing something wrong/have incorrect expectations on the error
> handlers for the pump?
>
> I haven't yet tried this code on stock Linux/macos.
>
> Appreciate any insights you may have! I'm thinking in the short term I'll
> just use `exclusiveJoin` because I'm thinking there's no world where any of
> the handlers for pumping between two network sockets would ever end up
> getting called.
>
> On Mon, Jul 13, 2020 at 5:01 PM Kenton Varda <ken...@cloudflare.com>
> wrote:
>
>> Hmm I don't understand what you're doing with paf.promise.fork(), it
>> looks like you're merely attaching it to another promise, not actually
>> waiting on it or anything.
>>
>> I am a little concerned about your use of shutdownWrite() / abortRead()
>> in a way that assumes these functions will cause any concurrent write or
>> read to throw an exception. That actually isn't quite the intent here.
>> These methods are intended to inform the *other side* of the connection
>> that the connection has ended; the weren't designed to cancel operations on
>> your own end. I think that the way they are implemented for native streams
>> today might produce the behavior you want mostly by accident.
>>
>> The "proper" thing to do would be to cancel any concurrent operation
>> first by destroying the associated promise. So before calling abortRead(),
>> make sure to destroy any promises that represent a read() operation.
>>
>> Unfortunately, this brings us back to the problem with joinPromises().
>> What you really want is for joinPromises() to fail fast if either branch
>> fails, but otherwise wait for both to finish.
>>
>> I guess one rather-ugly way you could do that today is to create a
>> PromiseAndFulfiller for cancellation purposes. Use
>> joinPromises().exclusiveJoin(paf.promise). Then reject the fulfiller if you
>> catch an exception on either side. But that's pretty ugly.
>>
>> We need a joinFailfast(), I think...
>>
>> -Kenton
>>
>> On Mon, Jul 6, 2020 at 10:13 AM Vitali Lovich <vlov...@gmail.com> wrote:
>>
>>>
>>>
>>> On Mon, Jul 6, 2020 at 7:41 AM Vitali Lovich <vlov...@gmail.com> wrote:
>>>
>>>>
>>>> On Mon, Jul 6, 2020 at 7:26 AM Kenton Varda <ken...@cloudflare.com>
>>>> wrote:
>>>>
>>>>> Almost. Two issues I can think of:
>>>>>
>>>>> 1. After each pump completes, you probably want to call
>>>>> shutdownWrite() to propagate the EOF.
>>>>> 2. `joinPromises()` always waits for all promises to resolve. I've
>>>>> often found that this turns out to be the wrong behavior when one of the
>>>>> joined promises throws an exception. Usually you want the other tasks
>>>>> canceled in that case. I think that might be the case here -- if you get 
>>>>> an
>>>>> I/O error in one direction, you probably want to kill the whole stream.
>>>>> Then again, probably that'll happen anyway in most cases. (Whereas, EOF is
>>>>> not an error, so you do want to wait for the other promise in that case.)
>>>>>
>>>>
>>>> So more like this?
>>>>
>>>> return stream1.pumpTo(stream2).ignoreResult().then(
>>>>   [&] {stream2.shutdownWrite()},
>>>>   [&](kj::Exception&& e){
>>>>     stream1.shutdownWrite();
>>>>     stream1.abortRead();
>>>>     stream2.shutdownWrite();
>>>>     stream2.abortRead();
>>>>   })).exclusiveJoin(
>>>>     stream2.pumpTo(stream1).ignoreResult().then(
>>>>       [&] {stream1.shutdownWrite()},
>>>>       [&](kj::Exception&& e){
>>>>         stream1.shutdownWrite();
>>>>         stream1.abortRead();
>>>>         stream2.shutdownWrite();
>>>>         stream2.abortRead();
>>>>       }));
>>>>
>>> Actually, I think this is two hand-wavy. Also I think the original
>>> inclusive join is actually correct because I want to ensure that both sides
>>> finish any I/O that may be in flight. Otherwise I may end the stream
>>> prematurely just because 1 end finished (e.g. 1 end sends some data & then
>>> closes because its done - the peer won't receive all the data).
>>>
>>> My current code looks something like:
>>>
>>> void completelyClose(kj::AsyncIoStream& stream) {
>>>     stream.shutdownWrite();
>>>     stream.abortRead();
>>> };
>>>
>>> kj::Canceler stillRunning;
>>> auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
>>>             rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
>>>
>>> stillRunning.wrap(listener->accept().then([&](kj::Own<kj::AsyncIoStream>&&
>>> stream2) mutable {
>>>   auto paf = kj::newPromiseAndFulfiller<void>();
>>>   auto unsafeStream2 = stream2.get();
>>>
>>>   kj::Vector<kj::Promise<void>> pumped;
>>>   pumped.add(stream1->pumpTo(stream2)
>>>     .ignoreResult()
>>>     .then(
>>>       [stream2 = stream2.get()] { stream2->shutdownWrite(); },
>>>       [&stream1, stream2 = stream2.get()] (kj::Exception&& e) {
>>>         completelyClose(*stream1);
>>>         completelyClose(*stream2);
>>>       });
>>>   pumped.add(unsafeStream2->pumpTo(stream1)
>>>     .ignoreResult()
>>>     .then(
>>>       [&stream1] { stream1->shutdownWrite(); },
>>>       [&stream1, stream2 = unsafeStream2] (kj::Exception&& e) {
>>>         completelyClose(*stream1);
>>>               completelyClose(*stream2);
>>>       }));
>>>   return kj::joinPromises(pumped.releaseAsArray())
>>>     .attach(
>>>       paf.promise.fork(),
>>>       // AcceptedConnection simply fulfills on destruction.
>>>       kj::heap<AcceptedConnection>(kj::mv(stream2),
>>> kj::mv(paf.fulfiller)),
>>>     );
>>> })).wait(waitScope);
>>>
>>> The fulfiller stuff is another place I'm pretty sure I haven't done
>>> right. I was just going off of what's happening under the hood when you
>>> wait on the promise that TwoPartyServer returns when it listens.
>>>
>>>
>>>>
>>>>> On another note, a warning: I'm likely to change the AsyncOutputStream
>>>>> interface significantly in the future, in order to replace
>>>>> `shutdownWrite()` with something that returns a promise, and to make it so
>>>>> that if you don't explicitly shut down a stream, then it's treated as an
>>>>> error. Currently, AsyncOutputStream's destructor implicitly sends a clean
>>>>> EOF, but that's the wrong thing to do when the sender terminated
>>>>> prematurely due to an exception. So, your code will need some updating 
>>>>> when
>>>>> that happens.
>>>>>
>>>>
>>>> Yeah, as long as it's a strict compilation error if there's changes
>>>> that need to be made to make previously correct work correct again, that's
>>>> fine (+ maybe documentation in the release notes on how to migrate). Can
>>>> just treat that as the cost of updating to a new version of the library.
>>>> More challenging would be any implicit behavioural changes that don't
>>>> change the API. Those can be harder to catch even with unit tests.
>>>>
>>>> Also while you're at it, does it make sense to have 1 function that
>>>> does both shutdownWrite/abortRead so that if both are returning futures,
>>>> joining them correctly can be hidden as an implementation detail? I'm
>>>> finding the promises stuff to be straightforward for simple cases but
>>>> anything more custom is harder to reason about in terms of making sure they
>>>> compose correctly - I never feel quite comfortable if I've written the
>>>> promise code correctly (especially with all the different types of
>>>> promises, Canceler & TaskSet). Hopefully it's easy to integrate C++20
>>>> coroutine support to make things read more linearly again. Not sure when
>>>> I'll get to use C++20 though.
>>>>
>>>>
>>>>>
>>>>> -Kenton
>>>>>
>>>>> On Sun, Jul 5, 2020 at 8:13 PM Vitali Lovich <vlov...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I was wondering what would be the best way to bridge 2
>>>>>> kj::AsyncIoStreams to each other (read to write/write to read) so that 
>>>>>> they
>>>>>> act as pass-through? I'm assuming something like:
>>>>>>
>>>>>> auto pumped = kj::ArrayBuilder<kj::Promise<void>>(2);
>>>>>> pumped.add(stream1.pumpTo(stream2).ignoreResult());
>>>>>> pumped.add(stream2.pumpTo(stream1).ignoreResult());
>>>>>> return kj::joinPromises(pumped.finish()).ignoreResult();
>>>>>>
>>>>>> Thanks,
>>>>>> Vitali
>>>>>>
>>>>>> --
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Cap'n Proto" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>> send an email to capnproto+unsubscr...@googlegroups.com.
>>>>>> To view this discussion on the web visit
>>>>>> https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com
>>>>>> <https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>>>>>> .
>>>>>>
>>>>>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/CAJouXQmxN8py12QOTDarw1KBsjFKjCUOou4j7FhDitMkOsVz2w%40mail.gmail.com.

Reply via email to