I have a gRPC service called `backend` with the following proto and I'm 
trying to chain gRPC calls together. 

Let's say there are 3 servers: A, B and C. 

B is the primary and A and C are non-primaries.

The idea is that server A calls `ForwardAddUserToPrimary` to server B, 
which then calls `SendAddUserRequest` to A and C. Once B has received the 
responses from A and C, it then returns a response back to A.

**backend.proto:**

    syntax = "proto3";
    
    package backend;
    
    // Presumably provides the keyvaluestore.Credentials and
    // keyvaluestore.Response messages used below.
    import "keyvaluestore.proto";
    
    // RPCs exchanged between cluster nodes to replicate an add-user request.
    service Backend {
      // Called by a non-primary node on the primary to start replication.
      rpc ForwardAddUserToPrimary(keyvaluestore.Credentials) returns
      (keyvaluestore.Response) {}
      // Called by the primary on the other nodes to apply the request.
      rpc SendAddUserRequest(keyvaluestore.Credentials) returns
      (keyvaluestore.Response) {}

How I'm currently doing it is by instantiating a client in the server 
implementation of `ForwardAddUserToPrimary` and then calling 
`client.SendAddUserRequest`:

     // Primary-side handler: sends the add-user request to every address in
     // cluster_addrs_ other than my_addr_ and succeeds only if the
     // acknowledgement count reaches the cluster size.
     //
     // ctx and res are not used here; cred supplies the user name and
     // password that are passed on unchanged.
     // Returns Status::OK when acks equals cluster_addrs_.size(), otherwise
     // FAILED_PRECONDITION with the message "Not enough votes".
     Status BackendServiceImpl::ForwardAddUserToPrimary(
         ServerContext* ctx, const Credentials* cred, Response* res) {
      log("[S: ForwardAddUserToPrimary] Received by primary", VB);
      // Starts at 1 to account for this node, which the loop skips. size_t
      // keeps the comparison with size() free of signed/unsigned mixing.
      size_t acks = 1;
      // Iterate by const reference so each address is not copied.
      for (const string& addr : cluster_addrs_) {
        if (addr == my_addr_) continue; // Skip myself
        log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        const bool success = client.SendAddUserRequest(cred->user(),
                                                       cred->passwd());
        if (success) {
          ++acks;
        }
      }
      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    }

This method works only some of the time. The other times it returns an error 
code 12 (UNIMPLEMENTED):

    13:17:38.689242 [S: ForwardAddUserToPrimary] Received by primary
    13:17:38.689298 [S: ForwardAddUserToPrimary] Propagating to 
127.0.0.1:5000
    13:17:38.690605 [C: SendAddUserRequest] Error 12:

Since each node can be both a server and a client, every node runs the 
backend server in a separate thread:

**node.cc:**

    // Builds a Backend gRPC server listening on my_addr (insecure
    // credentials) and blocks the calling thread in Wait() while it serves.
    //
    // my_addr        address this node listens on.
    // cluster_addrs  cluster node addresses, passed to BackendServiceImpl.
    void RunBackendServer(const string& my_addr,
                          const vector<string>& cluster_addrs) {
      BackendServiceImpl service(my_addr, cluster_addrs);
      ServerBuilder builder;
      builder.AddListeningPort(my_addr, grpc::InsecureServerCredentials());
      builder.RegisterService(&service);
      unique_ptr<Server> server(builder.BuildAndStart());
      // BuildAndStart() returns a null pointer when the server cannot be
      // started (e.g. the port could not be bound); calling Wait() on it
      // would dereference null, so report the failure and bail out instead.
      if (!server) {
        std::cerr << "Backend server failed to start on " << my_addr << endl;
        return;
      }
      cout << "Backend server listening on " << my_addr << endl;
      server->Wait();
    }
    
    int main(int argc, char** argv) {
      
      int serv_idx = atoi(argv[argc-1]);
      vector<string> cluster_addrs;
      read_config(cluster_addrs, string(argv[argc-2]));
      string my_addr = cluster_addrs[serv_idx];
    
      thread bethread(RunBackendServer, my_addr, cluster_addrs);
      
      // Server 0 forwards to the primary, which is server 1
      if (serv_idx = 0) {
        int dest_idx = (serv_idx + 1) % cluster_addrs.size();
        string dest_addr = cluster_addrs[dest_idx];
        BackendClient client(
            grpc::CreateChannel(dest_addr,
                                grpc::InsecureChannelCredentials()));
    
        bool success = client.ForwardAddUserToPrimary("user", "pass");
      } 
    
      bethread.join();
    
      return 0;
    }

In my test I only have 2 servers in my cluster. What I found weird was that 
it would work some of the time and fail at other times.

Does anyone know what I could be doing wrong here?

-------------------

**backend_client.cc**:

    // Creates the Backend stub on the given channel; every RPC issued by this
    // client goes through stub_.
    BackendClient::BackendClient(shared_ptr<Channel> channel)
        : stub_(Backend::NewStub(channel)) {}
    
    bool BackendClient::ForwardAddUserToPrimary(const string& user, const 
string& passwd) {
      Credentials cred;
      cred.set_user(user);
      cred.set_passwd(passwd);
    
      ClientContext ctx;
      Response res;
      Status status = stub_->ForwardAddUserToPrimary(&ctx, cred, &res);
      if (status.ok()) {
        log("[C: ForwardAddUserToPrimary] Successfully replicated user: " + 
user + ", passwd: " + passwd, VB);
        return true;
      } else {
        log("[C: ForwardAddUserToPrimary] Error " + 
to_string(status.error_code()) + ": " + status.error_message(), VB);
        return false;
      }
    };
    
    bool BackendClient::SendAddUserRequest(const string& user, const 
string& passwd) {
      Credentials cred;
      cred.set_user(user);
      cred.set_passwd(passwd);
    
      ClientContext ctx;
      Response res;
      Status status = stub_->SendAddUserRequest(&ctx, cred, &res);
      if (status.ok()) {
        log("[C: SendAddUserRequest] Request sent to add user: " + user + 
", passwd: " + passwd, VB);
        return true;
      } else {
        log("[C: SendAddUserRequest] Error " + 
to_string(status.error_code()) + ": " + status.error_message(), VB);
        return false;
      }
    };

**backend_server.cc**:

    // Stores this node's own address and a copy of the cluster address list
    // for use by the RPC handlers.
    BackendServiceImpl::BackendServiceImpl(const string& my_addr,
                                           const vector<string>& cluster_addrs)
        : my_addr_(my_addr),
          cluster_addrs_(cluster_addrs) {}
    
    // Primary-side handler: sends the add-user request to every address in
    // cluster_addrs_ other than my_addr_ and succeeds only if the
    // acknowledgement count reaches the cluster size.
    //
    // ctx and res are not used here; cred supplies the user name and
    // password that are passed on unchanged.
    // Returns Status::OK when acks equals cluster_addrs_.size(), otherwise
    // FAILED_PRECONDITION with the message "Not enough votes".
    Status BackendServiceImpl::ForwardAddUserToPrimary(
        ServerContext* ctx, const Credentials* cred, Response* res) {
      log("[S: ForwardAddUserToPrimary] Received by primary", VB);
      // Starts at 1 to account for this node, which the loop skips. size_t
      // keeps the comparison with size() free of signed/unsigned mixing.
      size_t acks = 1;
      // Iterate by const reference so each address is not copied.
      for (const string& addr : cluster_addrs_) {
        if (addr == my_addr_) continue; // Skip myself
        log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        const bool success = client.SendAddUserRequest(cred->user(),
                                                       cred->passwd());
        if (success) {
          ++acks;
        }
      }
      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    }
    
    // Replica-side handler: stores the password from cred in TABLET under
    // TABLET[user]["passwd"] as a NUL-terminated heap copy.
    //
    // ctx and res are not used here.
    // Returns Status::OK once the entry is stored, or RESOURCE_EXHAUSTED if
    // the copy cannot be allocated.
    Status BackendServiceImpl::SendAddUserRequest(
        ServerContext* ctx, const Credentials* cred, Response* res) {
      const string& user = cred->user();
      const string& passwd = cred->passwd();

      // size() excludes the terminating NUL that memcpy copies below, so the
      // buffer needs one extra byte (it was previously one byte too small).
      const size_t len = passwd.size() + 1;
      char* copy = static_cast<char*>(malloc(len));
      if (copy == nullptr) {
        return Status(StatusCode::RESOURCE_EXHAUSTED, "Out of memory");
      }
      memcpy(copy, passwd.c_str(), len);

      // operator[] creates the per-user map when it is absent. The copy is
      // made exactly once; the earlier duplicated allocate-and-copy leaked
      // its first buffer.
      // NOTE(review): a buffer already stored for this user by an earlier
      // call is still overwritten without being freed, because ownership of
      // TABLET's values is not visible here. Confirm before adding free().
      TABLET[user]["passwd"] = copy;

      log("[S: SendAddUserRequest] Added user: " + user + ", passwd: " +
              passwd,
          VB);
      return Status::OK;
    }
    
    // Primary-side handler: sends the put request to every address in
    // cluster_addrs_ (no address is skipped) and succeeds only if all of
    // them acknowledge.
    //
    // ctx and res are not used here; req supplies the user, key and value.
    // Returns Status::OK when acks equals cluster_addrs_.size(), otherwise
    // FAILED_PRECONDITION with the message "Not enough votes".
    Status BackendServiceImpl::ForwardPutToPrimary(
        ServerContext* ctx, const Request* req, Response* res) {
      log("[S: ForwardPutToPrimary] Received by primary", VB);

      // size_t keeps the comparison with size() free of signed/unsigned
      // mixing.
      size_t acks = 0;
      // Iterate by const reference so each address is not copied.
      for (const string& addr : cluster_addrs_) {
        log("[S: ForwardPutToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        // NOTE(review): the const_cast suggests SendPutRequest takes a
        // non-const char*; its declaration is not visible here.
        const bool success = client.SendPutRequest(
            req->user(),
            req->key(),
            const_cast<char*>(req->val().c_str()));
        if (success) {
          ++acks;
        }
      }

      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    }



-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/1fd9d721-82bf-47cd-8503-5002702d63d5n%40googlegroups.com.

Reply via email to