Hi Yas, First of all, thanks for coming back to us. That's a really important comment and please correct us if we are wrong, the understanding is C++ gRPC team is working towards or maybe putting more effort in perfecting/optimizing the callback API approach? and btw, we are also agree, it's easier to use.
Kindly help us with these additional questions: 1.- By using the callback API approach, will we be able to serve different users concurrently the same way we do with our current implementation? 2.- Will we need to implement a threading logic like the one we have, or is not needed? Thanks in advance. Regards, Pedro On Wednesday, October 25, 2023 at 1:17:23 PM UTC-5 yas...@google.com wrote: > We have been recommending using the C++ callback API instead of the > completion queue based API since it's easier to use. All performance > optimizations that we are working are targeting the callback API. > > On Thursday, October 19, 2023 at 8:03:42 AM UTC-7 Pedro Alfonso wrote: > >> Hello, >> >> First let me explain what we have in our C++ gRPC Async server codebase: >> >> - We have 2 unary based response RPCs. >> - And we have 2 stream based response RPCs which will cover over 95% of >> the client's API consumption, meaning they are really important to our >> streaming based implementation. >> >> From the 2 stream based response RPCs, below one is the most critical to >> us: >> >> // Inner class StreamAssetNodes >> class StreamAssetNodes : public RequestBase { >> public: >> StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner >> ), ownerClass( owner ) { >> owner_.grpc().service_.RequestStreamAssetNodes( >> &context_, &stream_, cq(), cq(), in_handle_.tag( Handle::Operation:: >> CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) { >> LOG_DEBUG << "\n" + me( *this ) << "\n\n >> *****************************************************************\n" >> << "- Processing a new connect from " << context_.peer() >> << "\n\n***************************************************************** >> \n" >> << endl; >> cout << "\n" + me( *this ) << "\n >> *****************************************************************\n" >> << "- Processing a new connect from " << context_.peer() << "\n >> *****************************************************************\n" >> << endl; >> >> if ( !ok ) [[unlikely]] { >> LOG_DEBUG << "The CONNECT-operation failed." << endl; >> cout << "The CONNECT-operation failed." << endl; >> return; >> } >> >> // Creates a new instance so the service can handle requests from a new >> client >> owner_.createNew<StreamAssetNodes>( owner ); >> // Reads request's parameters >> readNodeIds(); >> } ) ); >> } >> >> private: >> // Objects and variables >> AsyncAssetStreamerManager& ownerClass; >> ::Illuscio::AssetNodeIds request_; >> ::Illuscio::AssetNodeComponent reply_; >> ::grpc::ServerContext context_; >> ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> >> stream_ { &context_ }; >> >> vector<string> nodeids_vector; >> // Contains mapping for all the nodes of a set of assets >> json assetsNodeMapping; >> // Contains mapping for all the nodes of a particular asset >> json assetNodeMapping; >> ifstream nodeFile; >> // Handle for messages coming in >> Handle in_handle_ { *this }; >> // Handle for messages going out >> Handle out_handle_ { *this }; >> >> int fileNumber = 0; >> const int chunk_size = 16 * 1024; >> char buffer[16 * 1024]; >> >> // Methods >> >> void readNodeIds() { >> // Reads RPC request parameters >> stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( >> bool ok, Handle::Operation op ) { >> if ( !ok ) [[unlikely]] { return; } >> >> // Assigns the request to the nodeids vector >> nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() >> ); >> request_.clear_nodeids(); >> >> if ( !nodeids_vector.empty() ) { >> ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid >> ()]; >> if ( ownerClass.assetNodeMapping.empty() ) { >> stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID >> not found in server..." ), >> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle:: >> Operation /* op */ ) { >> if ( !ok ) [[unlikely]] { >> LOG_DEBUG << "The FINISH request-operation failed." << endl; >> cout << "The FINISH request-operation failed." << endl; >> } >> >> LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << >> endl; >> cout << "Asset's UUID not found in server: " << request_.uuid() << endl; >> } ) ); >> return; >> } >> >> writeNodeFile( nodeids_vector.front() ); >> } else { >> stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset' node >> ids empty. Without node ids node streaming can't start..." ), >> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle:: >> Operation /* op */ ) { >> if ( !ok ) [[unlikely]] { >> LOG_DEBUG << "The FINISH request-operation failed."; >> cout << "The FINISH request-operation failed."; >> } >> >> LOG_DEBUG << "Asset' node ids coming empty on the request. Without node >> ids node streaming can't start..." << endl; >> cout << "Asset' node ids coming empty on the request. Without node ids >> node streaming can't start..." << endl; >> } ) ); >> } >> } ) ); >> } >> >> void writeNodeFile( const string& nodeId ) { >> // Opens the file which contains the requested node >> nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary >> ); >> >> if ( !nodeFile.is_open() ) { >> LOG_DEBUG << "Asset's node file open operation failed for node:" << >> nodeId << endl; >> cout << "Asset's node file open operation failed for node:" << nodeId << >> endl; >> } >> >> splitFileAndWriteChunks(); >> } >> >> void splitFileAndWriteChunks() { >> setReplyWithBuffer(); >> >> stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( >> bool ok, Handle::Operation op ) { >> if ( !nodeFile.eof() ) { >> splitFileAndWriteChunks(); >> } else if ( !nodeids_vector.empty() ) { >> nodeFile.close(); >> nodeids_vector.erase( nodeids_vector.begin() ); >> >> if ( !nodeids_vector.empty() ) { >> writeNodeFile( nodeids_vector.front() ); >> } else { >> finishIfDone(); >> } >> } >> } ) ); >> } >> >> void setReplyWithBuffer() { >> // Fills read buffer >> nodeFile.read( buffer, chunk_size ); >> >> // Prepare reply and start writing >> reply_.Clear(); >> reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) ); >> } >> >> // We wait until all incoming messages are received and all outgoing >> messages are sent >> // before we send the finish message. >> void finishIfDone() { >> stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation:: >> FINISH, [this]( bool ok, Handle::Operation /* op */ ) { >> if ( !ok ) [[unlikely]] { >> LOG_DEBUG << "The FINISH request-operation failed." << endl; >> cout << "The FINISH request-operation failed." << endl; >> } >> } ) ); >> } >> }; >> >> So the idea with above code is that the request is basically an array of >> strings `ids` (ex. "1", "2", "3", ... btw it is defined as a stream in >> protobuf) and each of those ids are pointing to a small file which is >> stored on the server. Now, once the request is read on the rpc, it will >> take the first Id, will open the file it points to and then start to write >> the file in chunks to the client as a stream type response and when it >> finishes, then it takes the second Id from the array and does the same >> thing again and again until there are no more Ids left in the request's >> array. >> >> From the client perspective the behavior is, a singe client should call >> above RPC passing this ids array having a size of probably 500 elements >> (different ids) and also the number of calls to this RPC should be like >> 1000 calls per seconds. >> >> We are using a shared completion queue for all RPCs and we do not have a >> multithreading approach. >> >> With all above background, we would like to know if we can implement a >> more efficient approach for above RPC method probably based in >> multi-threading, that's our ultimate goal. >> >> Questions: >> >> 1.- Can the GRPC team tell us how to approach this problem in detail, >> also in a away we can use a multi-threading strategy? >> >> 2.- How to use completion queues more efficiently along with these new >> threads? >> >> The reason why we are asking these questions, is because we feel we are >> not leveraging the real power of gRPC for this specific use case. >> >> Please let us know if you need more details from us. >> >> Thanks in advance. >> >> Pedro Alfonso >> > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/f77ad35a-1f6e-4bad-a5ea-32bb130db7cbn%40googlegroups.com.