Hi all,

I am trying to implement a simple architecture where various backend 
servers gather data (on Android devices) and push them to the subscribers. 
In the backend the data gatherer runs in a separate thread from the server. 
My current issue is when messages are sent to the subscriber, they are not 
received on the client side. I am not sure if I need to wait on the promise 
that is returned by the send() call in order to actually place the message 
on the wire or maybe the problem is completely unrelated.

First, I've tried using executor->executeSync() from the gatherer thread 
to send the messages, but then I can't wait on the promise because the 
code runs in an event callback.
I've also tried to start a kj event loop in the gatherer thread and publish 
the messages from there, but then when I try to wait I get the following 
exception: "Nothing to wait for; this thread would hang forever."
Please find the relevant code samples below:

*test.capnp*

@0xf84f5237f35aec13;
# Schema shared by the C++ backend and the Python client.

interface Proxy {
   # Hands out a Backend capability.
   getBackend @0 () -> (backend: Backend);
}

interface Backend {
   # consume: delivers one Message to the backend.
   consume @0 (msg :Message);
   # subscribe: registers a Receiver capability that the backend can call back.
   subscribe @1 (receiver :Receiver);
}

interface Receiver {
   # Callback interface implemented by the subscriber; takes one Message per call.
   receive @0 (msg: Message);
}

struct Message {
   # Opaque payload bytes.
   body @0 :Data;
}

*backend.cpp*

class BackendImpl final: public Backend::Server {
    private:
        bool first_received;
        bool receiver_ready;
        unsigned recv_count;
        unsigned count;
        unsigned size;
        chrono::time_point<chrono::high_resolution_clock> begin;
        Receiver::Client receiver;
    
    public:
        BackendImpl(unsigned cnt): first_received(false), 
receiver_ready(false), count(cnt), receiver(nullptr) {};
        kj::Promise<void> consume(ConsumeContext context) override {
            //... Benchmark tests are omitted
            return kj::READY_NOW;
        }

        kj::Promise<void> subscribe(SubscribeContext context) override {
            this->receiver = context.getParams().getReceiver();
            cout << "Receiver received" << endl;
            this->receiver_ready = true;
            return kj::READY_NOW;
        }

        //bool send(int cnt, int size, kj::WaitScope& ws) { // I have tried 
passing the wait scope
        bool send(int cnt, int size) {
            if (!this->receiver_ready) {
                return false;
            }
            this->receiver_ready = false;
            cout << "Sending data" << endl;
            unsigned char* msg = new unsigned char[size];
            memset(msg, 'a', size);


            for (int i = 0; i < this->count; i++) {
                
                auto request = this->receiver.receiveRequest();
                request.getMsg().setBody(capnp::Data::Reader(msg, size));
                auto promise = request.send();
                //auto promise = request.send().wait(ws);

                auto res = 
promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
                    cout << result.totalSize().wordCount << endl << flush; 
// This never gets called
                    return result.totalSize().wordCount;
                    });

            }
            delete[] msg;
            cout << "Sending done" << endl << flush;
            return true;
        }
};

// Non-owning pointer to the server implementation; main() assigns it before
// the pusher thread is started.
BackendImpl* backend;

/// Body of the gatherer thread: once per second, asks the event-loop thread to
/// run BackendImpl::send() and blocks until that call has returned.
///
/// Running a private kj::EventLoop in this thread and calling send() directly
/// was tried and abandoned; the executor hop is the approach kept here.
///
/// @param executor  Executor of the thread that owns the RPC event loop.
/// @param size      Payload size in bytes forwarded to send().
/// @param cnt       Message count forwarded to send().
void data_pusher(const kj::Executor* executor, int size, int cnt) {
    for (;;) {
        // Stop as soon as the target event loop has gone away.
        if (!executor->isLive()) {
            break;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));

        BackendImpl* const target = backend;
        executor->executeSync([target, cnt, size]() {
            target->send(cnt, size);
        });
    }
}

/// Starts the RPC server on 127.0.0.1:11223, launches the data-pusher thread,
/// and then runs the event loop forever.
int main(int argc, const char* argv[]) {

  auto be = kj::heap<BackendImpl>(10000);
  // Keep a non-owning pointer for the pusher thread; ownership of the object
  // moves into the server below.
  backend = be.get();

  // Set up a server.
  capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");

  // The wait scope belongs to the server's event loop; bind it locally rather
  // than going through an identifier that is not declared in this file.
  kj::WaitScope& waitScope = server.getWaitScope();

  // The pusher thread uses this executor to run code on the event-loop thread.
  const kj::Executor& executor = kj::getCurrentThreadExecutor();
  std::thread thr(data_pusher, &executor, 4096, 10000);
  std::cout << "Backend Ready" << std::endl;

  // Run forever, accepting connections and handling requests.
  kj::NEVER_DONE.wait(waitScope);
  // Not reached in normal operation: the wait above never completes.
  thr.join();
}

*client.py*

# Load the schema explicitly from its file instead of through pycapnp's import
# hook; `a` is the loaded schema module used by the code below.
capnp.remove_import_hook()
a = capnp.load('./test.capnp')

class ReceiverImpl(a.Receiver.Server):
    """Receiver capability that the client passes to Backend.subscribe()."""

    def __init__(self, count):
        # Counters start at zero; presumably the omitted benchmarking code
        # in receive() is what updates them.
        self.recv_cnt = 0
        self.has_received = False
        self.count = count

    def receive(self, msg, _context, **kwargs):
        """Callback for Receiver.receive; logs that a message arrived."""
        print("We get called, yay!")
        #... benchmarking code omitted

def direct_subscribe():
    """Connect to the backend, subscribe a receiver, and pump the event loop
    until the receiver has counted all expected messages."""

    client = capnp.TwoPartyClient('127.0.0.1:11223')
    backend = client.bootstrap().cast_as(a.Backend)
    #backend = proxy.getBackend().backend

    receiver = ReceiverImpl(10000)

    backend.subscribe(receiver).wait()

    # Incoming receive() calls are dispatched only while this thread is
    # waiting on a promise. time.sleep() blocks the thread without running
    # the event loop, so the callbacks would never fire; waiting on a timer
    # promise keeps the loop turning. after_delay() takes nanoseconds.
    while receiver.recv_cnt < receiver.count:
        capnp.getTimer().after_delay(10 ** 9).wait()

direct_subscribe()

When I execute this code, without any wait() calls in the publisher, the 
sending immediately completes async, but the receive() function of the 
receiver never gets called.

Thank you for your time and assistance.
-Gym

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/3e34a5bb-4903-40c5-9012-582935fd27b7n%40googlegroups.com.

Reply via email to