Hi all,
I have a simplest possible server-streaming method:

service CancelService {
rpc fun(Empty) returns (stream Empty) {}
}

an implementation that sleeps for 1s and then sends reply:

public class CancelService extends CancelServiceImplBase {

@Override
public void fun(Empty request, StreamObserver<Empty> basicResponseObserver) 
{
final var responseObserver = (ServerCallStreamObserver<Empty>) 
basicResponseObserver;
try {
Thread.sleep(1000l);  // longer than client's deadline
System.out.println("isCancelled: " + responseObserver.isCancelled());
responseObserver.onNext(request);
responseObserver.onCompleted();
System.out.println("completed successfully");
} catch (StatusRuntimeException e) {
System.out.println("StatusRuntimeException" + e);
} catch (Throwable t) {
System.out.println("server error" + t);
responseObserver.onError(Status.INTERNAL.withCause(t).asException());
if (t instanceof Error) throw (Error) t;
}
}

public static void main(String[] args) throws Exception {
final Server server = NettyServerBuilder
.forPort(6666)
.addService(new CancelService())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try { server.shutdown().awaitTermination(5, TimeUnit.SECONDS); } catch 
(Exception e) {}
}));
System.out.println("server started");
server.awaitTermination();
}
}

and a client that sets deadline to 0.5s:

public static void main(String[] args) throws Exception {
final var channel = ManagedChannelBuilder
.forTarget("localhost:6666")
.usePlaintext()
.build();
final var connector = CancelServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(500l, TimeUnit.MILLISECONDS);
final var results = connector.fun(Empty.newBuilder().build());
while (results.hasNext()) {
System.out.println("got result: " + results.next());
}
System.out.println("call successful");
}

when run, on the client side I get an excp as expected:

Exception in thread "main" io.grpc.StatusRuntimeException: 
DEADLINE_EXCEEDED: deadline exceeded (...)

on the server however, responseObserver.isCancelled() returns true but 
StatusRuntimeException is not thrown when calling onNext(...) and 
onCompleted() and call finishes as normal:

server started
isCancelled: true
completed successfully

is this an expected behavior? I thought StatusRuntimeException with 
Status.CANCELLED should be thrown, no? (unless onCancelHandler is set, 
which is not the case here)

To make things more confusing, if I dispatch work in fun(...) to some 
executor like this:

ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 3, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>());

@Override
public void fun(Empty request, StreamObserver<Empty> basicResponseObserver) 
{
final var responseObserver = (ServerCallStreamObserver<Empty>) 
basicResponseObserver;
executor.execute(() -> {
try {
Thread.sleep(1000l);  // longer than client's deadline
System.out.println("isCancelled: " + responseObserver.isCancelled());
responseObserver.onNext(request);
responseObserver.onCompleted();
System.out.println("completed successfully");
} catch (StatusRuntimeException e) {
System.out.println("StatusRuntimeException" + e);
} catch (Throwable t) {
System.out.println("server error" + t);
responseObserver.onError(Status.INTERNAL.withCause(t).asException());
if (t instanceof Error) throw (Error) t;
}
});
}


then I do get a StatusRuntimeException:

server started
isCancelled: true
StatusRuntimeExceptionio.grpc.StatusRuntimeException: CANCELLED: call 
already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to 
disable this exception

Shouldn't the behavior be consistent in both of these cases?

Thanks!
 

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/253a2c7f-4b36-4003-bd22-c9fd66b4d0a6n%40googlegroups.com.

Reply via email to