We are facing an issue where Next of a completion queue returns ok as 
false, if any other RPC is called.
We need to implement a simple functionality of progress reporting of a long 
running RPC. So we need two RPCs
1. StartScan - to trigger the long running scan - runs in separate thread 
and its own completion queue
2. RegisterScanProgress  - to register for the callback that reports status 
of the long running scan.  - runs in separate thread and its own completion 
queue

If we call  StartScan  first, and then cqProgress_(ServerCompletionQueue 
for RegisterScanProgress) behaves as expected. though, if we call 
RegisterScanProgress  first, then immediately after call to StartScan the 
Next of cqProgress_  returns ok as false.
Need help to understand this behavior. As after calling 
RegisterScanProgress there could be call to any other RPC, and so we'd need 
its queue to keep functioning for progress reporting.
It'll be of great help if someone helps us understand what we are missing.

Proto looks like following
service ScanService{
    rpc StartScan (ScanRequest) returns (ScanReply) {}
    rpc RegisterScanProgress (ScanProgressRequest) returns (stream 
ScanProgressReply) {}
}
message ScanRequest{
  string scanType = 1;
}
message ScanReply{
  bool scanAlreadyInProgress  = 1;
}
message ScanProgressRequest {
}
enum ScanStatus
{
  NOT_STARTED = 0;
  STARTED     = 1;
  IN_PROGRESS = 2;
  FINISHED_SUCCESS  = 3;
  FINISHED_FAILED   = 4;
}
message ScanProgressReply {
    ScanStatus status = 1;
    string details = 2;
}

this is how the server code looks

class ServerImpl
{
public:
  ~ServerImpl()
  {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  void Run()
  {
    //... other server config code
    grpc::EnableDefaultHealthCheckService(true);
    grpc::reflection::InitProtoReflectionServerBuilderPlugin();
    grpc::ServerBuilder builder;

    builder.AddListeningPort(server_address, serverCredentials);

    cq_ = builder.AddCompletionQueue();
    cqProgress_ = builder.AddCompletionQueue();

    // Use Keep-alive to stop initial slow calls
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
    builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);

    // Finally assemble the server.
    server_ = (builder.BuildAndStart());

    // kick start the queues for async RPCs
    HandleStartScan(cq_.get());
    HandleRegisterScanProgress(cqProgress_.get());
    progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, 
cq_.get());
    progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, 
cqProgress_.get());
    server_->Wait();
  }

private:
  void HandleStartScan(ServerCompletionQueue* event_queue)
  {
    new StartScanCallData(&scanAsync_, event_queue);
  }
  void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
  {
    new RegisterScanProgressCallData(&scanAsync_, event_queue);
  }
  void EventLoop(ServerCompletionQueue* event_queue)
  {
    void* tag; // uniquely identifies a request.
    bool ok;
    while (event_queue->Next(&tag, &ok))
    {
      IAsyncRpcDataAdapter* adapter = 
static_cast<IAsyncRpcDataAdapter*>(tag);
      if (ok)
      {
        adapter->Proceed();
      }
      else
      {
        std::cout << "OK is false" << std::endl;
        continue;
      }
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  std::unique_ptr<ServerCompletionQueue> cqProgress_;
  napa::Nvbackend::AsyncService scanAsync_;
  std::unique_ptr<Server> server_;
  std::thread progressReportThread_;
  std::thread progressOtherRpcs_;
};

int main(int argc, char** argv)
{
  ServerImpl server;
  server.Run();
  return 0;
}

and this is how the service code looks

enum CallStatus
{
  CREATE,
  PROCESS,
  FINISH,
  PUSH_TO_BACK
};

// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
  virtual void Proceed() = 0;
  virtual ~IAsyncRpcDataAdapter() = default;

  protected:
    IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
      : scanAsync_(service)
      , cq_(cq)
      , status_(CREATE)
    {
    }
    ScanService* scanAsync_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    CallStatus status_; // The current serving state.
    grpc::Alarm alarm_;
};

class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
  StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
  void Proceed() override;

private:
  ScanRequest request_;
  ScanReply reply_;

  ServerAsyncResponseWriter<ScanReply> responder_;
  grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest 
*request, ScanReply *reply);
};

class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
  RegisterScanProgressCallData(ScanService* service, ServerCompletionQueue* 
cq);
  void Proceed() override;

private:
  ScanProgressRequest request_;
  ScanProgressReply reply_;
  ServerAsyncWriter<ScanProgressReply> responder_;

  void ProgressReport();
  void waitForScanEvent();
  std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
};

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/46b73b0d-ac81-4278-aa5f-8b721a86b0d8n%40googlegroups.com.

Reply via email to