Sure.

Here is the code without multithreading:

public void greetServerStream(GreetServerStreamRequest request,
        StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    try {
        for (int i = 0; i < 10; i++) {
            String result = "Hello " + message + ", response number: " + i;
            GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                    .setResult(result)
                    .build();
            responseObserver.onNext(response);
            Thread.sleep(1000L);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        responseObserver.onCompleted();
    }
}

From the code above, I implemented a multithreaded version as shown below:

public void greetServerStream(GreetServerStreamRequest request,
        StreamObserver<GreetServerStreamResponse> responseObserver) {
    String message = request.getGreeting().getMessage();
    MultiRunnable runnable1 = new MultiRunnable("thread1", message + "1", responseObserver);
    runnable1.start();
    MultiRunnable runnable2 = new MultiRunnable("thread2", message + "2", responseObserver);
    runnable2.start();
}

public class MultiRunnable implements Runnable {
    private Thread thread;
    private final String threadName;
    private final String message;
    private final ServerCallStreamObserver<GreetServerStreamResponse> serverCallStreamObserver;

    public MultiRunnable(String threadName, String message,
            StreamObserver<GreetServerStreamResponse> responseObserver) {
        this.threadName = threadName;
        this.message = message;
        this.serverCallStreamObserver =
                (ServerCallStreamObserver<GreetServerStreamResponse>) responseObserver;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String result = "Hello " + message + ", response number: " + i;
                GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                        .setResult(result)
                        .build();
                synchronized (serverCallStreamObserver) {
                    serverCallStreamObserver.onNext(response);
                }
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + threadName + " interrupted.");
        } finally {
            serverCallStreamObserver.onCompleted();
        }
    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (thread == null) {
            thread = new Thread(this, threadName);
            thread.start();
        }
    }
}

Each thread runs its own for loop and calls onNext() to stream responses 
back to the client. Initially I did not include the synchronized block 
above, and I got the following error:

"Stream 3 sent too many headers EOS"

After adding the block, multiple threads were able to call onNext() on the 
shared observer without hitting that error.

I am curious whether this is the right way to synchronize. What is the best 
practice here? And is calling onNext() from multiple threads common or 
recommended?
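
For reference, here is a minimal sketch of one possible variation, not a confirmed best practice. It rests on two points from the StreamObserver documentation: implementations are not thread-safe, so concurrent callers must synchronize, and onCompleted() may be called only once and must be the last call. The sketch therefore serializes onNext() and completes the stream a single time after all workers finish, instead of once per thread. Observer is a hypothetical stand-in for StreamObserver so the example runs without gRPC, and SafeStreamDemo and streamFromWorkers are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SafeStreamDemo {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver (onError omitted). */
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Starts `workers` threads that each send `perWorker` messages, then completes once. */
    static void streamFromWorkers(String message, int workers, int perWorker,
            Observer<String> observer) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 1; w <= workers; w++) {
            final String name = message + w;
            pool.execute(() -> {
                for (int i = 0; i < perWorker; i++) {
                    String result = "Hello " + name + ", response number: " + i;
                    // Serialize onNext(): the observer is assumed not to be thread-safe.
                    synchronized (observer) {
                        observer.onNext(result);
                    }
                }
            });
        }
        pool.shutdown();
        // Wait for the workers so that onCompleted() is the last call on the observer.
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // assumption: give up on stragglers after the timeout
        }
        synchronized (observer) {
            observer.onCompleted(); // called exactly once, by this thread only
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sent = new ArrayList<>();
        int[] completions = {0};
        streamFromWorkers("Bill", 2, 10, new Observer<String>() {
            @Override public void onNext(String value) { sent.add(value); }
            @Override public void onCompleted() { completions[0]++; }
        });
        System.out.println(sent.size() + " messages, " + completions[0] + " completion(s)");
    }
}
```

Note that this version blocks the calling thread until the workers are done, much like the single-threaded handler above does while it sleeps. A handler that must return immediately would need a different way to detect the last worker, for example a shared counter.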

Thanks,
Bill

On Wednesday, 19 May 2021 at 01:47:54 UTC-4 sanjay...@google.com wrote:

> Pls include a code snippet of what you want to do. Show how you intend to 
> share "one ResponseObserver".
>
> On Tue, May 18, 2021 at 6:56 PM Bill Li <zhl...@gmail.com> wrote:
>
>> Got it, thanks!
>>
>> I am currently implementing a server-side streaming application. Can one 
>> ResponseObserver be shared by multiple threads sending response stream back 
>> to the client through onNext() method? Just want to confirm if there is a 
>> race condition in calling onNext() at the same time.
>>
>> On Tuesday, 18 May 2021 at 19:28:43 UTC-4 sanjay...@google.com wrote:
>>
>>> With NettyServerBuilder you can use 
>>> maxConcurrentCallsPerConnection(int maxCalls) 
>>> <https://github.com/grpc/grpc-java/blob/master/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java#L397>
>>>
>>> This is the same as setting MAX_CONCURRENT_STREAMS per connection.
>>>
>>> On Tue, May 18, 2021 at 3:36 PM Bill Li <zhl...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does anyone know or have an example for configuring the parameter 
>>>> MAX_CONCURRENT_STREAMS for gRPC server written in Java?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> -- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "grpc.io" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to grpc-io+u...@googlegroups.com.
>>>> To view this discussion on the web visit 
>>>> https://groups.google.com/d/msgid/grpc-io/cbb2fd35-a01a-4128-879d-08cbc91049b0n%40googlegroups.com
>>>>  
>>>> <https://groups.google.com/d/msgid/grpc-io/cbb2fd35-a01a-4128-879d-08cbc91049b0n%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>>

