it's very hard to figure out what you are trying to achieve and how you 
expect this code to behave: you should narrow the problem and post the 
minimal example that causes it. Moreover, the code is in some intermediate 
inconsistent state and wouldn't even compile (proto has functions 
*GetRequest* and  *SendDataToServer*, but then in your code you call 
something named *sendRequest *;  *GrpcClient* has something that looks like 
constructor but is named  *GrpcClient2* ; etc), which makes it even harder 
to read.
If you ask random ppl on the internet for help, you should make it easy for 
them to help you ;-)

Cheers!

On Wednesday, April 28, 2021 at 1:16:23 PM UTC+7 simpl...@gmail.com wrote:

> Hi All
>
> In my application I have a client which sends the streams of record and 
> server streams the acknowledgment, things are working fine for first 
> request however for the subsequent request I am getting below errors. The 
> subsequent request is made after 2 or 3 seconds(GetRequest in proto file 
> below)
>
> On Server side I am getting
> Cancelling the stream with status Status{code=INTERNAL, description=Too 
> many responses, cause=null}
>
> On client Side I am getting 
>
> CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL
>
>
> Please find the code:
> *proto file*
> syntax = "proto3";
> option java_multiple_files = true;
> option java_package = "io.grpc.examples.tps";
> option java_outer_classname = "TpsProto";
> option objc_class_prefix = "tps";
>
> package tps;
> // Interface exported by the server.
> service TPS {
>   rpc GetRequest(Request) returns (Acknowledgement) {}// takes data from 
> client and stream data to different grpc server
>   rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {}
> }
>
>  message  Request  {
>   string policyId = 1;
>   string txnId = 2;
>   string clientId = 3;
>   }
>   
>   message Acknowledgement {
>   string txnId = 1;
>   string clientId = 2;
>   }
>   
>
> Client Side code:
>
> public final class GrpcClient {
>     private final Semaphore limiter = new Semaphore(1000);
>     private final List<ManagedChannel> channels;
>     private final List<TPSGrpc.TPSStub> futureStubList;
>     StreamObserver<Request> str;
>     public GrpcClient2(String host, int port) {
>         channels = new ArrayList<>();
>         futureStubList = new ArrayList<>();
>         ManagedChannel channel =null;
>              channel = NettyChannelBuilder.forAddress(host, port)
>                     
> .usePlaintext().keepAliveWithoutCalls(true).keepAliveTime(20, TimeUnit.DAYS)
>                     .build();
>              
>              channels.add(channel);
>              futureStubList.add(TPSGrpc.newStub(channel));
>              str = futureStubList.get(0).sendRequest(new 
> StreamObserver<Acknowledgement>() {
>
>   @Override
>   public void onNext(Acknowledgement value) {
>   // TODO Auto-generated method stub
>   System.out.println(value.getTxnId());
>   
>   }
>   @Override
>   public void onError(Throwable t) {
>   // TODO Auto-generated method stub
>   System.out.println(t.getMessage());
>   
>   }
>   @Override
>   public void onCompleted() {
>   // TODO Auto-generated method stub
>   System.out.println("comp");
>   
>   }
>   });
>     }
>     public void shutdown() throws InterruptedException {
>     }
>     public void verifyAsync(Request request) throws InterruptedException {
>         limiter.acquire();
>        str.onNext(request);
>     }
> }
>
>
> Server Side
>
>
> public class Service extends TPSGrpc.TPSImplBase{
>
>
> static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>();
> @Override
> public void getRequest(Request request, StreamObserver<Acknowledgement> 
> responseObserver) {
> Request clone = Request.newBuilder().setClientId(request.getClientId())
> .setTxnId(request.getTxnId()).build();
>
> responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
> responseObserver.onCompleted();
> CompletableFuture.runAsync(new Runnable() {
> @Override
> public void run() {
> for(int i=0;i<GrpcServer.node.size();i++) {
> callNode(GrpcServer.node.get(i), clone);
> }
> }
> });
> }
> @Override
> public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> 
> responseObserver) {
>  
> return new StreamObserver<Request>() {
>
> @Override
> public void onNext(Request value) {
> Acknowledgement ack = 
> Acknowledgement.newBuilder().setClientId(value.getClientId()).
>                   setTxnId(value.getTxnId()).build();
> responseObserver.onNext(ack);
> System.out.println(value.getTxnId());
> }
>
> @Override
> public void onError(Throwable t) {
> }
> @Override
> public void onCompleted() {
> responseObserver.onCompleted();
> System.out.println("server comp");
> }
> };
> }
> private void callNode(String node, Request request) {
> GrpcClient client = urlVsGrpcClient.get(node);
> if (client == null) {
> String[] split = node.split(":");
> client = new GrpcClient(split[0], Integer.parseInt(split[1]));
> urlVsGrpcClient.put(node, client);
> }
> try {
> client.verifyAsync(request);
> } catch (InterruptedException e) {
> e.printStackTrace();
> } 
> }
> }
>
>
> Please check and suggest.
>
> Thanks,
> Shobhit Srivastava
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/370b60aa-3e0b-471d-9c82-87a2c6935d48n%40googlegroups.com.

Reply via email to