Checking whether the RPC is cancelled in the loop works great. Thanks for 
the informative response Eric!

I think another part of the confusion for me was that I wasn't expecting 
the Server's executor thread pool to be in charge of both handling and 
cancelling the RPC's. In my test, I had a fixed thread pool with 5 threads, 
and sometimes would start up 5 streams to clients. Then, if a client 
disconnected, the server wouldn't cancel it because it would have no 
executor threads available to do so.


On Monday, April 16, 2018 at 5:31:28 PM UTC-7, Eric Anderson wrote:
>
> Oh, and if you want a notification for your code, you could use 
> Context.current().addListener(). That should be firing, as it isn't 
> synchronized with the rest of the callbacks.
>
> On Mon, Apr 16, 2018 at 5:27 PM, Eric Anderson <ej...@google.com 
> <javascript:>> wrote:
>
>> On Fri, Apr 13, 2018 at 3:32 PM, Christopher Schechter <
>> chris...@gmail.com <javascript:>> wrote:
>>
>>> Inside testService, it has this server-side streaming method:
>>>
>>> // for the purpose of this test, this queue is pre-filled with a handful 
>>> of StreamItems, and never has any other objects added to it   
>>> BlockingQueue<StreamItem> queue = new LinkedBlockingQueue<>();
>>>
>>>     @Override
>>>     public void testStream(StreamRequest request, 
>>> StreamObserver<StreamItem> responseObserver) {
>>>         try {
>>>             while (!Thread.interrupted()) {
>>>                 StreamItem next = queue.poll();
>>>                 if (next == null) {
>>>                     LOGGER.debug("Queue is empty, waiting before next 
>>> poll()");
>>>                     Thread.sleep(1000);
>>>                     // send default instance which is empty, to signal 
>>> that the queue is empty right now
>>>                     
>>> responseObserver.onNext(StreamItem.getDefaultInstance());
>>>                 } else {
>>>                     responseObserver.onNext(next);
>>>                 }
>>>             }
>>>         } catch (InterruptedException e) {
>>>             LOGGER.info("Interrupted. Exiting loop.");
>>>         } catch (Exception e) {
>>>             LOGGER.error("Unexpected error", e);
>>>             responseObserver.onError(e);
>>>             return;
>>>         }
>>>         responseObserver.onCompleted();
>>>     }
>>>
>>
>> Because this method doesn't return, it blocks the callback thread which 
>> delivers notifications. We have checks to throw if the client cancelled, 
>> but they aren't triggering. I just filed an issue for this 
>> <https://github.com/grpc/grpc-java/issues/4351>.
>>
>> For quick fix, you can cast responseObserver to ServerCallStreamObserver 
>> and check isCancelled() as part of the loop. Alternatively, you can rely on 
>> the gRPC Context by calling Context.current().isCancelled().
>>
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To post to this group, send email to grpc-io@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/bad607be-c30a-4fec-934d-78270cb6ff1d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to