Checking whether the RPC is cancelled in the loop works great. Thanks for the informative response, Eric!
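
For anyone finding this thread later, here is a minimal sketch of the loop check. It is an illustration under assumptions, not the actual service code: the gRPC pieces are stubbed out with plain JDK types so that it runs on its own, and the names CancellableStreamLoop and streamUntilCancelled are made up for the example. In a real handler the BooleanSupplier would be something like ((ServerCallStreamObserver<StreamItem>) responseObserver)::isCancelled or Context.current()::isCancelled, and the Consumer would be responseObserver::onNext. The sketch also polls with a timeout instead of calling Thread.sleep(), which is a small change from the original loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical stand-in for the streaming handler; no gRPC dependency.
class CancellableStreamLoop {

    /**
     * Forwards queue items to the sink until isCancelled reports true.
     * The cancellation flag is polled on every iteration, so the loop does
     * not depend on a callback that needs a free executor thread.
     * Returns the number of items forwarded.
     */
    public static int streamUntilCancelled(BlockingQueue<String> queue,
                                           Consumer<String> sink,
                                           BooleanSupplier isCancelled,
                                           long pollMillis) {
        int sent = 0;
        while (!isCancelled.getAsBoolean()) {
            String next;
            try {
                // Wait briefly for an item so an empty queue does not busy-spin.
                next = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (next != null) {
                sink.accept(next);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<String> received = new ArrayList<>();

        int sent = streamUntilCancelled(queue, item -> {
            received.add(item);
            if (received.size() == 2) {
                cancelled.set(true); // simulate the client disconnecting
            }
        }, cancelled::get, 10);

        System.out.println("sent=" + sent + " received=" + received);
        // prints: sent=2 received=[a, b]
    }
}
```

The point of the design is that the handler thread itself notices the cancellation and returns, which frees the executor thread it was occupying.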

I think another part of my confusion was that I wasn't expecting the server's executor thread pool to be in charge of both handling and cancelling the RPCs. In my test, I had a fixed thread pool with 5 threads, and would sometimes start up 5 streams to clients. Then, if a client disconnected, the server wouldn't cancel that RPC because it had no executor threads available to do so.

On Monday, April 16, 2018 at 5:31:28 PM UTC-7, Eric Anderson wrote:
>
> Oh, and if you want a notification for your code, you could use
> Context.current().addListener(). That should be firing, as it isn't
> synchronized with the rest of the callbacks.
>
> On Mon, Apr 16, 2018 at 5:27 PM, Eric Anderson <ej...@google.com> wrote:
>
>> On Fri, Apr 13, 2018 at 3:32 PM, Christopher Schechter
>> <chris...@gmail.com> wrote:
>>
>>> Inside testService, it has this server-side streaming method:
>>>
>>> // for the purpose of this test, this queue is pre-filled with a handful
>>> // of StreamItems, and never has any other objects added to it
>>> BlockingQueue<StreamItem> queue = new LinkedBlockingQueue<>();
>>>
>>> @Override
>>> public void testStream(StreamRequest request,
>>>         StreamObserver<StreamItem> responseObserver) {
>>>     try {
>>>         while (!Thread.interrupted()) {
>>>             StreamItem next = queue.poll();
>>>             if (next == null) {
>>>                 LOGGER.debug("Queue is empty, waiting before next poll()");
>>>                 Thread.sleep(1000);
>>>                 // send default instance which is empty, to signal
>>>                 // that the queue is empty right now
>>>                 responseObserver.onNext(StreamItem.getDefaultInstance());
>>>             } else {
>>>                 responseObserver.onNext(next);
>>>             }
>>>         }
>>>     } catch (InterruptedException e) {
>>>         LOGGER.info("Interrupted. Exiting loop.");
>>>     } catch (Exception e) {
>>>         LOGGER.error("Unexpected error", e);
>>>         responseObserver.onError(e);
>>>         return;
>>>     }
>>>     responseObserver.onCompleted();
>>> }
>>>
>>
>> Because this method doesn't return, it blocks the callback thread which
>> delivers notifications. We have checks to throw if the client cancelled,
>> but they aren't triggering. I just filed an issue for this
>> <https://github.com/grpc/grpc-java/issues/4351>.
>>
>> For quick fix, you can cast responseObserver to ServerCallStreamObserver
>> and check isCancelled() as part of the loop. Alternatively, you can rely on
>> the gRPC Context by calling Context.current().isCancelled().
>>