We are facing an issue where Next of a completion queue returns ok as false, if any other RPC is called. We need to implement a simple functionality of progress reporting of a long running RPC. So we need two RPCs 1. StartScan - to trigger the long running scan - runs in separate thread and its own completion queue 2. RegisterScanProgress - to register for the callback that reports status of the long running scan. - runs in separate thread and its own completion queue
If we call StartScan first, and then cqProgress_(ServerCompletionQueue for RegisterScanProgress) behaves as expected. though, if we call RegisterScanProgress first, then immediately after call to StartScan the Next of cqProgress_ returns ok as false. Need help to understand this behavior. As after calling RegisterScanProgress there could be call to any other RPC, and so we'd need its queue to keep functioning for progress reporting. It'll be of great help if someone helps us understand what we are missing. Proto looks like following service ScanService{ rpc StartScan (ScanRequest) returns (ScanReply) {} rpc RegisterScanProgress (ScanProgressRequest) returns (stream ScanProgressReply) {} } message ScanRequest{ string scanType = 1; } message ScanReply{ bool scanAlreadyInProgress = 1; } message ScanProgressRequest { } enum ScanStatus { NOT_STARTED = 0; STARTED = 1; IN_PROGRESS = 2; FINISHED_SUCCESS = 3; FINISHED_FAILED = 4; } message ScanProgressReply { ScanStatus status = 1; string details = 2; } this is how the server code looks class ServerImpl { public: ~ServerImpl() { server_->Shutdown(); // Always shutdown the completion queue after the server. cq_->Shutdown(); } void Run() { //... other server config code grpc::EnableDefaultHealthCheckService(true); grpc::reflection::InitProtoReflectionServerBuilderPlugin(); grpc::ServerBuilder builder; builder.AddListeningPort(server_address, serverCredentials); cq_ = builder.AddCompletionQueue(); cqProgress_ = builder.AddCompletionQueue(); // Use Keep-alive to stop initial slow calls builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000); builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000); builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1); builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // Finally assemble the server. server_ = (builder.BuildAndStart()); // kick start the queues for async RPCs HandleStartScan(cq_.get()); HandleRegisterScanProgress(cqProgress_.get()); progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, cq_.get()); progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, cqProgress_.get()); server_->Wait(); } private: void HandleStartScan(ServerCompletionQueue* event_queue) { new StartScanCallData(&scanAsync_, event_queue); } void HandleRegisterScanProgress(ServerCompletionQueue* event_queue) { new RegisterScanProgressCallData(&scanAsync_, event_queue); } void EventLoop(ServerCompletionQueue* event_queue) { void* tag; // uniquely identifies a request. bool ok; while (event_queue->Next(&tag, &ok)) { IAsyncRpcDataAdapter* adapter = static_cast<IAsyncRpcDataAdapter*>(tag); if (ok) { adapter->Proceed(); } else { std::cout << "OK is false" << std::endl; continue; } } } std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<ServerCompletionQueue> cqProgress_; napa::Nvbackend::AsyncService scanAsync_; std::unique_ptr<Server> server_; std::thread progressReportThread_; std::thread progressOtherRpcs_; }; int main(int argc, char** argv) { ServerImpl server; server.Run(); return 0; } and this is how the service code looks enum CallStatus { CREATE, PROCESS, FINISH, PUSH_TO_BACK }; // interface for service to handle the async RPC // every RPC will need to implement this interface, so that GRPC server can // call RPC without knowing its type. class IAsyncRpcDataAdapter { public: virtual void Proceed() = 0; virtual ~IAsyncRpcDataAdapter() = default; protected: IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq) : scanAsync_(service) , cq_(cq) , status_(CREATE) { } ScanService* scanAsync_; ServerCompletionQueue* cq_; ServerContext ctx_; CallStatus status_; // The current serving state. grpc::Alarm alarm_; }; class StartScanCallData : public IAsyncRpcDataAdapter { public: StartScanCallData(ScanService* service, ServerCompletionQueue* cq); void Proceed() override; private: ScanRequest request_; ScanReply reply_; ServerAsyncResponseWriter<ScanReply> responder_; grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest *request, ScanReply *reply); }; class RegisterScanProgressCallData : public IAsyncRpcDataAdapter { public: RegisterScanProgressCallData(ScanService* service, ServerCompletionQueue* cq); void Proceed() override; private: ScanProgressRequest request_; ScanProgressReply reply_; ServerAsyncWriter<ScanProgressReply> responder_; void ProgressReport(); void waitForScanEvent(); std::unique_ptr<std::promise<AppScanStatus>> scanPromise_; }; -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/46b73b0d-ac81-4278-aa5f-8b721a86b0d8n%40googlegroups.com.