OK, solved it:
the deadlock happened because a Listener is guaranteed to be called by at most 
one thread at a time. The thread I blocked in line 50 
(https://github.com/morgwai/grpc-deadlock/blob/master/src/main/java/pl/morgwai/samples/grpc/deadlock/EchoService.java#L50)
was in effect still holding the Listener's lock (for methods with a unary 
request, the user method is called from Listener.onHalfClose()), so no other 
thread could call Listener.onReady(), which is what invokes the onReadyHandler 
that would have notified the blocked thread.
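
To illustrate the mechanism without gRPC, here is a minimal sketch. It assumes 
that serialized callback delivery can be modelled by a single-threaded 
executor; SerializedCallbackDemo and workerGetsNotified are made-up names for 
this illustration and not part of grpc-java. A task waits for a "ready" signal 
that is queued as a callback, and the wait is bounded by a timeout so the demo 
terminates instead of hanging:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SerializedCallbackDemo {

    /**
     * Starts a task that waits for a "ready" signal, then queues the signal as a callback.
     * If blockOnCallbackThread is true, the waiting task runs on the same single-threaded
     * executor that delivers callbacks (the stand-in for a serialized Listener).
     * Returns true if the task was notified before the timeout elapsed.
     */
    public static boolean workerGetsNotified(boolean blockOnCallbackThread, long timeoutMillis) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        ExecutorService workExecutor = blockOnCallbackThread
                ? callbackExecutor
                : Executors.newSingleThreadExecutor();
        var ready = new CountDownLatch(1);
        var workStarted = new CountDownLatch(1);
        try {
            // Stand-in for the user method that blocks until the sink is ready.
            Future<Boolean> work = workExecutor.submit(() -> {
                workStarted.countDown();
                return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            workStarted.await();
            // Stand-in for onReady(): it can only run once the callback thread is free.
            callbackExecutor.execute(ready::countDown);
            return work.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            callbackExecutor.shutdown();
            workExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked on callback thread: notified = "
                + workerGetsNotified(true, 500));
        System.out.println("blocked on separate thread: notified = "
                + workerGetsNotified(false, 500));
    }
}
```

In the first call the signal is stuck in the queue behind the very task that 
waits for it, so the wait should time out (false); in the second call the 
callback thread is free to deliver it (true).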

I've refactored the code so that it never blocks the thread: the actual work 
is now done in the onReadyHandler, and it works fine:

public void multiEcho(
        EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
    log.fine("someone has just emitted an inconsiderated verbal vomit");
    int[] repsRemainingHolder = { verbalVomit.getReps() };
    var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
    echoObserver.setOnReadyHandler(() -> {
        log.finer("sink ready");
        try {
            while (
                repsRemainingHolder[0] > 0
                &&    echoObserver.isReady()
                && ! echoObserver.isCancelled()
            ) {
                // multiply the content to fill the buffer faster
                var echoBuilder = new StringBuilder();
                for (int j = 0; j < MULTIPLY_FACTOR; j++) {
                    // append the message text, not the whole request object
                    echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
                }
                var echoedVomit = EchoResposne.newBuilder()
                    .setEchoedVomit(echoBuilder.toString())
                    .build();

                if (log.isLoggable(Level.FINEST)) log.finest("echo");
                repsRemainingHolder[0]--;
                echoObserver.onNext(echoedVomit);
            }
            if (echoObserver.isCancelled()) {
                log.fine("client cancelled the call 2");
                return;
            }
            if (repsRemainingHolder[0] == 0) {
                echoObserver.onCompleted();
                log.fine("done");
                return;
            }
            log.finer("sink clogged at rep "
                    + (verbalVomit.getReps() - repsRemainingHolder[0] + 1));
        } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Code.CANCELLED) {
                log.fine("client cancelled the call 1");
            } else {
                log.severe("server error: " + e);
                e.printStackTrace();
            }
        } catch (Exception e) {
            log.severe("server error: " + e);
            e.printStackTrace();
            echoObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    });
}

(hope the formatting will be ok this time ;-] )

The above code is in the 'solution' branch of the previously mentioned GitHub 
repo: 
https://github.com/morgwai/grpc-deadlock/blob/solution/src/main/java/pl/morgwai/samples/grpc/deadlock/EchoService.java
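
For anyone who wants to play with the drain-on-ready pattern without starting 
a server, here is a gRPC-free sketch of it. Everything in it is hypothetical: 
BoundedSink only imitates a flow-controlled response stream with a tiny buffer, 
and flush() imitates the transport draining that buffer and then signalling 
readiness. The point is only that the handler keeps its progress in a holder 
and resumes where it stopped each time it is called, instead of waiting:

```java
import java.util.ArrayList;
import java.util.List;

public class DrainOnReadyDemo {

    /** Hypothetical stand-in for a flow-controlled response stream. */
    static class BoundedSink {
        private final int capacity;
        private int buffered = 0;
        private Runnable onReadyHandler = () -> {};
        final List<String> delivered = new ArrayList<>();
        boolean completed = false;

        BoundedSink(int capacity) {
            if (capacity < 1) throw new IllegalArgumentException("capacity must be >= 1");
            this.capacity = capacity;
        }

        void setOnReadyHandler(Runnable handler) { onReadyHandler = handler; }
        boolean isReady() { return buffered < capacity; }
        void onNext(String message) { buffered++; delivered.add(message); }
        void onCompleted() { completed = true; }

        /** Imitates the transport emptying the buffer and then signalling readiness. */
        void flush() { buffered = 0; onReadyHandler.run(); }
    }

    /** Sends message reps times through a sink that accepts capacity messages per flush. */
    public static List<String> echo(String message, int reps, int capacity) {
        var sink = new BoundedSink(capacity);
        int[] repsRemainingHolder = { reps };  // progress survives between handler calls
        sink.setOnReadyHandler(() -> {
            // Send as long as the sink accepts messages, then simply return.
            while (repsRemainingHolder[0] > 0 && sink.isReady()) {
                repsRemainingHolder[0]--;
                sink.onNext(message);
            }
            if (repsRemainingHolder[0] == 0 && !sink.completed) sink.onCompleted();
        });
        // The "transport": each flush triggers the handler until the stream completes.
        while (!sink.completed) sink.flush();
        return sink.delivered;
    }

    public static void main(String[] args) {
        System.out.println(echo("bleh", 5, 2).size() + " messages delivered");
    }
}
```

With 5 reps and a capacity of 2, the handler runs three times (sending 2, 2 and 
1 messages) and no thread ever waits.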

Cheers!





On 05/06/2021 22:01, Piotr Morgwai Kotarbinski wrote:
> I was trying this with grpc-1.38.0 on openjdk-11 on ubuntu-18.04 in case it 
> matters.
> 
> 
> On 05/06/2021 21:49, Piotr Morgwai Kotarbinski wrote:
>> Hi all,
>> I have the following server-streaming method:
>>
>>> public void multiEcho(
>>>         EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
>>>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>>>     var callMonitor = new Object();
>>>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>>>     echoObserver.setOnReadyHandler(() -> {
>>>         log.finer("sink ready");
>>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>>     });
>>>     echoObserver.setOnCancelHandler(() -> {
>>>         log.fine("client cancelled the call 1");
>>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>>     });
>>>
>>>     try {
>>>         for (int i = 1; i <= verbalVomit.getReps(); i++) {
>>>             if (echoObserver.isCancelled()) {
>>>                 log.fine("client cancelled the call 2");
>>>                 return;
>>>             }
>>>             synchronized (callMonitor) {
>>>                 while( ! echoObserver.isReady()) {
>>>                     log.finer("sink clogged at rep " + i);
>>>                     callMonitor.wait();
>>>                 }
>>>             }
>>>
>>>             // multiply the content to fill the buffer faster
>>>             var echoBuilder = new StringBuilder();
>>>             for (int j = 0; j < MULTIPLY_FACTOR; j++) {
>>>                 echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
>>>             }
>>>             var echoedVomit = EchoResposne.newBuilder()
>>>                 .setEchoedVomit(echoBuilder.toString())
>>>                 .build();
>>>
>>>             if (log.isLoggable(Level.FINEST)) log.finest("echo");
>>>             echoObserver.onNext(echoedVomit);
>>>         }
>>>         echoObserver.onCompleted();
>>>     } catch (StatusRuntimeException e) {
>>>         if (e.getStatus().getCode() == Code.CANCELLED) {
>>>             log.fine("client cancelled the call 3");
>>>         } else {
>>>             log.severe("server error: " + e);
>>>             e.printStackTrace();
>>>         }
>>>     } catch (Exception e) {
>>>         log.severe("server error: " + e);
>>>         e.printStackTrace();
>>>         echoObserver.onError(Status.INTERNAL.withCause(e).asException());
>>>     }
>>> }
>>
>> I create the server this way:
>>
>>> echoServer = NettyServerBuilder
>>>     .forPort(port)
>>>     .maxConnectionAge(10, TimeUnit.MINUTES)
>>>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>>>     .addService(new EchoService())
>>>     .build();
>>
>> and the client looks like this:
>>
>>> var connector = EchoServiceGrpc.newBlockingStub(channel);
>>> var request = EchoRequest
>>>     .newBuilder()
>>>     .setInconsideratedVerbalVomit(
>>>             "bleeeeeeeeeeeeeeeeeeeehhhhhhhhhh" +
>>>             "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh")  // 64B
>>>     .setReps(100)
>>>     .build();
>>> var vomitIterator = connector.multiEcho(request);
>>> while (vomitIterator.hasNext()) {
>>>     vomitIterator.next();
>>>     System.out.println("got echo");
>>> }
>>
>> and it deadlocks after just a few messages ;-]
>>
>> server output looks like this:
>>
>>> started gRPC EchoServer on port 6666
>>> Jun 05, 2021 8:22:52 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
>>> FINE: someone has just emitted an inconsiderated verbal vomit
>>> Jun 05, 2021 8:22:53 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
>>> FINER: sink clogged at rep 7
>>
>> ...and clients:
>>
>>> got echo
>>> got echo
>>> got echo
>>> got echo
>>> got echo
>>> got echo
>>
>> ...and they both hang indefinitely :(
>>
>> Am I doing something wrong, or is it a bug?
>>
>> The interesting part is that if I use a direct executor in the server and 
>> dispatch the work to a separate executor, it works without any problems 
>> (that's what I usually do, so I've never encountered this problem before), 
>> i.e.:
>>
>>> public void multiEcho(
>>>         EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
>>>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>>>     var callMonitor = new Object();
>>>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>>>     echoObserver.setOnReadyHandler(() -> {
>>>         log.finer("sink ready");
>>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>>     });
>>>     echoObserver.setOnCancelHandler(() -> {
>>>         log.fine("client cancelled the call 1");
>>>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>>>     });
>>>
>>>     cpuIntensiveOpExecutor.execute(() -> {
>>>         try {
>>>             for (int i = 1; i <= verbalVomit.getReps(); i++) {
>> (...)
>>
>> and
>>
>>> echoServer = NettyServerBuilder
>>>     .forPort(port)
>>>     .maxConnectionAge(10, TimeUnit.MINUTES)
>>>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>>>     .addService(new EchoService())
>>>     .directExecutor()
>>>     .build();
>>
>> A full runnable example (one that deadlocks, that is) can be found on GitHub: 
>> https://github.com/morgwai/grpc-deadlock
>>
>> Any hints will be much appreciated :)
>>
>> Thanks!
>>
