> Is the C++ gRPC team putting more effort into perfecting/optimizing the 
> callback API approach?
Yes

> 1.- By using the callback API approach, will we be able to serve 
> different users concurrently the same way we do with our current 
> implementation?
Yes

> 2.- Will we need to implement threading logic like the one we have, or 
> is it not needed?
Not needed with the C++ callback API.
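
For illustration only, here is a rough, gRPC-free sketch of the reactor 
pattern the callback API is built around: each RPC gets its own object, only 
one write is outstanding at a time, and the next write is started from the 
completion callback. In the real API the base class would be 
grpc::ServerBidiReactor (with StartWrite, OnWriteDone and Finish), and the 
library invokes the callbacks on its own threads. FakeStream, Pump and 
ChunkReactor below are hypothetical stand-ins, not gRPC types.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the gRPC reactor base class. In real gRPC the
// library calls OnWriteDone() from its own threads; here Pump() plays that
// role so the sketch runs without gRPC.
class FakeStream {
 public:
  virtual ~FakeStream() = default;
  virtual void OnWriteDone(bool ok) = 0;

  const std::vector<std::string>& sent() const { return sent_; }
  bool finished() const { return finished_; }

  // Completes outstanding writes one at a time until the stream is idle.
  void Pump() {
    while (in_flight_) {
      in_flight_ = false;
      OnWriteDone(true);
    }
  }

 protected:
  void StartWrite(const std::string& msg) {
    assert(!in_flight_ && "only one write may be outstanding");
    sent_.push_back(msg);
    in_flight_ = true;
  }
  // The real Finish() takes a grpc::Status; omitted to stay self-contained.
  void Finish() { finished_ = true; }

 private:
  std::vector<std::string> sent_;
  bool in_flight_ = false;
  bool finished_ = false;
};

// One reactor per RPC: all per-call state lives in the object, and there
// are no tags and no completion-queue loop to write.
class ChunkReactor : public FakeStream {
 public:
  explicit ChunkReactor(std::deque<std::string> chunks)
      : chunks_(std::move(chunks)) {
    NextWrite();
  }

  void OnWriteDone(bool ok) override {
    if (!ok) {
      Finish();
      return;
    }
    NextWrite();
  }

 private:
  // Starts the next write, or finishes the call when nothing is left.
  void NextWrite() {
    if (chunks_.empty()) {
      Finish();
      return;
    }
    current_ = std::move(chunks_.front());
    chunks_.pop_front();
    StartWrite(current_);
  }

  std::deque<std::string> chunks_;
  std::string current_;  // must stay alive until the write completes
};
```

Constructing a ChunkReactor from a deque of chunks and calling Pump() sends 
the chunks in order and then marks the stream finished. With real gRPC there 
is no Pump(); the service method just returns the reactor.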

On Wednesday, October 25, 2023 at 6:21:28 PM UTC-7 Pedro Alfonso wrote:

> Hi Yas,
>
> First of all, thanks for coming back to us.
> That's a really important comment. Please correct us if we are wrong: is 
> our understanding right that the C++ gRPC team is putting more effort into 
> perfecting/optimizing the callback API approach? By the way, we also agree 
> that it's easier to use.
>
> Kindly help us with these additional questions:
>
> 1.- By using the callback API approach, will we be able to serve different 
> users concurrently the same way we do with our current implementation?
> 2.- Will we need to implement threading logic like the one we have, or 
> is it not needed?
>
> Thanks in advance.
>
> Regards,
>
> Pedro
> On Wednesday, October 25, 2023 at 1:17:23 PM UTC-5 yas...@google.com 
> wrote:
>
>> We have been recommending the C++ callback API over the completion 
>> queue based API since it's easier to use. All performance optimizations 
>> that we are working on target the callback API.
>>
>> On Thursday, October 19, 2023 at 8:03:42 AM UTC-7 Pedro Alfonso wrote:
>>
>>> Hello,
>>>
>>> First let me explain what we have in our C++ gRPC Async server codebase:
>>>
>>> - We have 2 unary-based response RPCs.
>>> - And we have 2 stream-based response RPCs, which cover over 95% of 
>>> the client's API consumption, so they are really important to our 
>>> streaming-based implementation.
>>>
>>> Of the 2 stream-based response RPCs, the one below is the most critical 
>>> to us:
>>>
>>> // Inner class StreamAssetNodes
>>> class StreamAssetNodes : public RequestBase {
>>> public:
>>>     StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner ), ownerClass( owner ) {
>>>         owner_.grpc().service_.RequestStreamAssetNodes(
>>>             &context_, &stream_, cq(), cq(),
>>>             in_handle_.tag( Handle::Operation::CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
>>>                 LOG_DEBUG << "\n" + me( *this )
>>>                           << "\n\n*****************************************************************\n"
>>>                           << "- Processing a new connect from " << context_.peer()
>>>                           << "\n\n*****************************************************************\n"
>>>                           << endl;
>>>                 cout << "\n" + me( *this )
>>>                      << "\n*****************************************************************\n"
>>>                      << "- Processing a new connect from " << context_.peer()
>>>                      << "\n*****************************************************************\n"
>>>                      << endl;
>>>
>>>                 if ( !ok ) [[unlikely]] {
>>>                     LOG_DEBUG << "The CONNECT-operation failed." << endl;
>>>                     cout << "The CONNECT-operation failed." << endl;
>>>                     return;
>>>                 }
>>>
>>>                 // Creates a new instance so the service can handle requests from a new client
>>>                 owner_.createNew<StreamAssetNodes>( owner );
>>>                 // Reads request's parameters
>>>                 readNodeIds();
>>>             } ) );
>>>     }
>>>
>>> private:
>>>     // Objects and variables
>>>     AsyncAssetStreamerManager& ownerClass;
>>>     ::Illuscio::AssetNodeIds request_;
>>>     ::Illuscio::AssetNodeComponent reply_;
>>>     ::grpc::ServerContext context_;
>>>     ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> stream_ { &context_ };
>>>
>>>     vector<string> nodeids_vector;
>>>     // Contains mapping for all the nodes of a set of assets
>>>     json assetsNodeMapping;
>>>     // Contains mapping for all the nodes of a particular asset
>>>     json assetNodeMapping;
>>>     ifstream nodeFile;
>>>     // Handle for messages coming in
>>>     Handle in_handle_ { *this };
>>>     // Handle for messages going out
>>>     Handle out_handle_ { *this };
>>>
>>>     int fileNumber = 0;
>>>     const int chunk_size = 16 * 1024;
>>>     char buffer[16 * 1024];
>>>
>>>     // Methods
>>>
>>>     void readNodeIds() {
>>>         // Reads RPC request parameters
>>>         stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( bool ok, Handle::Operation op ) {
>>>             if ( !ok ) [[unlikely]] { return; }
>>>
>>>             // Assigns the request to the nodeids vector
>>>             nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() );
>>>             request_.clear_nodeids();
>>>
>>>             if ( !nodeids_vector.empty() ) {
>>>                 ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()];
>>>                 if ( ownerClass.assetNodeMapping.empty() ) {
>>>                     stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found in server..." ),
>>>                         in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>                             if ( !ok ) [[unlikely]] {
>>>                                 LOG_DEBUG << "The FINISH request-operation failed." << endl;
>>>                                 cout << "The FINISH request-operation failed." << endl;
>>>                             }
>>>
>>>                             LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << endl;
>>>                             cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
>>>                         } ) );
>>>                     return;
>>>                 }
>>>
>>>                 writeNodeFile( nodeids_vector.front() );
>>>             } else {
>>>                 stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset' node ids empty. Without node ids node streaming can't start..." ),
>>>                     in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>                         if ( !ok ) [[unlikely]] {
>>>                             LOG_DEBUG << "The FINISH request-operation failed.";
>>>                             cout << "The FINISH request-operation failed.";
>>>                         }
>>>
>>>                         LOG_DEBUG << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>>>                         cout << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>>>                     } ) );
>>>             }
>>>         } ) );
>>>     }
>>>
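>>>     void writeNodeFile( const string& nodeId ) {
>>>         // Opens the file which contains the requested node
>>>         nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary );
>>>
>>>         if ( !nodeFile.is_open() ) {
>>>             LOG_DEBUG << "Asset's node file open operation failed for node:" << nodeId << endl;
>>>             cout << "Asset's node file open operation failed for node:" << nodeId << endl;
>>>             // Reading from a stream that failed to open never reaches EOF, so the
>>>             // write loop would not terminate; end the RPC with an error instead.
>>>             stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's node file could not be opened." ),
>>>                 out_handle_.tag( Handle::Operation::FINISH, []( bool /* ok */, Handle::Operation /* op */ ) {} ) );
>>>             return;
>>>         }
>>>
>>>         splitFileAndWriteChunks();
>>>     }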
>>>
>>>     void splitFileAndWriteChunks() {
>>>         setReplyWithBuffer();
>>>
>>>         stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( bool ok, Handle::Operation op ) {
>>>             if ( !nodeFile.eof() ) {
>>>                 splitFileAndWriteChunks();
>>>             } else if ( !nodeids_vector.empty() ) {
>>>                 nodeFile.close();
>>>                 nodeids_vector.erase( nodeids_vector.begin() );
>>>
>>>                 if ( !nodeids_vector.empty() ) {
>>>                     writeNodeFile( nodeids_vector.front() );
>>>                 } else {
>>>                     finishIfDone();
>>>                 }
>>>             }
>>>         } ) );
>>>     }
>>>
>>>     void setReplyWithBuffer() {
>>>         // Fills read buffer
>>>         nodeFile.read( buffer, chunk_size );
>>>
>>>         // Prepare reply and start writing
>>>         reply_.Clear();
>>>         reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
>>>     }
>>>
>>>     // We wait until all incoming messages are received and all outgoing
>>>     // messages are sent before we send the finish message.
>>>     void finishIfDone() {
>>>         stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>             if ( !ok ) [[unlikely]] {
>>>                 LOG_DEBUG << "The FINISH request-operation failed." << endl;
>>>                 cout << "The FINISH request-operation failed." << endl;
>>>             }
>>>         } ) );
>>>     }
>>> };
>>>
>>> So the idea with the above code is that the request is basically an array 
>>> of string ids (e.g. "1", "2", "3", ...; by the way, it is defined as a 
>>> stream in protobuf), and each of those ids points to a small file stored 
>>> on the server. Once the request is read by the RPC, it takes the first id, 
>>> opens the file it points to, and writes the file to the client in chunks 
>>> as a stream-type response. When it finishes, it takes the second id from 
>>> the array and does the same thing, again and again, until there are no 
>>> more ids left in the request's array.
>>>
>>> From the client's perspective, a single client should call the above RPC 
>>> passing an ids array of probably 500 elements (different ids), and the 
>>> number of calls to this RPC should be around 1000 calls per second.
>>>
>>> We are using a shared completion queue for all RPCs and we do not have a 
>>> multithreading approach.
>>>
>>> With all the above background, we would like to know if we can implement 
>>> a more efficient approach for the above RPC method, probably based on 
>>> multi-threading; that's our ultimate goal.
>>>
>>> Questions:
>>>
>>> 1.- Can the gRPC team tell us how to approach this problem in detail, 
>>> and in a way that lets us use a multi-threading strategy?
>>>
>>> 2.- How can we use completion queues more efficiently along with these 
>>> new threads?
>>>
>>> We are asking these questions because we feel we are not leveraging the 
>>> real power of gRPC for this specific use case.
>>>
>>> Please let us know if you need more details from us.
>>>
>>> Thanks in advance.
>>>
>>> Pedro Alfonso
>>>
>>
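
The per-file chunking loop described in the quoted message (read a file in 
16 KiB pieces and send each piece as one response) can be sketched without 
gRPC as below. SplitIntoChunks is a hypothetical helper, not part of the 
original code. One difference is that this version stops as soon as a read 
returns no bytes, so a file whose size is an exact multiple of the chunk 
size does not produce a trailing empty chunk.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: reads `in` to the end and returns its contents split
// into pieces of at most `chunk_size` bytes each.
std::vector<std::string> SplitIntoChunks(std::istream& in, std::size_t chunk_size) {
  assert(chunk_size > 0 && "chunk_size must be positive");

  std::vector<std::string> chunks;
  std::string buffer(chunk_size, '\0');

  // A full read leaves the stream good; the final short read sets
  // eofbit/failbit but still reports its byte count through gcount().
  while (in.read(&buffer[0], static_cast<std::streamsize>(chunk_size)) ||
         in.gcount() > 0) {
    chunks.emplace_back(buffer.data(), static_cast<std::size_t>(in.gcount()));
  }
  return chunks;
}
```

In a server each returned piece would become one response message; here a 
std::istringstream can stand in for the node file when trying it out.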
