In your send function you should call:
request.send().detach([&](kj::Exception&& error) { /*handle returned 
error*/; });
If you don't, the promise is destructed and the call gets cancelled.

On Friday, May 6, 2022 at 1:22:50 PM UTC+2 Gyorgy Miru wrote:

> Hi all,
>
> I am trying to implement a simple architecture where various backend 
> servers gather data (on Android devices) and push them to the subscribers. 
> In the backend the data gatherer runs in a separate thread from the server. 
> My current issue is when messages are sent to the subscriber, they are not 
> received on the client side. I am not sure if I need to wait on the promise 
> that is returned by the send() call in order to actually place the message 
> on the wire or maybe the problem is completely unrelated.
>
> First, I've tried using executor->executeSync(), from the gatherer 
> thread, to send the messages, but then I can't wait on the promise 
> because the code runs in an event callback.
> I've also tried to start a kj event loop in the gatherer thread and 
> publish the messages from there, but then when I try to wait I get the 
> following exception: "Nothing to wait for; this thread would hang forever."
> Please find the relevant code samples below:
>
> *test.capnp*
>
> @0xf84f5237f35aec13;
>
> interface Proxy {
>    getBackend @0 () -> (backend: Backend);
> }
>
> interface Backend {
>    consume @0 (msg :Message);
>    subscribe @1 (receiver :Receiver);
> }
>
> interface Receiver {
>    receive @0 (msg: Message);
> }
>
> struct Message {
>    body @0 :Data;
> }
>
> *backend.cpp*
>
> class BackendImpl final: public Backend::Server {
>     private:
>         bool first_received;
>         bool receiver_ready;
>         unsigned recv_count;
>         unsigned count;
>         unsigned size;
>         chrono::time_point<chrono::high_resolution_clock> begin;
>         Receiver::Client receiver;
>     
>     public:
>         BackendImpl(unsigned cnt): first_received(false), 
> receiver_ready(false), count(cnt), receiver(nullptr) {};
>         kj::Promise<void> consume(ConsumeContext context) override {
>             //... Benchmark tests are omitted
>             return kj::READY_NOW;
>         }
>
>         kj::Promise<void> subscribe(SubscribeContext context) override {
>             this->receiver = context.getParams().getReceiver();
>             cout << "Receiver received" << endl;
>             this->receiver_ready = true;
>             return kj::READY_NOW;
>         }
>
>         //bool send(int cnt, int size, kj::WaitScope& ws) { // I have 
> tried passing the wait scope
>         bool send(int cnt, int size) {
>             if (!this->receiver_ready) {
>                 return false;
>             }
>             this->receiver_ready = false;
>             cout << "Sending data" << endl;
>             unsigned char* msg = new unsigned char[size];
>             memset(msg, 'a', size);
>
>
>             for (int i = 0; i < this->count; i++) {
>                 
>                 auto request = this->receiver.receiveRequest();
>                 request.getMsg().setBody(capnp::Data::Reader(msg, size));
>                 auto promise = request.send();
>                 //auto promise = request.send().wait(ws);
>
>                 auto res = 
> promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
>                     cout << result.totalSize().wordCount << endl << flush; 
> // This never gets called
>                     return result.totalSize().wordCount;
>                     });
>
>             }
>             delete[] msg;
>             cout << "Sending done" << endl << flush;
>             return true;
>         }
> };
>
> BackendImpl* backend;
> void data_pusher(const kj::Executor* executor, int size, int cnt) {
>
>     //kj::EventLoop loop;
>     //loop.run();
>     //kj::WaitScope waitScope(loop);
>     while (executor->isLive()) {
>         this_thread::sleep_for(1s);
>
>         //backend->send(cnt, size, waitScope);
>
>         executor->executeSync( [backend=backend, cnt=cnt, size=size] {
>                 backend->send(cnt, size);
>             });
>     }    
> }
>
> int main(int argc, const char* argv[]) {
>
>   auto be = kj::heap<BackendImpl>(10000);
>   backend = be.get(); // There must be a sexier way to do this
>   
>   // Set up a server.
>   capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");
>
>   waitScope = &server.getWaitScope();
>
>   const kj::Executor& executor = kj::getCurrentThreadExecutor();
>   std::thread thr(data_pusher, &executor, 4096, 10000);
>   std::cout << "Backend Ready" << std::endl;
>
>   // Run forever, accepting connections and handling requests.
>   kj::NEVER_DONE.wait(*waitScope);
>   thr.join();
> }
>
> *client.py*
>
> capnp.remove_import_hook()
> a = capnp.load('./test.capnp')
>
> class ReceiverImpl(a.Receiver.Server):
>
>     def __init__(self, count):
>         self.count = count
>         self.has_received = False
>         self.recv_cnt = 0
>
>     def receive(self, msg, _context, **kwargs):
>         print("We get called, yay!")
>         #... benchmarking code omitted
>
> def direct_subscribe():
>
>     client = capnp.TwoPartyClient('127.0.0.1:11223')
>     backend = client.bootstrap().cast_as(a.Backend)
>     #backend = proxy.getBackend().backend
>
>     receiver = ReceiverImpl(10000)
>
>     backend.subscribe(receiver).wait()
>
>     while receiver.recv_cnt < 10000:
>         time.sleep(1)
>
> direct_subscribe()
>
> When I execute this code, without any wait() calls in the publisher, the 
> sending immediately completes async, but the receive() function of the 
> receiver never gets called.
>
> Thank you for your time and assistance.
> -Gym
>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/7e9ba9d1-6804-4cd5-bf4c-857dafd122c8n%40googlegroups.com.

Reply via email to