[grpc-io] <_MultiThreadedRendezvous of RPC that terminated with: details = "Exception iterating requests!"

2021-04-28 Thread Ragesh
https://stackoverflow.com/questions/67299012/multithreadedrendezvous-of-rpc-that-terminated-with-details-exception-iter

I am trying to create a bidirectional streaming service using gRPC. 
### Proto definition

syntax = "proto3";

message Features {
int32 id = 1;
float sensor1 = 2 ;
float sensor2 = 3 ;
float sensor3 = 4 ;
float sensor4 = 5 ;
}
message Empty{}

service StreamDataBroker{
rpc StreamDataBroker(stream Empty) returns(stream Features);
}

### Server snippet

def StreamDataBroker(self, requests, context):
    # csv_filename = "./dataset/sensors.csv"
    response = msg.Features()
    for request in requests:
        print("server running")
        total_rows = row_obj.init_count()
        current_row = row_obj.current_row
        row = row_obj.get_next_row(current_row)
        response.id = row[0]
        response.sensor1 = row[1]
        response.sensor2 = row[2]
        response.sensor3 = row[3]
        response.sensor4 = row[4]
        row_obj.current_row = row_obj.current_row + 1

        yield response

### Client snippet

def send_request_stream(self):
    request = empty_pb2.Empty()
    responses = self.stub.StreamDataBroker(request)
    for response in responses:
        print(response)

When I try to run the client I get the following error

Traceback (most recent call last):
  File "client.py", line 27, in <module>
    main()
  File "client.py", line 22, in main
    client.send_request_stream()
  File "client.py", line 17, in send_request_stream
    for response in responses:
  File "/home/edge/.local/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/home/edge/.local/lib/python3.8/site-packages/grpc/_channel.py", line 809, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating requests!"
    debug_error_string = "None"
>

What is going wrong here, and how do I handle iterating requests properly?

[Source code][1]

I tried this [solution][2] but it did not work for me.


  [1]: https://github.com/ipa-rar/gRPC-ws/tree/main/data_broker_stream
  [2]: https://github.com/grpc/grpc/issues/13544
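A hedged note on the error itself: for a `stream Empty` request, the generated stub's first positional argument must be an *iterator* of request messages, which gRPC consumes on a worker thread; passing a single message object is not iterable, and the resulting TypeError surfaces as "Exception iterating requests!". A plain-Python illustration of that mechanism (no gRPC needed; `Empty`, `fake_stream_stub`, and `request_gen` are stand-in names, not the real API):

```python
class Empty:
    """Stand-in for empty_pb2.Empty."""
    pass

def fake_stream_stub(request_iterator):
    # A request-streaming stub does roughly this: iterate the argument
    # and serialize each message it yields.
    return [type(req).__name__ for req in request_iterator]

def request_gen(n):
    # The fix pattern: hand the stub a generator of request messages.
    for _ in range(n):
        yield Empty()

# Wrong: a single message is not iterable, which is what the channel
# reports as "Exception iterating requests!"
try:
    fake_stream_stub(Empty())
except TypeError:
    print("Exception iterating requests!")

# Right: pass an iterator/generator of request messages.
print(fake_stream_stub(request_gen(2)))  # -> ['Empty', 'Empty']
```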

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/e27108fd-c73f-47c1-bd74-31e159e73d6en%40googlegroups.com.


Re: [grpc-io] Re: java concurrent ordered response buffer

2021-04-28 Thread Piotr Morgwai Kotarbinski
Thanks :)

I was actually surprised, because I thought parallel request message
processing was a common use-case for both gRPC and websockets.
(For example, if we have a service that does some single-threaded graphic
processing on received images and sends back a modified version of a given
image, it would be most efficient to dispatch the processing to a thread
pool sized to the available CPU/GPU cores, right? Processing sequentially
would use just one core per request stream, so with a low number of
concurrent request streams we would be underutilizing the cores.)

Cheers! 
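The "bucket" strategy discussed in this thread can be sketched roughly as follows. This is a minimal, language-neutral illustration in plain Python with hypothetical names, not the linked OrderedConcurrentOutputBuffer implementation: replies for request i are buffered until every earlier request's bucket is closed, then flushed in order, preserving the order replies were produced within each bucket.

```python
import threading

class OrderedOutputBuffer:
    def __init__(self, emit):
        self.emit = emit        # callback that actually sends a reply
        self.lock = threading.Lock()
        self.head = 0           # index of the next bucket allowed to flush
        self.buckets = {}       # bucket index -> list of buffered replies
        self.closed = set()     # buckets whose processing has finished

    def write(self, i, reply):
        # Worker threads buffer replies under their request's index.
        with self.lock:
            self.buckets.setdefault(i, []).append(reply)

    def close_bucket(self, i):
        # When a request finishes, flush every consecutive closed
        # bucket starting at the head, in request order.
        with self.lock:
            self.closed.add(i)
            while self.head in self.closed:
                for reply in self.buckets.pop(self.head, []):
                    self.emit(reply)
                self.closed.remove(self.head)
                self.head += 1

out = []
buf = OrderedOutputBuffer(out.append)
# Workers may produce and finish out of order...
buf.write(1, "reply-1a"); buf.write(2, "reply-2"); buf.write(0, "reply-0")
buf.write(1, "reply-1b")
buf.close_bucket(2); buf.close_bucket(0); buf.close_bucket(1)
# ...but replies are emitted in request order:
print(out)  # ['reply-0', 'reply-1a', 'reply-1b', 'reply-2']
```

Note the sketch serializes flushing under one lock; the real buffer would also need to handle stream completion and back-pressure.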

On Wednesday, April 28, 2021 at 6:52:08 AM UTC+7 Eric Anderson wrote:

> Yeah, we don't have anything pre-existing that does something like that; 
> it gets into the specifics of your use-case. Making something yourself was 
> appropriate. I will say that the strategy used in 
> OrderedConcurrentOutputBuffer with the Buckets seems really clean.
>
> On Thu, Apr 22, 2021 at 9:21 AM Piotr Morgwai Kotarbinski <
> mor...@gmail.com> wrote:
>
>> In case someone else needs it too, I've written it myself due to the lack 
>> of answers both here and on SO:
>>
>> https://github.com/morgwai/java-utils/blob/master/src/main/java/pl/morgwai/base/utils/OrderedConcurrentOutputBuffer.java
>> feedback is welcome :)
>> On Tuesday, April 20, 2021 at 11:09:59 PM UTC+7 Piotr Morgwai Kotarbinski 
>> wrote:
>>
>>> Hello
>>> i have a stream of messages coming from a websocket or a grpc client. 
>>> for each message my service produces 0 or more reply messages. by default 
>>> both websocket endpoints and grpc request observers are guaranteed to be 
>>> called by maximum 1 thread concurrently, so my replies are sent in the same 
>>> order as requests. Now I want to dispatch request processing to other 
>>> threads and process them in parallel, but still keep the order. Therefore, 
>>> I need some "concurrent ordered response buffer", which will buffer replies 
>>> to a given request message until processing of previous requests is 
>>> finished and replies to them are sent (in order they were produced within 
>>> each "request bucket").
>>>
>>> I can develop such a class myself, but it seems a common case, so I was 
>>> wondering if such a thing already exists (to not reinvent the wheel). 
>>> However, I could not easily find anything on the web, nor get any answer 
>>> on SO. Does anyone know of something like this?
>>>
>>> Thanks!
>>>
>



[grpc-io] long live grpc issue in bidi stream

2021-04-28 Thread Shobhit Srivastava
Hi All

In my application I have a client which sends a stream of records and the 
server streams back acknowledgments. Things work fine for the first 
request; however, for subsequent requests I get the errors below. The 
subsequent request is made after 2 or 3 seconds (GetRequest in the proto 
file below).

On the server side I am getting:
Cancelling the stream with status Status{code=INTERNAL, description=Too 
many responses, cause=null}

On the client side I am getting:

CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL


Please find the code:
*proto file*
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.tps";
option java_outer_classname = "TpsProto";
option objc_class_prefix = "tps";

package tps;
// Interface exported by the server.
service TPS {
  rpc GetRequest(Request) returns (Acknowledgement) {}// takes data from 
client and stream data to different grpc server
  rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {}
}

 message  Request  {
  string policyId = 1;
  string txnId = 2;
  string clientId = 3;
  }
  
  message Acknowledgement {
  string txnId = 1;
  string clientId = 2;
  }
  

Client Side code:

public final class GrpcClient {
    private final Semaphore limiter = new Semaphore(1000);
    private final List<ManagedChannel> channels;
    private final List<TPSGrpc.TPSStub> futureStubList;
    StreamObserver<Request> str;

    public GrpcClient(String host, int port) {
        channels = new ArrayList<>();
        futureStubList = new ArrayList<>();
        ManagedChannel channel = NettyChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .keepAliveWithoutCalls(true)
                .keepAliveTime(20, TimeUnit.DAYS)
                .build();

        channels.add(channel);
        futureStubList.add(TPSGrpc.newStub(channel));
        str = futureStubList.get(0).sendRequest(new StreamObserver<Acknowledgement>() {

            @Override
            public void onNext(Acknowledgement value) {
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("comp");
            }
        });
    }

    public void shutdown() throws InterruptedException {
    }

    public void verifyAsync(Request request) throws InterruptedException {
        limiter.acquire();
        str.onNext(request);
    }
}


Server Side


public class Service extends TPSGrpc.TPSImplBase {

    static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>();

    @Override
    public void getRequest(Request request, StreamObserver<Acknowledgement> responseObserver) {
        Request clone = Request.newBuilder().setClientId(request.getClientId())
                .setTxnId(request.getTxnId()).build();
        responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
        responseObserver.onCompleted();
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; /* loop bound and body lost in the original post */ ; ) {
                }
            }
        });
    }

    @Override
    public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> responseObserver) {
        return new StreamObserver<Request>() {

            @Override
            public void onNext(Request value) {
                Acknowledgement ack = Acknowledgement.newBuilder()
                        .setClientId(value.getClientId())
                        .setTxnId(value.getTxnId()).build();
                responseObserver.onNext(ack);
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
                System.out.println("server comp");
            }
        };
    }

    private void callNode(String node, Request request) {
        GrpcClient client = urlVsGrpcClient.get(node);
        if (client == null) {
            String[] split = node.split(":");
            client = new GrpcClient(split[0], Integer.parseInt(split[1]));
            urlVsGrpcClient.put(node, client);
        }
        try {
            client.verifyAsync(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
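One likely reading of these errors, offered as a hedged sketch rather than a confirmed diagnosis: grpc-java cancels a call with INTERNAL "Too many responses" when a server sends more than one response message on a method the client invoked as unary (GetRequest here), and the client then observes the cancellation as RST_STREAM/CANCEL. If anything in the async Runnable ends up calling the unary call's responseObserver.onNext again after onCompleted, it would produce exactly this pair of errors. A plain-Python illustration of the invariant (no gRPC required; UnaryCallObserver is a hypothetical stand-in, not the grpc-java API):

```python
class UnaryCallObserver:
    """Mimics grpc-java's per-call check for unary methods:
    at most one response, and none after completion."""
    def __init__(self):
        self.sent = 0
        self.completed = False

    def on_next(self, msg):
        if self.completed or self.sent >= 1:
            # grpc-java cancels the stream with INTERNAL at this point
            raise RuntimeError("INTERNAL: Too many responses")
        self.sent += 1

    def on_completed(self):
        self.completed = True

obs = UnaryCallObserver()
obs.on_next("ack")      # the single allowed unary response
obs.on_completed()
try:
    obs.on_next("ack")  # e.g. a stray onNext from an async callback
except RuntimeError as e:
    print(e)            # INTERNAL: Too many responses
```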


Please check and suggest.

Thanks,
Shobhit Srivastava
