Sorry, I just realized you are using client streaming. If you only need to 
share the resource between the 2 methods of the same service, can you get 
rid of your lock and just use directExecutor 
(https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ServerBuilder.java#L58)?
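
For context, "direct" means that callbacks run inline on the thread that 
delivers them, with no queue in between. The sketch below only illustrates 
that behaviour with plain java.util.concurrent; DirectExecutorSketch and 
runnerThreadName are made-up names, and this is not how grpc-java implements 
it. Note that the ServerBuilder Javadoc asks the application not to block 
when a direct executor is used, so a blocking tryAcquire() would have to go.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DirectExecutorSketch {
    // A "direct" executor runs every task inline, on the thread that submits it.
    static final Executor DIRECT = Runnable::run;

    // Hands one task to the executor and reports which thread actually ran it.
    static String runnerThreadName(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("caller: " + Thread.currentThread().getName());
            System.out.println("direct: " + runnerThreadName(DIRECT));
            System.out.println("pooled: " + runnerThreadName(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The direct variant reports the caller's own thread, while the pooled variant 
reports a pool thread.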

Alternatively, you can use 2 threads instead of the single-thread executor, 
so that one thread can complete the current request while the other thread 
receives the next request(s).
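
The two-thread idea can be tried out without gRPC. The sketch below is a 
simplified model under stated assumptions, not the real server: 
PoolSizeSketch and secondRequestGetsLock are hypothetical names, and the 
three submitted tasks stand in for request 1's start, request 2's start and 
request 1's onCompleted callback.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PoolSizeSketch {
    // Returns true if the second request obtains the lock before its timeout.
    static boolean secondRequestGetsLock(int poolSize, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore lock = new Semaphore(1);
        try {
            // Request 1 starts: it takes the lock and returns, freeing its thread.
            pool.submit(() -> lock.acquireUninterruptibly()).get();

            // Request 2 starts and waits for the lock, occupying a pool thread.
            Future<Boolean> second = pool.submit(() -> {
                boolean acquired = lock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
                if (acquired) {
                    lock.release();
                }
                return acquired;
            });

            // Request 1's completion callback is queued behind request 2's start.
            pool.submit(() -> lock.release());

            return second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + secondRequestGetsLock(1, 200));
        System.out.println("2 threads: " + secondRequestGetsLock(2, 2000));
    }
}
```

In this model the single-thread pool reports false and the two-thread pool 
reports true. A third request waiting on the lock at the same time would need 
a third thread, which is the sizing concern raised in the quoted message.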

On Wednesday, September 30, 2020 at 1:18:54 AM UTC-7 weißnet auchnicht 
wrote:

> Hi,
>
> my service has 2 methods, which share a resource. Each method invocation 
> needs exclusive access to this resource. In the example the client makes 5 
> successive requests for one of these methods, sends a message using the 
> stream observer and completes the stream observer. Then, without further 
> delay the next request is made in the same way.
>
> The full source code of the sample application is here: 
> https://github.com/Niklas-Peter/grpc-async.
>
> The GRPC server is configured with 
> ServerBuilder.executor(Executors.newSingleThreadExecutor());
>
> Here are the most important extracts:
>
> *Client code:*
> @Slf4j
> public class MyServiceClient {
>     @SneakyThrows
>     public void myServiceMethodA() {
>         log.info("myServiceMethodA(): started.");
>         var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
>         log.info("myServiceMethodA(): received write connection.");
>
>         writeConnection.onNext(createEvent());
>         writeConnection.onCompleted();
>     }
> }
>
> *Synchronous service implementation:*
> @Slf4j
> public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final Semaphore lock = new Semaphore(1);
>
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
>             log.warn(responseObserver + ": Lock acquire timeout exceeded");
>             return new NoOpEventStreamObserver(); // Only to prevent exceptions in the log.
>         }
>
>         log.info(responseObserver + ": Acquired lock.");
>
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>
>                 var preprocessedEvent = preprocess(event);
>                 events.add(preprocessedEvent);
>             }
>
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete.");
>
>                 var storageLibrary = new StorageLibrary();
>                 storageLibrary.store(events.toArray(Event[]::new)).handle((unused, throwable) -> {
>                     log.info(responseObserver + ": Store completed.");
>
>                     responseObserver.onNext(Confirmation.newBuilder().build());
>                     responseObserver.onCompleted();
>
>                     lock.release();
>
>                     return null;
>                 });
>             }
>
>             private Event preprocess(Event event) {
>                 // The preprocessing already requires the lock.
>                 return event;
>             }
>         };
>     }
>
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         if (!lock.tryAcquire(5, TimeUnit.SECONDS))
>             throw new TimeoutException("The lock acquire timeout exceeded.");
>
>         // Requires exclusive access to a shared resource and uses async I/O.
>         var storageLibrary = new StorageLibrary();
>         storageLibrary.store(event).handle((unused, throwable) -> {
>             responseObserver.onNext(Confirmation.newBuilder().build());
>             responseObserver.onCompleted();
>
>             lock.release();
>
>             return null;
>         });
>     }
>
>
>     @Override
>     public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
>         // Do something independent from the other service methods.
>     }
> }
>
> *Output:*
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring lock ...
> 09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired lock.
> 09:53:03.456 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring lock ...
> 09:53:13.465 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock acquire timeout exceeded
> 09:53:13.465 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring lock ...
> 09:53:23.484 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock acquire timeout exceeded
> 09:53:23.484 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring lock ...
> 09:53:33.496 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock acquire timeout exceeded
> 09:53:33.496 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring lock ...
> 09:53:43.501 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock acquire timeout exceeded
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received event.
> 09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received complete.
> 09:53:43.533 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store completed.
>
> In my initial synchronous implementation I use a Semaphore as a lock. This 
> leads to a deadlock that lasts until the tryAcquire() timeout is exceeded. 
> I think this is what happens:
> 1. The only thread in the pool executes the first request. In doing so, 
> the first request acquires the lock and the StreamObserver is returned.
> 2. The initial processing for the first request (before the first messages 
> arrive via the StreamObserver) is now done, and the only thread is released.
> 3. The 2nd request is already waiting in the executor's queue, so the 
> just-released thread picks it up.
> 4. The thread blocks, because the Semaphore has not been released yet.
> 5. Further requests, as well as messages for the StreamObservers, are 
> waiting in the executor's queue. In particular, the messages for the first 
> request's StreamObserver cannot be handled, so the lock is not released.
> 6. Once every request queued behind the 1st one has exceeded the 
> tryAcquire() timeout and failed, the now-released thread handles the 
> messages for the 1st request's StreamObserver, and the 1st request 
> completes.
>
> Increasing the thread pool size is probably not a good solution, as it 
> would always have to be greater than or equal to the number of concurrently 
> active requests for these 2 methods.
>
> *Asynchronous service implementation:*
> @Slf4j
> public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
>     private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());
>
>     @SneakyThrows
>     @Override
>     public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
>         log.info(responseObserver + ": Acquiring lock ...");
>         // Acquire the lock here already (instead of in StreamObserver.onCompleted())
>         // to ensure the lock is acquired in the order in which the GRPC requests
>         // are handled, and not in the order in which the GRPC stream observers
>         // complete.
>         var lockFuture = lock.acquire();
>
>         return new StreamObserver<>() {
>             private final List<Event> events = new ArrayList<>();
>
>             @Override
>             public void onNext(Event event) {
>                 log.info(responseObserver + ": Received event.");
>
>                 events.add(event);
>             }
>
>             @Override
>             public void onCompleted() {
>                 log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());
>
>                 lockFuture.thenAccept(permit -> {
>                     log.info(responseObserver + ": Acquired lock.");
>
>                     var preprocessedEvents = events.stream()
>                                                    .map(MyAsyncService.this::preprocess)
>                                                    .toArray(Event[]::new);
>                     var storageLibrary = new StorageLibrary();
>                     storageLibrary.store(preprocessedEvents).handle((unused, throwable) -> {
>                         log.info(responseObserver + ": Store completed.");
>
>                         responseObserver.onNext(Confirmation.newBuilder().build());
>                         responseObserver.onCompleted();
>
>                         permit.release();
>
>                         return null;
>                     });
>                 });
>             }
>         };
>     }
>
>     @SneakyThrows
>     @Override
>     public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
>         lock.acquireAndRun(() -> {
>             log.info(responseObserver + ": Acquired lock.");
>
>             // Requires exclusive access to a shared resource and uses async I/O.
>             var preprocessedEvent = preprocess(event);
>             var storageLibrary = new StorageLibrary();
>             return storageLibrary.store(preprocessedEvent).handle((unused, throwable) -> {
>                 responseObserver.onNext(Confirmation.newBuilder().build());
>                 responseObserver.onCompleted();
>
>                 return null;
>             });
>         });
>     }
> }
>
>
> *Output:*
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring lock ...
> 10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring lock ...
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received event.
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received complete; acquired lock: true
> 10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired lock.
> 10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store completed.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received complete; acquired lock: true
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired lock.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received event.
> 10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received complete; acquired lock: false
> 10:14:55.141 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store completed.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received complete; acquired lock: false
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received event.
> 10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received complete; acquired lock: false
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store completed.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired lock.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store completed.
> 10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired lock.
> 10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store completed.
>
> In my asynchronous implementation this deadlock does not occur. However, 
> now I have to manage an application-side buffer for each active request 
> until that request gets the lock. I had hoped there was a way to let GRPC 
> handle the problem by buffering the requests at a higher level instead of 
> at this lower, application-side level, and to handle the failure (discard 
> the request, send an exception to the client, ...) when the buffer fills. 
> For example, by returning a Future<StreamObserver<>> instead of a 
> StreamObserver, as I know it from ASP.NET MVC asynchronous controller 
> methods 
> (https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos).
> There are probably similar examples in the Java world, but I do not have 
> much experience with the Java ecosystem yet.
>
> I have already read this discussion: 
> https://groups.google.com/g/grpc-io/c/XCMIva8NDO8
>
> Thanks in advance!
>
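
On the buffer concern in the quoted message: one way to keep the 
application-side queue bounded is to cap the number of waiters on the async 
lock and fail fast beyond that. This is a minimal sketch under that 
assumption, not the AsyncSemaphore used in the quoted code; BoundedAsyncLock, 
maxWaiters and the Runnable release handle are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class BoundedAsyncLock {
    private final int maxWaiters;
    private final ArrayDeque<CompletableFuture<Runnable>> waiters = new ArrayDeque<>();
    private boolean held;

    BoundedAsyncLock(int maxWaiters) {
        this.maxWaiters = maxWaiters;
    }

    // Never blocks. The future yields a release handle once the lock is granted,
    // or fails immediately when maxWaiters callers are already queued.
    synchronized CompletableFuture<Runnable> acquire() {
        if (!held) {
            held = true;
            return CompletableFuture.completedFuture(releaseHandle());
        }
        if (waiters.size() >= maxWaiters) {
            return CompletableFuture.failedFuture(
                    new RejectedExecutionException("lock queue is full"));
        }
        CompletableFuture<Runnable> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    // Each grant gets its own handle; calling it more than once has no further effect.
    private Runnable releaseHandle() {
        AtomicBoolean released = new AtomicBoolean();
        return () -> {
            if (released.compareAndSet(false, true)) {
                release();
            }
        };
    }

    private void release() {
        while (true) {
            CompletableFuture<Runnable> next;
            synchronized (this) {
                next = waiters.poll();
                if (next == null) {
                    held = false;
                    return;
                }
            }
            // Complete outside the monitor; skip waiters that were cancelled meanwhile.
            if (next.complete(releaseHandle())) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        BoundedAsyncLock lock = new BoundedAsyncLock(1);
        CompletableFuture<Runnable> first = lock.acquire();  // granted at once
        CompletableFuture<Runnable> second = lock.acquire(); // queued
        CompletableFuture<Runnable> third = lock.acquire();  // rejected, queue is full
        System.out.println(first.isDone() + " " + second.isDone()
                + " " + third.isCompletedExceptionally());
        first.join().run(); // releasing hands the lock to the second caller
        System.out.println(second.isDone());
    }
}
```

In a service method, a rejected future could be mapped to 
responseObserver.onError(...) with a status such as RESOURCE_EXHAUSTED, so 
that the client learns the request was discarded.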
