We have been recommending using the C++ callback API instead of the 
completion queue based API since it's easier to use. All performance 
optimizations that we are working are targeting the callback API.

On Thursday, October 19, 2023 at 8:03:42 AM UTC-7 Pedro Alfonso wrote:

> Hello,
>
> First let me explain what we have in our C++ gRPC Async server codebase:
>
> - We have 2 unary based response RPCs.
> - And we have 2 stream based response RPCs which will cover over 95% of 
> the client's API consumption, meaning they are really important to our 
> streaming based implementation.
>
> From the 2 stream based response RPCs, below one is the most critical to 
> us:
>
> // Inner class StreamAssetNodes
> class StreamAssetNodes : public RequestBase {
> public:
> StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner 
> ), ownerClass( owner ) {
> owner_.grpc().service_.RequestStreamAssetNodes(
> &context_, &stream_, cq(), cq(), in_handle_.tag( Handle::Operation::
> CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
> LOG_DEBUG << "\n" + me( *this ) << "\n\n
> *****************************************************************\n"
> << "- Processing a new connect from " << context_.peer()
> << "\n\n*****************************************************************
> \n"
> << endl;
> cout << "\n" + me( *this ) << "\n
> *****************************************************************\n"
> << "- Processing a new connect from " << context_.peer() << "\n
> *****************************************************************\n"
> << endl;
>
> if ( !ok ) [[unlikely]] {
> LOG_DEBUG << "The CONNECT-operation failed." << endl;
> cout << "The CONNECT-operation failed." << endl;
> return;
> }
>
> // Creates a new instance so the service can handle requests from a new 
> client
> owner_.createNew<StreamAssetNodes>( owner );
> // Reads request's parameters
> readNodeIds();
> } ) );
> }
>
> private:
> // Objects and variables
> AsyncAssetStreamerManager& ownerClass;
> ::Illuscio::AssetNodeIds request_;
> ::Illuscio::AssetNodeComponent reply_;
> ::grpc::ServerContext context_;
> ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> 
> stream_ { &context_ };
>
> vector<string> nodeids_vector;
> // Contains mapping for all the nodes of a set of assets
> json assetsNodeMapping;
> // Contains mapping for all the nodes of a particular asset
> json assetNodeMapping;
> ifstream nodeFile;
> // Handle for messages coming in
> Handle in_handle_ { *this };
> // Handle for messages going out
> Handle out_handle_ { *this };
>
> int fileNumber = 0;
> const int chunk_size = 16 * 1024;
> char buffer[16 * 1024];
>
> // Methods
>
> void readNodeIds() {
> // Reads RPC request parameters
> stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( 
> bool ok, Handle::Operation op ) {
> if ( !ok ) [[unlikely]] { return; }
>
> // Assigns the request to the nodeids vector
> nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() 
> );
> request_.clear_nodeids();
>
> if ( !nodeids_vector.empty() ) {
> ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()
> ];
> if ( ownerClass.assetNodeMapping.empty() ) {
> stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID 
> not found in server..." ),
> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::
> Operation /* op */ ) {
> if ( !ok ) [[unlikely]] {
> LOG_DEBUG << "The FINISH request-operation failed." << endl;
> cout << "The FINISH request-operation failed." << endl;
> }
>
> LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << 
> endl;
> cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
> } ) );
> return;
> }
>
> writeNodeFile( nodeids_vector.front() );
> } else {
> stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset' node 
> ids empty. Without node ids node streaming can't start..." ),
> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::
> Operation /* op */ ) {
> if ( !ok ) [[unlikely]] {
> LOG_DEBUG << "The FINISH request-operation failed.";
> cout << "The FINISH request-operation failed.";
> }
>
> LOG_DEBUG << "Asset' node ids coming empty on the request. Without node 
> ids node streaming can't start..." << endl;
> cout << "Asset' node ids coming empty on the request. Without node ids 
> node streaming can't start..." << endl;
> } ) );
> }
> } ) );
> }
>
> void writeNodeFile( const string& nodeId ) {
> // Opens the file which contains the requested node
> nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary 
> );
>
> if ( !nodeFile.is_open() ) {
> LOG_DEBUG << "Asset's node file open operation failed for node:" << nodeId 
> << endl;
> cout << "Asset's node file open operation failed for node:" << nodeId << 
> endl;
> }
>
> splitFileAndWriteChunks();
> }
>
> void splitFileAndWriteChunks() {
> setReplyWithBuffer();
>
> stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( 
> bool ok, Handle::Operation op ) {
> if ( !nodeFile.eof() ) {
> splitFileAndWriteChunks();
> } else if ( !nodeids_vector.empty() ) {
> nodeFile.close();
> nodeids_vector.erase( nodeids_vector.begin() );
>
> if ( !nodeids_vector.empty() ) {
> writeNodeFile( nodeids_vector.front() );
> } else {
> finishIfDone();
> }
> }
> } ) );
> }
>
> void setReplyWithBuffer() {
> // Fills read buffer
> nodeFile.read( buffer, chunk_size );
>
> // Prepare reply and start writing
> reply_.Clear();
> reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
> }
>
> // We wait until all incoming messages are received and all outgoing 
> messages are sent
> // before we send the finish message.
> void finishIfDone() {
> stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::
> FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
> if ( !ok ) [[unlikely]] {
> LOG_DEBUG << "The FINISH request-operation failed." << endl;
> cout << "The FINISH request-operation failed." << endl;
> }
> } ) );
> }
> };
>
> So the idea with above code is that the request is basically an array of 
> strings `ids` (ex. "1", "2", "3", ... btw it is defined as a stream in 
> protobuf) and each of those ids are pointing to a small file which is 
> stored on the server. Now, once the request is read on the rpc, it will 
> take the first Id, will open the file it points to and then start to write 
> the file in chunks to the client as a stream type response and when it 
> finishes, then it takes the second Id from the array and does the same 
> thing again and again until there are no more Ids left in the request's 
> array.
>
> From the client perspective the behavior is, a singe client should call 
> above RPC passing this ids array having a size of probably 500 elements 
> (different ids) and also the number of calls to this RPC should be like 
> 1000 calls per seconds.  
>
> We are using a shared completion queue for all RPCs and we do not have a 
> multithreading approach.
>
> With all above background, we would like to know if we can implement a 
> more efficient approach for above RPC method probably based in 
> multi-threading, that's our ultimate goal.
>
> Questions:
>
> 1.- Can the GRPC team tell us how to approach this problem in detail, also 
> in a away we can use a multi-threading strategy?
>
> 2.- How to use completion queues more efficiently along with these new 
> threads?
>
> The reason why we are asking these questions, is because we feel we are 
> not leveraging the real power of gRPC for this specific use case.
>
> Please let us know if you need more details from us.
>
> Thanks in advance.
>
> Pedro Alfonso
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/7ecd9fcc-86c3-4a16-9fe2-22b3253529f2n%40googlegroups.com.

Reply via email to