Hi all, I am trying to implement a simple architecture where various backend servers gather data (on Android devices) and push them to the subscribers. In the backend the data gatherer runs is a separate thread from the server. My current issue is when messages are sent to the subscriber, they are not received on the client side. I am not sure if I need to wait on the promise that is returned by the send() call in order to actually place the message on the wire or maybe the problem is completely unrelated.
First, I've tried using executor->executeSync(), from the gatherer thread, to the send the messages, but then I can't wait on the promise because the code runs in an event callback. I've also tried to start a kj event loop in the gatherer thread and publish the messages from there, but then when I try to wait I get the following exception " Nothing to wait for; this thread would hang forever." Please find the relevant code samples below: *test.capnp* @0xf84f5237f35aec13; interface Proxy { getBackend @0 () -> (backend: Backend); } interface Backend { consume @0 (msg :Message); subscribe @1 (receiver :Receiver); } interface Receiver { receive @0 (msg: Message); } struct Message { body @0 :Data; } *backend.cpp* class BackendImpl final: public Backend::Server { private: bool first_received; bool receiver_ready; unsigned recv_count; unsigned count; unsigned size; chrono::time_point<chrono::high_resolution_clock> begin; Receiver::Client receiver; public: BackendImpl(unsigned cnt): first_received(false), receiver_ready(false), count(cnt), receiver(nullptr) {}; kj::Promise<void> consume(ConsumeContext context) override { //... Benchmark tests are omitted return kj::READY_NOW; } kj::Promise<void> subscribe(SubscribeContext context) override { this->receiver = context.getParams().getReceiver(); cout << "Receiver received" << endl; this->receiver_ready = true; return kj::READY_NOW; } //bool send(int cnt, int size, kj::WaitScope& ws) { // I have tried passing the wait scope bool send(int cnt, int size) { if (!this->receiver_ready) { return false; } this->receiver_ready = false; cout << "Sending data" << endl; unsigned char* msg = new unsigned char[size]; memset(msg, 'a', size); for (int i = 0; i < this->count; i++) { auto request = this->receiver.receiveRequest(); request.getMsg().setBody(capnp::Data::Reader(msg, size)); auto promise = request.send(); //auto promise = request.send().wait(ws); auto res = promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){ cout << result.totalSize().wordCount << endl << flush; // This never gets called return result.totalSize().wordCount; }); } delete[] msg; cout << "Sending done" << endl << flush; return true; } }; BackendImpl* backend; void data_pusher(const kj::Executor* executor, int size, int cnt) { //kj::EventLoop loop; //loop.run(); //kj::WaitScope waitScope(loop); while (executor->isLive()) { this_thread::sleep_for(1s); //backend->send(cnt, size, waitScope); executor->executeSync( [backend=backend, cnt=cnt, size=size] { backend->send(cnt, size); }); } } int main(int argc, const char* argv[]) { auto be = kj::heap<BackendImpl>(10000); backend = be.get(); // There must be a sexier way to do this // Set up a server. capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223"); waitScope = &server.getWaitScope(); const kj::Executor& executor = kj::getCurrentThreadExecutor(); std::thread thr(data_pusher, &executor, 4096, 10000); std::cout << "Backend Ready" << std::endl; // Run forever, accepting connections and handling requests. kj::NEVER_DONE.wait(*waitScope); thr.join(); } *client.py* capnp.remove_import_hook() a = capnp.load('./test.capnp') class ReceiverImpl(a.Receiver.Server): def __init__(self, count): self.count = count self.has_received = False self.recv_cnt = 0 def receive(self, msg, _context, **kwargs): print("We get called, yay!") #... benchmarking code omitted def direct_subscribe(): client = capnp.TwoPartyClient('127.0.0.1:11223') backend = client.bootstrap().cast_as(a.Backend) #backend = proxy.getBackend().backend receiver = ReceiverImpl(10000) backend.subscribe(receiver).wait() while receiver.recv_cnt < 10000: time.sleep(1) direct_subscribe() When I execute this code, without any wait() calls in the publisher, the sending immediately completes async, but the receive() function of the receiver never gets called. Thank you for your time and assistance. -Gym -- You received this message because you are subscribed to the Google Groups "Cap'n Proto" group. To unsubscribe from this group and stop receiving emails from it, send an email to capnproto+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/capnproto/3e34a5bb-4903-40c5-9012-582935fd27b7n%40googlegroups.com.