I rewrote my queue for newCrossThreadPromiseAndFulfiller, after installing
Cap'n proto from git, but now Python no longer works :(
I also installed pycapnp from git, but am getting the following error:
undefined symbol: _ZN2kj1_24TransformPromiseNodeBase16getInnerForTraceEv

Cheers,
Pepijn

template<class T>
class AsyncQueue : public kj::AtomicRefcounted
{
    public:
    AsyncQueue() : paf(kj::newCrossThreadPromiseAndFulfiller<T>()) {}

    kj::Promise<T> pop() {
        kj::Locked<std::queue<T>> queue = mutex.lockExclusive();
        if (queue->empty()) {
            return kj::mv(paf.promise);
        } else {
            T val = queue->front();
            queue->pop();
            return val;
        }
    }

    void push(T val) {
        kj::Locked<std::queue<T>> queue = mutex.lockExclusive();
        if(this->paf.fulfiller->isWaiting()) {
            this->paf.fulfiller->fulfill(T(val));
            paf = kj::newCrossThreadPromiseAndFulfiller<T>();
        } else {
            queue->push(val);
        }
    }
    kj::MutexGuarded<std::queue<T>> mutex;
    kj::PromiseFulfillerPair<T> paf;
};

On Fri, Mar 19, 2021 at 7:58 PM pepijn de vos <[email protected]> wrote:

> Thanks a lot, I'll look into the level 2 stuff.
>
> newCrossThreadPromiseAndFulfiller looks perfect for what I'm trying to do.
> I actually wanted more of a queue abstraction, which doesn't seem to be
> there AFAICT, so I'm trying to make one.
>
> The basic idea is that when the consumer tries to pop from an empty queue
> it creates a PromiseAndFulfiller. When the producer pushes a new value it
> uses an executor to do so on the event loop thread of the consumer, and
> then checks if the consumer is waiting, if so fulfills the promise, else
> just pushes onto the queue. It looks like once I get a version with the new
> cross API, I can use that. For now I have something like this. Happy to
> make a PR once it works and if you think it's useful.
>
> Pepijn
>
> template<class T>
> class AsyncQueue
> {
>     public:
>     AsyncQueue() : paf(kj::newPromiseAndFulfiller<T>()),
> exec(kj::getCurrentThreadExecutor()) {}
>
>     kj::Promise<T> pop() {
>         if (queue.empty()) {
>             return kj::mv(paf.promise);
>         } else {
>             T val = queue.front();
>             queue.pop();
>             return val;
>         }
>     }
>
>     void push(T val) {
>         exec.executeSync([val, this]() {
>             if(this->paf.fulfiller->isWaiting()) {
>                 this->paf.fulfiller->fulfill(T(val));
>                 paf = kj::newPromiseAndFulfiller<T>();
>             } else {
>                 this->queue.push(val);
>             }
>         });
>     }
>     std::queue<T> queue;
>     kj::PromiseFulfillerPair<T> paf;
>     const kj::Executor &exec;
> };
>
> On Fri, Mar 19, 2021 at 7:49 PM Kenton Varda <[email protected]>
> wrote:
>
>> Yes, Executor is a good way to communicate between threads. FWIW you
>> might also check out newCrossThreadPromiseAndFulfiller(), which was added
>> very recently. Sometimes it's a better fit than kj::Executor.
>>
>> "Level 2" turns out to be something that can't really be built into
>> libcapnp itself because the design really depends on the execution
>> environment in which your servers run. A system for saving capabilities and
>> restoring them later needs to understand how to connect to -- and possibly
>> even start up -- the appropriate server and storage system. So, for
>> example, Sandstorm.io has implemented level 2 of the protocol in a way that
>> is appropriate for it, but there ends up being not much that libcapnp
>> itself ca ndo to help. You can take a look at persistent.capnp to see a
>> suggestion for how to structure a level 2 implementation, but it's not much
>> more than a suggestion.
>>
>> Ultimately, it's really up to you to come up with the right way for a
>> client to request a token representing a particular running simulation, and
>> then be able to connect back to that simulation later, probably by
>> presenting the token to another RPC service.
>>
>> -Kenton
>>
>> On Fri, Mar 19, 2021 at 10:10 AM [email protected] <
>> [email protected]> wrote:
>>
>>> I think I figured out the first two problems.
>>> Multiple inheritance worked out quite nicely, only downside is each
>>> simulator needs to be defined explicitly.
>>> I think I found the correct way to do threading, which seems to be to
>>> use kj::getCurrentThreadExecutor() to get a way to schedule callbacks
>>> on the eventloop thread.
>>> So I guess you'd pass a promise to the thread and then fulfill it using
>>> the executor.
>>> https://github.com/capnproto/capnproto/blob/master/kjdoc/tour.md#threads
>>> As with most of these things, they are quite puzzling at first but
>>> actually quite nice once you figure them out.
>>>
>>> So the only problem left is how to handle disconnects in extremely long
>>> running processes.
>>>
>>> Cheers,
>>> Pepijn
>>> On Thursday, 18 March 2021 at 12:18:50 UTC+1 [email protected] wrote:
>>>
>>>> Hey,
>>>>
>>>> I'm designing an RPC server for simulators that can run several
>>>> different long-running commands, and I have a few design questions.
>>>>
>>>> My current design is as follows.
>>>> Upon loading files to simulate, the simulator returns a list of unions
>>>> of interfaces, representing all the simulation commands it supports.
>>>> Upon calling one of the commands, a reader interface is returned that
>>>> allows streaming simulation results.
>>>> This currently happens in-thread, so reading big chunks will block the
>>>> entire server.
>>>>
>>>> So first question is, is there a nicer way to represent a thing that
>>>> can implement a subset of functions? I could just define a mega interface
>>>> and not implement methods, but then the issue is how to discover which
>>>> methods the server implements. Or maybe the correct approach is to use
>>>> multiple inheritance from the smaller interfaces to concrete
>>>> implementations?
>>>>
>>>> The next question is how to offload the simulation to a thread? I
>>>> assumed this would be a very common task but I can't find much in the docs.
>>>> I found KJ::Thread or something like that, but it's not clear to me how to
>>>> tie that into the event loop promise API.
>>>>
>>>> Final issue I'm thinking about, for *very* long running simulations,
>>>> the client disconnecting in the middle of simulation that takes days or
>>>> weeks becomes a real concern. This is basically level 2 of
>>>> https://capnproto.org/rpc.html#protocol-features but as far as I
>>>> understand C++ is only level 1. What would be a good way to go about things
>>>> here? If level 2 is just around the corner, I can just ignore the issue for
>>>> a while, but maybe I need to manually store simulator references outside
>>>> the connection and hand out tokens to it?
>>>>
>>>> Regards,
>>>> Pepijn
>>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Cap'n Proto" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/capnproto/8fe36a3a-88dd-4761-9fa9-48ba90a1da0fn%40googlegroups.com
>>> <https://groups.google.com/d/msgid/capnproto/8fe36a3a-88dd-4761-9fa9-48ba90a1da0fn%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/CANPfQgvR47w72GGqLZ5CJSEWjz7Bc55xFjhEycxsJ_uLDMCQtQ%40mail.gmail.com.

Reply via email to