Sorry, I just realized you are using client streaming. If you need to share 
the resource just between 2 methods of the same service, can you get rid of 
your lock and just use 

Alternatively you can use 2 threads instead of the single-thread-executor 
so that one of the threads will be used to complete the current request 
while the other thread is receiving next request(s).
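
A minimal sketch of the two-thread idea, without gRPC. It replays the call 
pattern on a plain executor: the first task takes the lock and returns, the 
second task waits for the lock, and a third task (standing in for the first 
request's onCompleted) releases it. ExecutorSizeSketch and 
secondRequestGetsLock are made-up names for illustration, and the 500 ms 
timeout is an assumption standing in for the 10 s one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the server: no gRPC, just the executor and the lock.
class ExecutorSizeSketch {

    // Returns true if the "second request" obtains the lock before its timeout.
    public static boolean secondRequestGetsLock(int threads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Semaphore lock = new Semaphore(1);
        try {
            // Request 1: acquires the lock and returns, like myServiceMethodA
            // handing back its StreamObserver. Wait until it has run.
            Runnable firstRequest = lock::acquireUninterruptibly;
            executor.submit(firstRequest).get();

            // Request 2: blocks its thread while waiting for the lock
            // (500 ms here instead of the 10 s in the original).
            Callable<Boolean> secondRequest =
                    () -> lock.tryAcquire(500, TimeUnit.MILLISECONDS);
            Future<Boolean> second = executor.submit(secondRequest);

            // Completion of request 1: releases the lock, like onCompleted().
            Runnable firstCompleted = lock::release;
            executor.submit(firstCompleted);

            return second.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread:  " + secondRequestGetsLock(1)); // false
        System.out.println("2 threads: " + secondRequestGetsLock(2)); // true
    }
}
```

With one thread the release is stuck in the queue behind the blocked task, 
so the second request times out. With two threads the idle thread runs the 
release. This only covers one blocked request at a time; with more waiting 
requests than spare threads the same starvation returns.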

On Wednesday, September 30, 2020 at 1:18:54 AM UTC-7 weißnet auchnicht wrote:

> Hi,
> my service has 2 methods, which share a resource. Each method invocation 
> needs exclusive access to this resource. In the example the client makes 5 
> successive requests for one of these methods, sends a message using the 
> stream observer and completes the stream observer. Then, without further 
> delay the next request is made in the same way.
> The full source code of the sample application is here: 
> The GRPC server is configured with 
> ServerBuilder.executor(Executors.newSingleThreadExecutor());
> Here the most important extracts:
> *Client code:*
> @Slf4j
> public class MyServiceClient {
>     @SneakyThrows
>     public void myServiceMethodA() {
>         log.info("myServiceMethodA(): started.");
>         var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
>         log.info("myServiceMethodA(): received write connection.");
>         writeConnection.onNext(createEvent());
>         writeConnection.onCompleted();
>     }
> }
> *Synchronous service implementation:*
> @Slf4j
> public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final Semaphore lock = new Semaphore(1);
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
>             log.warn(responseObserver + ": Lock acquire timeout exceeded");
>             return new NoOpEventStreamObserver(); // Only to prevent exceptions in the log.
>         }
>         log.info(responseObserver + ": Acquired lock.");
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>                 var preprocessedEvent = preprocess(event);
>                 events.add(preprocessedEvent);
>             }
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete.");
>                 var storageLibrary = new StorageLibrary();
> throwable) -> {
>                     log.info(responseObserver + ": Store completed.");
>                     responseObserver.onNext(Confirmation.newBuilder().build());
>                     responseObserver.onCompleted();
>                     lock.release();
>                     return null;
>                 });
>             }
>             private Event preprocess(Event event) {
>                 // The preprocessing already requires the lock.
>                 return event;
>             }
>         };
>     }
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         if (!lock.tryAcquire(5, TimeUnit.SECONDS))
>             throw new TimeoutException("The lock acquire timeout exceeded.");
>         // Requires exclusive access to a shared resource and uses async I/O.
>         var storageLibrary = new StorageLibrary();
>, throwable) -> {
>             responseObserver.onNext(Confirmation.newBuilder().build());
>             responseObserver.onCompleted();
>             lock.release();
>             return null;
>         });
>     }
>     @Override
>     public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
>         // Do something independent from the other service methods.
>     }
> }
> *Output:*
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring 
> lock ...
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired 
> lock.
> 09:53:03.456 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring 
> lock ...
> 09:53:13.465 [pool-1-thread-1] WARN MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock 
> acquire timeout exceeded
> 09:53:13.465 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring 
> lock ...
> 09:53:23.484 [pool-1-thread-1] WARN MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock 
> acquire timeout exceeded
> 09:53:23.484 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring 
> lock ...
> 09:53:33.496 [pool-1-thread-1] WARN MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock 
> acquire timeout exceeded
> 09:53:33.496 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring 
> lock ...
> 09:53:43.501 [pool-1-thread-1] WARN MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock 
> acquire timeout exceeded
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received 
> event.
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received 
> complete.
> 09:53:43.533 [pool-1-thread-1] INFO MySyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store 
> completed.
> In my initial synchronous implementation I use a Semaphore as a lock. This 
> leads to a deadlock that lasts until the lock acquire timeout is exceeded. 
> I think this is what happens:
> 1. The only thread in the pool executes the first request. In doing so, 
> the first request acquires the lock and the StreamObserver is returned.
> 2. The initial processing for the first request (before the first 
> messages arrive via the StreamObserver) is now done, and the only thread 
> is released.
> 3. The 2nd request is already waiting in the executor's queue, so the 
> just-released thread picks it up.
> 4. The thread blocks, because the Semaphore has not been released yet.
> 5. The executor's queue holds further requests as well as messages for 
> the StreamObservers. In particular, the messages for the first request's 
> StreamObserver cannot be handled, so the lock is not released.
> 6. Only after every request queued behind the 1st one has exceeded the 
> lock acquire timeout and failed does the thread become free to handle the 
> messages for the 1st request's StreamObserver, and the 1st request 
> completes.
> Increasing the thread pool size is probably not a good solution, as it 
> would always have to be greater than or equal to the number of concurrently 
> active requests for these 2 methods.
> *Asynchronous service implementation:*
> public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         // Acquire the lock here already (instead of in StreamObserver.onCompleted())
>         // to ensure the lock is acquired in the order in which the GRPC requests
>         // are handled, and not in the order in which the GRPC stream observers
>         // complete.
>         var lockFuture = lock.acquire();
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>                 events.add(event);
>             }
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());
>                 lockFuture.thenAccept(permit -> {
>                     log.info(responseObserver + ": Acquired lock.");
>                     var preprocessedEvents = events.stream()
>                                                    .map(MyAsyncService.this::preprocess)
>                                                    .toArray(Event[]::new);
>                     var storageLibrary = new StorageLibrary();
>, throwable) -> {
>                         log.info(responseObserver + ": Store completed.");
>                         responseObserver.onNext(Confirmation.newBuilder().build());
>                         responseObserver.onCompleted();
>                         permit.release();
>                         return null;
>                     });
>                 });
>             }
>         };
>     }
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         lock.acquireAndRun(() -> {
>             log.info(responseObserver + ": Acquired lock.");
>             // Requires exclusive access to a shared resource and uses async I/O.
>             var preprocessedEvent = preprocess(event);
>             var storageLibrary = new StorageLibrary();
>             return, 
> throwable) -> {
>                 responseObserver.onNext(Confirmation.newBuilder().build());
>                 responseObserver.onCompleted();
>                 return null;
>             });
>         });
>     }
> }
> *Output:*
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring 
> lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring 
> lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring 
> lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring 
> lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring 
> lock ...
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received 
> event.
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received 
> complete; acquired lock: true
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired 
> lock.
> 10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store 
> completed.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received 
> event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received 
> complete; acquired lock: true
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired 
> lock.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received 
> event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received 
> complete; acquired lock: false
> 10:14:55.141 [Thread-9] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store 
> completed.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received 
> event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received 
> complete; acquired lock: false
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received 
> event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received 
> complete; acquired lock: false
> 10:14:55.153 [Thread-9] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired 
> lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store 
> completed.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired 
> lock.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store 
> completed.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired 
> lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - 
> io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store 
> completed.
> In my asynchronous implementation this deadlock does not appear. However, 
> now I have to manage an application-side buffer for each active request 
> until this request gets the lock. I hoped that there was a way to let GRPC 
> handle the problem by buffering the requests at a higher level instead of 
> this lower application-side level and handle the exception (discard the 
> request, send an exception to the client, ...) when the buffer fills. For 
> example, by returning a Future<StreamObserver<>> instead of a 
> StreamObserver as I know it from ASP.NET MVC asynchronous controller 
> methods (
> Probably there are similar examples in the Java world, but I do not have 
> much experience with the Java ecosystem yet.
> I have already read this discussion: 
> Thanks in advance!
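
On the wish above for a bounded buffer that rejects requests when it fills: 
the sketch below shows one possible application-side shape, assuming a 
plain CompletableFuture-based lock. BoundedAsyncLock is a hypothetical 
helper written for this illustration; it is not part of gRPC and not the 
AsyncSemaphore used in the sample. It bounds the number of waiting 
requests, not the size of their buffered events.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical helper; not part of gRPC and not the AsyncSemaphore from the sample.
class BoundedAsyncLock {
    private final int maxWaiters;
    private final ArrayDeque<CompletableFuture<Runnable>> waiters = new ArrayDeque<>();
    private boolean held;

    public BoundedAsyncLock(int maxWaiters) {
        this.maxWaiters = maxWaiters;
    }

    // Returns a future that completes with a release handle once the lock is
    // granted, or an already-failed future when too many callers are waiting.
    public synchronized CompletableFuture<Runnable> acquire() {
        Runnable releaseHandle = this::release;
        if (!held) {
            held = true;
            return CompletableFuture.completedFuture(releaseHandle);
        }
        if (waiters.size() >= maxWaiters) {
            return CompletableFuture.failedFuture(
                    new RejectedExecutionException("too many requests waiting for the lock"));
        }
        CompletableFuture<Runnable> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    // Hands the lock to the oldest waiter, or marks it free if nobody waits.
    // Each handle must be run exactly once; this sketch does not guard against reuse.
    private void release() {
        CompletableFuture<Runnable> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                held = false;
                return;
            }
        }
        // Complete outside the monitor so waiter callbacks do not run under it.
        Runnable releaseHandle = this::release;
        next.complete(releaseHandle);
    }

    public static void main(String[] args) {
        BoundedAsyncLock lock = new BoundedAsyncLock(1);
        CompletableFuture<Runnable> first = lock.acquire();   // granted immediately
        CompletableFuture<Runnable> second = lock.acquire();  // queued
        CompletableFuture<Runnable> third = lock.acquire();   // rejected, queue is full
        System.out.println(first.isDone() + " " + second.isDone() + " "
                + third.isCompletedExceptionally());
        first.join().run();                                   // passes the lock on
        System.out.println(second.isDone());
    }
}
```

In a service method, a failed future could be turned into 
responseObserver.onError(Status.RESOURCE_EXHAUSTED.asRuntimeException()) 
so that the client sees the rejection instead of a timeout.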
