Sorry, I just realized you are using client streaming. If you only need to share the resource between 2 methods of the same service, can you get rid of your lock and just use directExecutor (https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ServerBuilder.java#L58)?
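For reference, the stall you describe does not depend on gRPC itself. Below is a minimal sketch with plain java.util.concurrent that I would expect to reproduce it. The class name ExecutorStallDemo and the method secondRequestGetsLock are made up for illustration, and the three tasks only stand in for "request 1 starts", "request 2 starts" and "request 1's stream completes"; this is an assumption about how the callbacks get queued, not a trace of what grpc-java does internally. The pool size is a parameter so that different executor configurations can be compared.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of gRPC or of the sample application.
final class ExecutorStallDemo {

    /**
     * Queues three tasks on a pool of the given size and reports whether
     * the second "request" obtained the lock before the timeout.
     */
    static boolean secondRequestGetsLock(int threads, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Semaphore lock = new Semaphore(1);
        try {
            // Request 1 starts: takes the lock and frees its thread again.
            executor.submit(() -> lock.acquireUninterruptibly()).get();

            // Request 2 starts: blocks its thread while waiting for the lock.
            Future<Boolean> second = executor.submit(
                    () -> lock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS));

            // Request 1's stream completes: the release can only happen
            // once some pool thread is free to run this task.
            executor.execute(() -> lock.release());

            return second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + secondRequestGetsLock(1, 500));
        System.out.println("2 threads: " + secondRequestGetsLock(2, 500));
    }
}
```

With threads = 1 the release task sits in the queue behind the blocked tryAcquire, so the call should return false once the timeout expires. With threads = 2 a second thread is free to run the release, so it should return true.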
Alternatively, you can use 2 threads instead of the single-thread executor, so that one thread can complete the current request while the other receives the next request(s).

On Wednesday, September 30, 2020 at 1:18:54 AM UTC-7 weißnet auchnicht wrote:

> Hi,
>
> my service has 2 methods, which share a resource. Each method invocation
> needs exclusive access to this resource. In the example the client makes 5
> successive requests for one of these methods, sends a message using the
> stream observer and completes the stream observer. Then, without further
> delay the next request is made in the same way.
>
> The full source code of the sample application is here:
> https://github.com/Niklas-Peter/grpc-async.
>
> The GRPC server is configured with
> ServerBuilder.executor(Executors.newSingleThreadExecutor());
>
> Here the most important extracts:
>
> *Client code:*
> @Slf4j
> public class MyServiceClient {
>     @SneakyThrows
>     public void myServiceMethodA() {
>         log.info("myServiceMethodA(): started.");
>         var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
>         log.info("myServiceMethodA(): received write connection.");
>
>         writeConnection.onNext(createEvent());
>         writeConnection.onCompleted();
>     }
> }
>
> *Synchronous service implementation:*
> @Slf4j
> public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final Semaphore lock = new Semaphore(1);
>
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
>             log.warn(responseObserver + ": Lock acquire timeout exceeded");
>             return new NoOpEventStreamObserver(); // Only to prevent exceptions in the log.
>         }
>
>         log.info(responseObserver + ": Acquired lock.");
>
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>
>                 var preprocessedEvent = preprocess(event);
>                 events.add(preprocessedEvent);
>             }
>
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete.");
>
>                 var storageLibrary = new StorageLibrary();
>
>                 storageLibrary.store(events.toArray(Event[]::new)).handle((unused, throwable) -> {
>                     log.info(responseObserver + ": Store completed.");
>
>                     responseObserver.onNext(Confirmation.newBuilder().build());
>                     responseObserver.onCompleted();
>
>                     lock.release();
>
>                     return null;
>                 });
>             }
>
>             private Event preprocess(Event event) {
>                 // The preprocessing already requires the lock.
>                 return event;
>             }
>         };
>     }
>
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         if (!lock.tryAcquire(5, TimeUnit.SECONDS))
>             throw new TimeoutException("The lock acquire timeout exceeded.");
>
>         // Requires exclusive access to a shared resource and uses async I/O.
>         var storageLibrary = new StorageLibrary();
>         storageLibrary.store(event).handle((unused, throwable) -> {
>             responseObserver.onNext(Confirmation.newBuilder().build());
>             responseObserver.onCompleted();
>
>             lock.release();
>
>             return null;
>         });
>     }
>
>
>     @Override
>     public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
>         // Do something independent from the other service methods.
>     }
> }
>
> *Output:*
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring lock ...
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired lock.
> 09:53:03.456 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring lock ...
> 09:53:13.465 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock acquire timeout exceeded
> 09:53:13.465 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring lock ...
> 09:53:23.484 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock acquire timeout exceeded
> 09:53:23.484 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring lock ...
> 09:53:33.496 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock acquire timeout exceeded
> 09:53:33.496 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring lock ...
> 09:53:43.501 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock acquire timeout exceeded
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received event.
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received complete.
> 09:53:43.533 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store completed.
>
> In my initial synchronous implementation I use a Semaphore as a lock. This
> leads to a deadlock until the acquireLock() timeout exceeded. I think this
> happens:
> 1. The only thread in the pool executes the first request. By doing so the
> first request acquires the lock and the StreamObserver is returned.
> 2. The initial processing (before the first messages via the
> StreamObserver arrive) for the first request is done now and the only
> thread is released.
> 3. In the executor's queue the 2nd request is already waiting for a
> thread, which executes it, why the just released thread handles the 2nd
> request.
> 4. The thread blocks, because the Semaphore is not released, yet.
> 5. In the executor's queue are further requests as well as messages for
> the StreamObservers waiting. However, especially the messages for the first
> request's StreamObserver can not be handled, so that the lock is not
> released.
> 6. After all pending requests after the 1st request exceeded the
> acquireLock() timeout and hence failed,
> the now released thread handles the messages for the StreamObserver of the
> 1st request and the 1st request completes.
>
> Increasing the thread pool size is probably not a good solution, as it
> would always have to be greater than or equal to the number of concurrently
> active requests for these 2 methods.
>
> *Asynchronous service implementation:*
> public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());
>
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         // Already acquire the lock here (instead of in StreamObserver.onComplete())
>         // to ensure the lock is acquired in the order,
>         // in which the GRPC requests are handled, and not in the order, in which the GRPC stream observers
>         // complete.
>         var lockFuture = lock.acquire();
>
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>
>                 events.add(event);
>             }
>
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());
>
>                 lockFuture.thenAccept(permit -> {
>                     log.info(responseObserver + ": Acquired lock.");
>
>                     var preprocessedEvents = events.stream()
>                             .map(MyAsyncService.this::preprocess)
>                             .toArray(Event[]::new);
>                     var storageLibrary = new StorageLibrary();
>
>                     storageLibrary.store(preprocessedEvents).handle((unused, throwable) -> {
>                         log.info(responseObserver + ": Store completed.");
>
>                         responseObserver.onNext(Confirmation.newBuilder().build());
>                         responseObserver.onCompleted();
>
>                         permit.release();
>
>                         return null;
>                     });
>                 });
>             }
>         };
>     }
>
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         lock.acquireAndRun(() -> {
>             log.info(responseObserver + ": Acquired lock.");
>
>             // Requires exclusive access to a shared resource and uses async I/O.
>             var preprocessedEvent = preprocess(event);
>             var storageLibrary = new StorageLibrary();
>             return storageLibrary.store(preprocessedEvent).handle((unused, throwable) -> {
>                 responseObserver.onNext(Confirmation.newBuilder().build());
>                 responseObserver.onCompleted();
>
>                 return null;
>             });
>         });
>     }
> }
>
>
> *Output:*
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring lock ...
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received event.
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received complete; acquired lock: true
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired lock.
> 10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store completed.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received complete; acquired lock: true
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired lock.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received complete; acquired lock: false
> 10:14:55.141 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store completed.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received complete; acquired lock: false
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received complete; acquired lock: false
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store completed.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired lock.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store completed.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store completed.
>
> In my asynchronous implementation this deadlock does not appear. However,
> now I have to manage an application-side buffer for each active request
> until this request gets the lock. I hoped that there was a way to let GRPC
> handle the problem by buffering the requests at a higher level instead of
> this lower application-side level and handle the exception (discard the
> request, send an exception to the client, ...) when the buffer fills. For
> example, by returning a Future<StreamObserver<>> instead of a
> StreamObserver as I know it from ASP.NET MVC asynchronous controller
> methods (
> https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos).
>
> Probably there are similar examples in the Java world, but I have less
> experience with the Java ecosystem, yet.
>
> I have already read this discussion:
> https://groups.google.com/g/grpc-io/c/XCMIva8NDO8
>
> Thanks in advance!
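
On the buffering question near the end of the quoted message: one way to keep the waiting bounded while staying non-blocking is a small asynchronous lock with a capped wait queue. The following is only a rough sketch under assumptions. BoundedAsyncLock and maxWaiters are hypothetical names, it is neither a gRPC facility nor the AsyncSemaphore used in the quoted code, and it needs Java 9 or later for CompletableFuture.failedFuture.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical helper, not a gRPC or library class.
final class BoundedAsyncLock {
    private final int maxWaiters;
    private final ArrayDeque<CompletableFuture<Runnable>> waiters = new ArrayDeque<>();
    private boolean held;

    BoundedAsyncLock(int maxWaiters) {
        this.maxWaiters = maxWaiters;
    }

    /**
     * Never blocks. The returned future completes with a release handle once
     * the lock is granted, or fails immediately when the wait queue is full.
     */
    synchronized CompletableFuture<Runnable> acquire() {
        if (!held) {
            held = true;
            return CompletableFuture.completedFuture(this::release);
        }
        if (waiters.size() >= maxWaiters) {
            return CompletableFuture.failedFuture(
                    new RejectedExecutionException("too many requests waiting for the lock"));
        }
        CompletableFuture<Runnable> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    // Hands the lock to the oldest waiter, or marks it free if nobody waits.
    // Sketch only: calling a release handle twice is not guarded against.
    private void release() {
        CompletableFuture<Runnable> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                held = false;
                return;
            }
        }
        // Complete outside the monitor so callbacks do not run while holding it.
        next.complete(this::release);
    }
}
```

In a service method, a failed future could then be turned into an error for the client, for example via responseObserver.onError(Status.RESOURCE_EXHAUSTED.asRuntimeException()), instead of letting the request wait. Whether rejecting or queueing is the right policy depends on the application.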