Sure. Here is the code without multithreading:

public void greetServerStream(GreetServerStreamRequest request,
        StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    try {
        for (int i = 0; i < 10; i++) {
            String result = "Hello " + message + ", response number: " + i;
            GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                    .setResult(result)
                    .build();
            responseObserver.onNext(response);
            Thread.sleep(1000L);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        responseObserver.onCompleted();
    }
}

From the code above, I implemented a multithreaded version as shown below:

public void greetServerStream(GreetServerStreamRequest request,
        StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    MultiRunnable runnable1 = new MultiRunnable("thread1", message + "1", responseObserver);
    runnable1.start();
    MultiRunnable runnable2 = new MultiRunnable("thread1", message + "2", responseObserver);
    runnable2.start();
}

public class MultiRunnable implements Runnable {
    private Thread thread;
    private final String threadName;
    private final String message;
    private final ServerCallStreamObserver<GreetServerStreamResponse> serverCallStreamObserver;

    public MultiRunnable(String threadName, String message,
            StreamObserver<GreetServerStreamResponse> responseObserver) {
        this.threadName = threadName;
        this.message = message;
        this.serverCallStreamObserver =
                (ServerCallStreamObserver<GreetServerStreamResponse>) responseObserver;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String result = "Hello " + message + ", response number: " + i;
                GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                        .setResult(result)
                        .build();
                synchronized (serverCallStreamObserver) {
                    serverCallStreamObserver.onNext(response);
                }
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + threadName + " interrupted.");
        } finally {
            serverCallStreamObserver.onCompleted();
        }
    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (thread == null) {
            thread = new Thread(this, threadName);
            thread.start();
        }
    }
}

So each thread runs its own for loop and calls onNext() to send responses back to the client on the same stream. I initially did not add the synchronized block above and got the following error:

"Stream 3 sent too many headers EOS"

After adding the block, I was able to have multiple threads call onNext() on the shared observer without that error. I am just curious whether this is the right way of doing synchronization. From a best-practice perspective, what is the best way to synchronize? Is calling onNext() from multiple threads common or recommended?

Thanks,
Bill

On Wednesday, 19 May 2021 at 01:47:54 UTC-4 sanjay...@google.com wrote:

> Pls include a code snippet of what you want to do. Show how you intend to
> share "one ResponseObserver".
>
> On Tue, May 18, 2021 at 6:56 PM Bill Li <zhl...@gmail.com> wrote:
>
>> Got it, thanks!
>>
>> I am currently implementing a server-side streaming application. Can one
>> ResponseObserver be shared by multiple threads sending response stream back
>> to the client through onNext() method? Just want to confirm if there is a
>> race condition in calling onNext() at the same time.
>>
>> On Tuesday, 18 May 2021 at 19:28:43 UTC-4 sanjay...@google.com wrote:
>>
>>> With NettyServerBuilder you can use maxConcurrentCallsPerConnection(int
>>> maxCalls)
>>> <https://github.com/grpc/grpc-java/blob/master/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java#L397>
>>>
>>> This is the same as setting MAX_CONCURRENT_STREAMS per connection.
>>>
>>> On Tue, May 18, 2021 at 3:36 PM Bill Li <zhl...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does anyone know or have an example for configuring the parameter
>>>> MAX_CONCURRENT_STREAMS for gRPC server written in Java?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "grpc.io" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to grpc-io+u...@googlegroups.com.
>>>> To view this discussion on the web visit
>>>> https://groups.google.com/d/msgid/grpc-io/cbb2fd35-a01a-4128-879d-08cbc91049b0n%40googlegroups.com
>>>> .

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/065aecd9-0190-4bde-8a91-aae0edc2a0e5n%40googlegroups.com.
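
For reference, here is a rough sketch of the pattern the multithreaded version above is aiming for. The StreamObserver Javadoc says individual observers are not thread-safe, so the application has to synchronize concurrent writers, and that onCompleted() may only be called once and must be the last call. In the multithreaded version above, every worker calls onCompleted() in its finally block, so the stream is completed more than once. The sketch guards onNext() with a lock and lets only the last worker to finish complete the stream. To run without gRPC on the classpath it uses a hypothetical Observer interface as a stand-in for StreamObserver. The class names and the AtomicInteger countdown are assumptions for illustration, not something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedObserverSketch {

    /** Hypothetical stand-in for io.grpc.stub.StreamObserver so the sketch runs without gRPC. */
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Records events in a plain ArrayList, which is not thread-safe on its own. */
    static final class RecordingObserver implements Observer<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String value) { events.add(value); }
        @Override public void onCompleted() { events.add("completed"); }
    }

    /** Starts the workers, waits for them, and returns everything the observer received. */
    static List<String> run(int workers, int messagesPerWorker) throws InterruptedException {
        RecordingObserver observer = new RecordingObserver();
        // Counts workers that have not finished yet; the one that reaches zero completes the stream.
        AtomicInteger remaining = new AtomicInteger(workers);
        List<Thread> threads = new ArrayList<>();
        for (int w = 1; w <= workers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < messagesPerWorker; i++) {
                        // All writers share one lock, so onNext() calls never overlap.
                        synchronized (observer) {
                            observer.onNext("worker " + id + ", response number: " + i);
                        }
                    }
                } finally {
                    // Only the last worker to finish calls onCompleted(), exactly once.
                    if (remaining.decrementAndGet() == 0) {
                        synchronized (observer) {
                            observer.onCompleted();
                        }
                    }
                }
            }, "worker-" + w);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return observer.events;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> events = run(2, 3);
        // prints "7 events, last = completed"
        System.out.println(events.size() + " events, last = " + events.get(events.size() - 1));
    }
}
```

The order of the messages from the two workers is not deterministic, but "completed" is always the last event, because a worker only decrements the counter after its own final onNext() has returned.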