it's very hard to figure out what you are trying to achieve and how you expect this code to behave: you should narrow the problem and post the minimal example that causes it. Moreover, the code is in some intermediate inconsistent state and wouldn't even compile (proto has functions *GetRequest* and *SendDataToServer*, but then in your code you call something named *sendRequest *; *GrpcClient* has something that looks like constructor but is named *GrpcClient2* ; etc), which makes it even harder to read. If you ask random ppl on the internet for help, you should make it easy for them to help you ;-)
Cheers! On Wednesday, April 28, 2021 at 1:16:23 PM UTC+7 simpl...@gmail.com wrote: > Hi All > > In my application I have a client which sends the streams of record and > server streams the acknowledgment, things are working fine for first > request however for the subsequent request I am getting below errors. The > subsequent request is made after 2 or 3 seconds(GetRequest in proto file > below) > > On Server side I am getting > Cancelling the stream with status Status{code=INTERNAL, description=Too > many responses, cause=null} > > On client Side I am getting > > CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL > > > Please find the code: > *proto file* > syntax = "proto3"; > option java_multiple_files = true; > option java_package = "io.grpc.examples.tps"; > option java_outer_classname = "TpsProto"; > option objc_class_prefix = "tps"; > > package tps; > // Interface exported by the server. > service TPS { > rpc GetRequest(Request) returns (Acknowledgement) {}// takes data from > client and stream data to different grpc server > rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {} > } > > message Request { > string policyId = 1; > string txnId = 2; > string clientId = 3; > } > > message Acknowledgement { > string txnId = 1; > string clientId = 2; > } > > > Client Side code: > > public final class GrpcClient { > private final Semaphore limiter = new Semaphore(1000); > private final List<ManagedChannel> channels; > private final List<TPSGrpc.TPSStub> futureStubList; > StreamObserver<Request> str; > public GrpcClient2(String host, int port) { > channels = new ArrayList<>(); > futureStubList = new ArrayList<>(); > ManagedChannel channel =null; > channel = NettyChannelBuilder.forAddress(host, port) > > .usePlaintext().keepAliveWithoutCalls(true).keepAliveTime(20, TimeUnit.DAYS) > .build(); > > channels.add(channel); > futureStubList.add(TPSGrpc.newStub(channel)); > str = futureStubList.get(0).sendRequest(new > StreamObserver<Acknowledgement>() { > > @Override > public void onNext(Acknowledgement value) { > // TODO Auto-generated method stub > System.out.println(value.getTxnId()); > > } > @Override > public void onError(Throwable t) { > // TODO Auto-generated method stub > System.out.println(t.getMessage()); > > } > @Override > public void onCompleted() { > // TODO Auto-generated method stub > System.out.println("comp"); > > } > }); > } > public void shutdown() throws InterruptedException { > } > public void verifyAsync(Request request) throws InterruptedException { > limiter.acquire(); > str.onNext(request); > } > } > > > Server Side > > > public class Service extends TPSGrpc.TPSImplBase{ > > > static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>(); > @Override > public void getRequest(Request request, StreamObserver<Acknowledgement> > responseObserver) { > Request clone = Request.newBuilder().setClientId(request.getClientId()) > .setTxnId(request.getTxnId()).build(); > > responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build()); > responseObserver.onCompleted(); > CompletableFuture.runAsync(new Runnable() { > @Override > public void run() { > for(int i=0;i<GrpcServer.node.size();i++) { > callNode(GrpcServer.node.get(i), clone); > } > } > }); > } > @Override > public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> > responseObserver) { > > return new StreamObserver<Request>() { > > @Override > public void onNext(Request value) { > Acknowledgement ack = > Acknowledgement.newBuilder().setClientId(value.getClientId()). > setTxnId(value.getTxnId()).build(); > responseObserver.onNext(ack); > System.out.println(value.getTxnId()); > } > > @Override > public void onError(Throwable t) { > } > @Override > public void onCompleted() { > responseObserver.onCompleted(); > System.out.println("server comp"); > } > }; > } > private void callNode(String node, Request request) { > GrpcClient client = urlVsGrpcClient.get(node); > if (client == null) { > String[] split = node.split(":"); > client = new GrpcClient(split[0], Integer.parseInt(split[1])); > urlVsGrpcClient.put(node, client); > } > try { > client.verifyAsync(request); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > } > > > Please check and suggest. > > Thanks, > Shobhit Srivastava > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/370b60aa-3e0b-471d-9c82-87a2c6935d48n%40googlegroups.com.