> C++ gRPC team is working towards or maybe putting more effort in perfecting/optimizing the callback API approach?

Yes.

> 1.- By using the callback API approach, will we be able to serve different users concurrently the same way we do with our current implementation?

Yes.

> 2.- Will we need to implement a threading logic like the one we have, or is not needed?

Not needed with the C++ callback API.

On Wednesday, October 25, 2023 at 6:21:28 PM UTC-7 Pedro Alfonso wrote:

> Hi Yas,
>
> First of all, thanks for coming back to us.
> That's a really important comment. Please correct us if we are wrong: our
> understanding is that the C++ gRPC team is working towards, or maybe
> putting more effort into, perfecting/optimizing the callback API approach?
> By the way, we also agree that it's easier to use.
>
> Kindly help us with these additional questions:
>
> 1.- By using the callback API approach, will we be able to serve different
> users concurrently the same way we do with our current implementation?
> 2.- Will we need to implement a threading logic like the one we have, or
> is it not needed?
>
> Thanks in advance.
>
> Regards,
>
> Pedro
>
> On Wednesday, October 25, 2023 at 1:17:23 PM UTC-5 yas...@google.com wrote:
>
>> We have been recommending using the C++ callback API instead of the
>> completion queue based API since it's easier to use. All performance
>> optimizations that we are working on are targeting the callback API.
>>
>> On Thursday, October 19, 2023 at 8:03:42 AM UTC-7 Pedro Alfonso wrote:
>>
>>> Hello,
>>>
>>> First let me explain what we have in our C++ gRPC Async server codebase:
>>>
>>> - We have 2 unary based response RPCs.
>>> - And we have 2 stream based response RPCs which will cover over 95% of
>>> the client's API consumption, meaning they are really important to our
>>> streaming based implementation.
>>>
>>> Of the 2 stream based response RPCs, the one below is the most critical
>>> to us:
>>>
>>> // Inner class StreamAssetNodes
>>> class StreamAssetNodes : public RequestBase {
>>> public:
>>>     StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner ), ownerClass( owner ) {
>>>         owner_.grpc().service_.RequestStreamAssetNodes(
>>>             &context_, &stream_, cq(), cq(),
>>>             in_handle_.tag( Handle::Operation::CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
>>>                 LOG_DEBUG << "\n" + me( *this )
>>>                           << "\n\n *****************************************************************\n"
>>>                           << "- Processing a new connect from " << context_.peer()
>>>                           << "\n\n *****************************************************************\n"
>>>                           << endl;
>>>                 cout << "\n" + me( *this )
>>>                      << "\n *****************************************************************\n"
>>>                      << "- Processing a new connect from " << context_.peer()
>>>                      << "\n *****************************************************************\n"
>>>                      << endl;
>>>
>>>                 if ( !ok ) [[unlikely]] {
>>>                     LOG_DEBUG << "The CONNECT-operation failed." << endl;
>>>                     cout << "The CONNECT-operation failed." << endl;
>>>                     return;
>>>                 }
>>>
>>>                 // Creates a new instance so the service can handle requests from a new client
>>>                 owner_.createNew<StreamAssetNodes>( owner );
>>>                 // Reads request's parameters
>>>                 readNodeIds();
>>>             } ) );
>>>     }
>>>
>>> private:
>>>     // Objects and variables
>>>     AsyncAssetStreamerManager& ownerClass;
>>>     ::Illuscio::AssetNodeIds request_;
>>>     ::Illuscio::AssetNodeComponent reply_;
>>>     ::grpc::ServerContext context_;
>>>     ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> stream_ { &context_ };
>>>
>>>     vector<string> nodeids_vector;
>>>     // Contains mapping for all the nodes of a set of assets
>>>     json assetsNodeMapping;
>>>     // Contains mapping for all the nodes of a particular asset
>>>     json assetNodeMapping;
>>>     ifstream nodeFile;
>>>     // Handle for messages coming in
>>>     Handle in_handle_ { *this };
>>>     // Handle for messages going out
>>>     Handle out_handle_ { *this };
>>>
>>>     int fileNumber = 0;
>>>     const int chunk_size = 16 * 1024;
>>>     char buffer[16 * 1024];
>>>
>>>     // Methods
>>>
>>>     void readNodeIds() {
>>>         // Reads RPC request parameters
>>>         stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( bool ok, Handle::Operation op ) {
>>>             if ( !ok ) [[unlikely]] { return; }
>>>
>>>             // Assigns the request to the nodeids vector
>>>             nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() );
>>>             request_.clear_nodeids();
>>>
>>>             if ( !nodeids_vector.empty() ) {
>>>                 ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()];
>>>                 if ( ownerClass.assetNodeMapping.empty() ) {
>>>                     stream_.Finish(
>>>                         grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found in server..." ),
>>>                         in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>                             if ( !ok ) [[unlikely]] {
>>>                                 LOG_DEBUG << "The FINISH request-operation failed." << endl;
>>>                                 cout << "The FINISH request-operation failed." << endl;
>>>                             }
>>>
>>>                             LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << endl;
>>>                             cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
>>>                         } ) );
>>>                     return;
>>>                 }
>>>
>>>                 writeNodeFile( nodeids_vector.front() );
>>>             } else {
>>>                 stream_.Finish(
>>>                     grpc::Status( grpc::StatusCode::DATA_LOSS,
>>>                                   "Asset' node ids empty. Without node ids node streaming can't start..." ),
>>>                     in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>                         if ( !ok ) [[unlikely]] {
>>>                             LOG_DEBUG << "The FINISH request-operation failed.";
>>>                             cout << "The FINISH request-operation failed.";
>>>                         }
>>>
>>>                         LOG_DEBUG << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>>>                         cout << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>>>                     } ) );
>>>             }
>>>         } ) );
>>>     }
>>>
>>>     void writeNodeFile( const string& nodeId ) {
>>>         // Opens the file which contains the requested node
>>>         nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary );
>>>
>>>         if ( !nodeFile.is_open() ) {
>>>             LOG_DEBUG << "Asset's node file open operation failed for node:" << nodeId << endl;
>>>             cout << "Asset's node file open operation failed for node:" << nodeId << endl;
>>>         }
>>>
>>>         splitFileAndWriteChunks();
>>>     }
>>>
>>>     void splitFileAndWriteChunks() {
>>>         setReplyWithBuffer();
>>>
>>>         stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( bool ok, Handle::Operation op ) {
>>>             if ( !nodeFile.eof() ) {
>>>                 splitFileAndWriteChunks();
>>>             } else if ( !nodeids_vector.empty() ) {
>>>                 nodeFile.close();
>>>                 nodeids_vector.erase( nodeids_vector.begin() );
>>>
>>>                 if ( !nodeids_vector.empty() ) {
>>>                     writeNodeFile( nodeids_vector.front() );
>>>                 } else {
>>>                     finishIfDone();
>>>                 }
>>>             }
>>>         } ) );
>>>     }
>>>
>>>     void setReplyWithBuffer() {
>>>         // Fills read buffer
>>>         nodeFile.read( buffer, chunk_size );
>>>
>>>         // Prepare reply and start writing
>>>         reply_.Clear();
>>>         reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
>>>     }
>>>
>>>     // We wait until all incoming messages are received and all outgoing messages are sent
>>>     // before we send the finish message.
>>>     void finishIfDone() {
>>>         stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>>>             if ( !ok ) [[unlikely]] {
>>>                 LOG_DEBUG << "The FINISH request-operation failed." << endl;
>>>                 cout << "The FINISH request-operation failed." << endl;
>>>             }
>>>         } ) );
>>>     }
>>> };
>>>
>>> So the idea with the above code is that the request is basically an array
>>> of strings `ids` (e.g. "1", "2", "3", ...; by the way, it is defined as a
>>> stream in protobuf), and each of those ids points to a small file which is
>>> stored on the server. Once the request is read by the RPC, it takes the
>>> first id, opens the file it points to, and starts writing the file in
>>> chunks to the client as a stream type response. When it finishes, it takes
>>> the second id from the array and does the same thing, again and again,
>>> until there are no more ids left in the request's array.
>>>
>>> From the client's perspective, the behavior is: a single client should
>>> call the above RPC passing this ids array, which will probably have about
>>> 500 elements (different ids), and the number of calls to this RPC should
>>> be around 1000 calls per second.
>>>
>>> We are using a shared completion queue for all RPCs and we do not have a
>>> multithreading approach.
>>>
>>> With all the above background, we would like to know if we can implement
>>> a more efficient approach for the above RPC method, probably based on
>>> multi-threading; that's our ultimate goal.
>>>
>>> Questions:
>>>
>>> 1.- Can the gRPC team tell us how to approach this problem in detail,
>>> also in a way that lets us use a multi-threading strategy?
>>>
>>> 2.- How to use completion queues more efficiently along with these new
>>> threads?
>>>
>>> The reason why we are asking these questions is that we feel we are
>>> not leveraging the real power of gRPC for this specific use case.
>>>
>>> Please let us know if you need more details from us.
>>>
>>> Thanks in advance.
>>>
>>> Pedro Alfonso
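
A note on the "not needed" answer at the top of this thread, for anyone porting a chunked-streaming RPC like the one quoted: in the callback API the per-call logic lives in a reactor (for a bidi stream, `grpc::ServerBidiReactor`), and the "write a chunk, wait for completion, write the next one" chain is driven by `StartWrite()` and `OnWriteDone()`, with `Finish()` at the end, instead of by tags on a completion queue. The sketch below isolates only that chaining logic and deliberately has no gRPC dependency so it can be compiled on its own. `ChunkStreamer`, `OpenFn`, `WriteFn`, and `NextWrite()` are hypothetical names invented for this illustration, and reading from a generic `std::istream` is an assumption standing in for the files on the server. It stops on a zero-byte read rather than on `eof()`, which also sidesteps what looks like an endless write loop in the quoted code when a file fails to open.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <istream>
#include <memory>
#include <sstream>  // std::istringstream, handy as an in-memory source
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of gRPC: per-call state that emits one chunk
// per NextWrite() call, walking the requested ids in order.
class ChunkStreamer {
public:
    // Returns a readable stream for an id, or nullptr if the id is unknown.
    using OpenFn = std::function<std::unique_ptr<std::istream>(const std::string&)>;
    // Receives one chunk; a real reactor would call StartWrite() here.
    using WriteFn = std::function<void(const std::string&)>;

    ChunkStreamer(std::vector<std::string> ids, std::size_t chunk_size,
                  OpenFn open, WriteFn write)
        : ids_(std::move(ids)), buffer_(chunk_size),
          open_(std::move(open)), write_(std::move(write)) {
        assert(open_ && write_);
    }

    // Issues at most one write. Returns false once every id is exhausted,
    // which is the point where a real reactor would call Finish().
    // Intended to be called once up front and then from OnWriteDone().
    bool NextWrite() {
        while (true) {
            if (!current_) {
                if (next_id_ == ids_.size()) return false;  // nothing left
                current_ = open_(ids_[next_id_++]);
                if (!current_) continue;  // unknown id: skip it
            }
            current_->read(buffer_.data(),
                           static_cast<std::streamsize>(buffer_.size()));
            const std::streamsize n = current_->gcount();
            if (n > 0) {
                write_(std::string(buffer_.data(), static_cast<std::size_t>(n)));
                return true;
            }
            current_.reset();  // source drained (or unreadable): move on
        }
    }

private:
    std::vector<std::string> ids_;
    std::size_t next_id_ = 0;
    std::vector<char> buffer_;
    std::unique_ptr<std::istream> current_;
    OpenFn open_;
    WriteFn write_;
};
```

In a real service, one such object would be a member of each reactor instance, `write_` would fill the reply message and call `StartWrite()`, and `OnWriteDone()` would call `NextWrite()` again, finishing the RPC when it returns false. Since the library invokes reactor callbacks on its own threads, no hand-written completion-queue polling loop is involved, which is how I read the "not needed" answer.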