Hi All

In my application, a client sends a stream of records and the server
streams back acknowledgements. The first request works fine, but every
subsequent request fails with the errors below. Each subsequent request is
made 2 or 3 seconds after the previous one (see GetRequest in the proto
file below).

On the server side I am getting:
Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}

On the client side I am getting:

CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL
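
One possible reading of the server-side message, stated here as an assumption rather than something confirmed against the gRPC source: "Too many responses" is raised when a method whose response is declared as a single, non-streamed message sends a second message. The plain-Java sketch below models that rule without any gRPC dependency. SingleResponseCall and its methods are hypothetical names for illustration, not gRPC API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no gRPC dependency) of a call that may carry only one response. */
final class SingleResponseCall {
    // true models a method whose response is NOT declared as a stream
    private final boolean serverSendsOneMessage;
    private final List<String> delivered = new ArrayList<>();
    private boolean messageSent;
    private String closedWith; // null while the call is still open

    SingleResponseCall(boolean serverSendsOneMessage) {
        this.serverSendsOneMessage = serverSendsOneMessage;
    }

    /** Models sending one response; a second one on a single-response call closes it. */
    void sendMessage(String response) {
        if (closedWith != null) {
            return; // call already closed, the message is dropped
        }
        if (serverSendsOneMessage && messageSent) {
            closedWith = "INTERNAL: Too many responses";
            return;
        }
        messageSent = true;
        delivered.add(response);
    }

    List<String> delivered() {
        return delivered;
    }

    String closedWith() {
        return closedWith;
    }

    public static void main(String[] args) {
        // Single-response method: the second acknowledgement closes the call.
        SingleResponseCall single = new SingleResponseCall(true);
        single.sendMessage("ack-1");
        single.sendMessage("ack-2");
        System.out.println(single.delivered() + " / " + single.closedWith());

        // Streamed response: both acknowledgements go through.
        SingleResponseCall streamed = new SingleResponseCall(false);
        streamed.sendMessage("ack-1");
        streamed.sendMessage("ack-2");
        System.out.println(streamed.delivered() + " / " + streamed.closedWith());
    }
}
```

If that reading is right, the first acknowledgement would go through and the second would close the call, which matches the symptom. It would also imply that the generated stubs actually in use declare a single response, which is worth checking against the proto file.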


Please find the code:
*proto file*
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.tps";
option java_outer_classname = "TpsProto";
option objc_class_prefix = "tps";

package tps;
// Interface exported by the server.
service TPS {
  // Takes data from the client and streams it to a different gRPC server.
  rpc GetRequest(Request) returns (Acknowledgement) {}
  rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {}
}

message Request {
  string policyId = 1;
  string txnId = 2;
  string clientId = 3;
}

message Acknowledgement {
  string txnId = 1;
  string clientId = 2;
}
  

Client Side code:

import io.grpc.ManagedChannel;
import io.grpc.examples.tps.Acknowledgement;
import io.grpc.examples.tps.Request;
import io.grpc.examples.tps.TPSGrpc;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public final class GrpcClient {
    // Intended to cap the number of in-flight requests.
    private final Semaphore limiter = new Semaphore(1000);
    private final List<ManagedChannel> channels;
    private final List<TPSGrpc.TPSStub> futureStubList;
    StreamObserver<Request> str;

    public GrpcClient(String host, int port) {
        channels = new ArrayList<>();
        futureStubList = new ArrayList<>();
        ManagedChannel channel = NettyChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .keepAliveWithoutCalls(true)
                .keepAliveTime(20, TimeUnit.DAYS)
                .build();

        channels.add(channel);
        futureStubList.add(TPSGrpc.newStub(channel));
        // Open one long-lived stream; acknowledgements arrive on this observer.
        str = futureStubList.get(0).sendRequest(new StreamObserver<Acknowledgement>() {
            @Override
            public void onNext(Acknowledgement value) {
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("comp");
            }
        });
    }

    public void shutdown() throws InterruptedException {
    }

    public void verifyAsync(Request request) throws InterruptedException {
        limiter.acquire();
        str.onNext(request);
    }
}
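
In the client above, verifyAsync acquires a permit from limiter on every send, but nothing ever releases one. Assuming the semaphore is meant to bound the number of unacknowledged requests, each acquire would normally be paired with a release when the acknowledgement arrives. A minimal plain-Java sketch of that pairing follows; InFlightLimiter and its method names are hypothetical, and the Consumer stands in for str.onNext.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Sketch of bounding in-flight requests: acquire before sending, release on acknowledgement. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks until a permit is free, then hands the request to the sender. */
    void send(String request, Consumer<String> sender) throws InterruptedException {
        permits.acquire();
        try {
            sender.accept(request);
        } catch (RuntimeException e) {
            permits.release(); // sending failed, so no acknowledgement will come back
            throw e;
        }
    }

    /** Call when an acknowledgement arrives to free the slot it was holding. */
    void onAcknowledged() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightLimiter limiter = new InFlightLimiter(2);
        limiter.send("txn-1", r -> System.out.println("sent " + r));
        limiter.send("txn-2", r -> System.out.println("sent " + r));
        System.out.println("free permits after two sends: " + limiter.available());
        limiter.onAcknowledged();
        System.out.println("free permits after one ack: " + limiter.available());
    }
}
```

In the client, onAcknowledged would correspond to a limiter.release() call inside the response observer's onNext. What should happen to outstanding permits when the stream fails is left open here.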


Server Side


import io.grpc.examples.tps.Acknowledgement;
import io.grpc.examples.tps.Request;
import io.grpc.examples.tps.TPSGrpc;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class Service extends TPSGrpc.TPSImplBase {

    // One cached GrpcClient per "host:port" node.
    static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>();

    @Override
    public void getRequest(Request request, StreamObserver<Acknowledgement> responseObserver) {
        Request clone = Request.newBuilder()
                .setClientId(request.getClientId())
                .setTxnId(request.getTxnId())
                .build();
        // Answer the unary call immediately, then forward to the nodes in the background.
        responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
        responseObserver.onCompleted();
        CompletableFuture.runAsync(() -> {
            for (int i = 0; i < GrpcServer.node.size(); i++) {
                callNode(GrpcServer.node.get(i), clone);
            }
        });
    }

    @Override
    public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request value) {
                // Send one acknowledgement back for every request received on the stream.
                Acknowledgement ack = Acknowledgement.newBuilder()
                        .setClientId(value.getClientId())
                        .setTxnId(value.getTxnId())
                        .build();
                responseObserver.onNext(ack);
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
                System.out.println("server comp");
            }
        };
    }

    private void callNode(String node, Request request) {
        GrpcClient client = urlVsGrpcClient.get(node);
        if (client == null) {
            String[] split = node.split(":");
            client = new GrpcClient(split[0], Integer.parseInt(split[1]));
            urlVsGrpcClient.put(node, client);
        }
        try {
            client.verifyAsync(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
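
A side note on callNode above: it runs inside CompletableFuture.runAsync, so several threads may read and write the static HashMap at the same time, and two of them could each create a client for the same node. A minimal sketch of a thread-safe alternative using ConcurrentHashMap.computeIfAbsent follows. NodeClientCache and hostAndPort are hypothetical names, and the type parameter V stands in for GrpcClient so the example runs without gRPC on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of a thread-safe per-node cache; V stands in for the GrpcClient type. */
final class NodeClientCache<V> {
    private final Map<String, V> clients = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    NodeClientCache(Function<String, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for a node, creating it at most once per node. */
    V get(String node) {
        return clients.computeIfAbsent(node, factory);
    }

    /** Splits "host:port" into its two parts. */
    static String[] hostAndPort(String node) {
        int colon = node.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected host:port, got " + node);
        }
        return new String[] {node.substring(0, colon), node.substring(colon + 1)};
    }

    public static void main(String[] args) {
        // The factory here only builds a label; in the service it would build a GrpcClient.
        NodeClientCache<String> cache = new NodeClientCache<>(node -> {
            String[] parts = hostAndPort(node);
            return "client for " + parts[0] + " on port " + Integer.parseInt(parts[1]);
        });
        System.out.println(cache.get("localhost:8980"));
        // The second lookup returns the same cached instance.
        System.out.println(cache.get("localhost:8980") == cache.get("localhost:8980"));
    }
}
```

This only addresses the shared map. It is not offered as the cause of the "Too many responses" error.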


Please check and suggest.

Thanks,
Shobhit Srivastava
