Hi Yas,

First of all, thanks for getting back to us.
That's a really important point. Please correct us if we are wrong, but our 
understanding is that the C++ gRPC team is focusing its optimization effort 
on the callback API. By the way, we agree that it is easier to use.

Kindly help us with these additional questions:

1.- With the callback API, will we be able to serve different users 
concurrently, the same way we do with our current implementation?
2.- Will we need to implement threading logic like the one we have now, or is 
that not needed?
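
To make question 1 concrete, this is the mental model we have of the callback approach, written with the standard library only (no gRPC types). `ChunkStreamCall`, `serveInterleaved`, and the `OnWriteDone` method as written here are hypothetical names for illustration. We are assuming that in the real API the library calls back into a per-call object from its own threads, so each call only needs to keep its own state. Please tell us if this picture is wrong.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of one streaming call; NOT a gRPC class. Each call owns
// its pending chunks, so many calls can be in flight without sharing state.
class ChunkStreamCall {
public:
    ChunkStreamCall(std::string client, std::vector<std::string> chunks,
                    std::vector<std::string>* wire)
        : client_(std::move(client)), chunks_(std::move(chunks)), wire_(wire) {}

    // Issues the first write, or finishes at once if there is nothing to send.
    void Start() { WriteNext(); }

    // Invoked by the driver when the previous write has completed.
    void OnWriteDone(bool ok) {
        if (!ok) { done_ = true; return; }  // the stream is broken: stop
        ++next_;
        WriteNext();
    }

    bool done() const { return done_; }

private:
    void WriteNext() {
        if (next_ >= chunks_.size()) { done_ = true; return; }
        // At most one write per call is outstanding at any time.
        wire_->push_back(client_ + ":" + chunks_[next_]);
    }

    std::string client_;
    std::vector<std::string> chunks_;
    std::vector<std::string>* wire_;  // stands in for the network
    std::size_t next_ = 0;
    bool done_ = false;
};

// Drives several calls round-robin, standing in for a library that delivers
// write completions for independent calls. Returns the order of the writes.
std::vector<std::string> serveInterleaved(
    const std::vector<std::pair<std::string, std::vector<std::string>>>& requests) {
    std::vector<std::string> wire;
    std::vector<ChunkStreamCall> calls;
    calls.reserve(requests.size());
    for (const auto& r : requests) calls.emplace_back(r.first, r.second, &wire);
    for (auto& c : calls) c.Start();

    bool progress = true;
    while (progress) {
        progress = false;
        for (auto& c : calls) {
            if (!c.done()) {
                c.OnWriteDone(true);
                progress = true;
            }
        }
    }
    return wire;
}
```

In this model, a client A with two chunks and a client B with one chunk produce writes in the order A, B, A, because neither call waits for the other to finish.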

Thanks in advance.

Regards,

Pedro
On Wednesday, October 25, 2023 at 1:17:23 PM UTC-5 yas...@google.com wrote:

> We have been recommending using the C++ callback API instead of the 
> completion queue based API since it's easier to use. All performance 
> optimizations that we are working on target the callback API.
>
> On Thursday, October 19, 2023 at 8:03:42 AM UTC-7 Pedro Alfonso wrote:
>
>> Hello,
>>
>> First let me explain what we have in our C++ gRPC Async server codebase:
>>
>> - We have 2 unary-response RPCs.
>> - And we have 2 streaming-response RPCs, which will account for over 95% 
>> of the clients' API usage, so they are central to our streaming-based 
>> implementation.
>>
>> Of the 2 streaming-response RPCs, the one below is the most critical to 
>> us:
>>
>> // Inner class StreamAssetNodes
>> class StreamAssetNodes : public RequestBase {
>> public:
>> StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner 
>> ), ownerClass( owner ) {
>> owner_.grpc().service_.RequestStreamAssetNodes(
>> &context_, &stream_, cq(), cq(), in_handle_.tag( Handle::Operation::
>> CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
>> LOG_DEBUG << "\n" + me( *this ) << "\n\n*****************************************************************\n"
>> << "- Processing a new connect from " << context_.peer()
>> << "\n\n*****************************************************************\n"
>> << endl;
>> cout << "\n" + me( *this ) << "\n*****************************************************************\n"
>> << "- Processing a new connect from " << context_.peer() << "\n*****************************************************************\n"
>> << endl;
>>
>> if ( !ok ) [[unlikely]] {
>> LOG_DEBUG << "The CONNECT-operation failed." << endl;
>> cout << "The CONNECT-operation failed." << endl;
>> return;
>> }
>>
>> // Creates a new instance so the service can handle requests from a new client
>> owner_.createNew<StreamAssetNodes>( owner );
>> // Reads request's parameters
>> readNodeIds();
>> } ) );
>> }
>>
>> private:
>> // Objects and variables
>> AsyncAssetStreamerManager& ownerClass;
>> ::Illuscio::AssetNodeIds request_;
>> ::Illuscio::AssetNodeComponent reply_;
>> ::grpc::ServerContext context_;
>> ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> 
>> stream_ { &context_ };
>>
>> vector<string> nodeids_vector;
>> // Contains mapping for all the nodes of a set of assets
>> json assetsNodeMapping;
>> // Contains mapping for all the nodes of a particular asset
>> json assetNodeMapping;
>> ifstream nodeFile;
>> // Handle for messages coming in
>> Handle in_handle_ { *this };
>> // Handle for messages going out
>> Handle out_handle_ { *this };
>>
>> int fileNumber = 0;
>> const int chunk_size = 16 * 1024;
>> char buffer[16 * 1024];
>>
>> // Methods
>>
>> void readNodeIds() {
>> // Reads RPC request parameters
>> stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( 
>> bool ok, Handle::Operation op ) {
>> if ( !ok ) [[unlikely]] { return; }
>>
>> // Assigns the request to the nodeids vector
>> nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() 
>> );
>> request_.clear_nodeids();
>>
>> if ( !nodeids_vector.empty() ) {
>> ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid
>> ()];
>> if ( ownerClass.assetNodeMapping.empty() ) {
>> stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found in server..." ),
>> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::
>> Operation /* op */ ) {
>> if ( !ok ) [[unlikely]] {
>> LOG_DEBUG << "The FINISH request-operation failed." << endl;
>> cout << "The FINISH request-operation failed." << endl;
>> }
>>
>> LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << 
>> endl;
>> cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
>> } ) );
>> return;
>> }
>>
>> writeNodeFile( nodeids_vector.front() );
>> } else {
>> stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset's node ids empty. Without node ids node streaming can't start..." ),
>> in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::
>> Operation /* op */ ) {
>> if ( !ok ) [[unlikely]] {
>> LOG_DEBUG << "The FINISH request-operation failed.";
>> cout << "The FINISH request-operation failed.";
>> }
>>
>> LOG_DEBUG << "Asset's node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>> cout << "Asset's node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
>> } ) );
>> }
>> } ) );
>> }
>>
>> void writeNodeFile( const string& nodeId ) {
>> // Opens the file which contains the requested node
>> nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary 
>> );
>>
>> if ( !nodeFile.is_open() ) {
>> LOG_DEBUG << "Asset's node file open operation failed for node:" << 
>> nodeId << endl;
>> cout << "Asset's node file open operation failed for node:" << nodeId << 
>> endl;
>> }
>>
>> splitFileAndWriteChunks();
>> }
>>
>> void splitFileAndWriteChunks() {
>> setReplyWithBuffer();
>>
>> stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( 
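>> bool ok, Handle::Operation /* op */ ) {
>> // A failed write means the stream is broken, so stop reading the file
>> if ( !ok ) [[unlikely]] { return; }
>> if ( !nodeFile.eof() ) {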
>> splitFileAndWriteChunks();
>> } else if ( !nodeids_vector.empty() ) {
>> nodeFile.close();
>> nodeids_vector.erase( nodeids_vector.begin() );
>>
>> if ( !nodeids_vector.empty() ) {
>> writeNodeFile( nodeids_vector.front() );
>> } else {
>> finishIfDone();
>> }
>> }
>> } ) );
>> }
>>
>> void setReplyWithBuffer() {
>> // Fills read buffer
>> nodeFile.read( buffer, chunk_size );
>>
>> // Prepare reply and start writing
>> reply_.Clear();
>> reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
>> }
>>
>> // We wait until all incoming messages are received and all outgoing
>> // messages are sent before we send the finish message.
>> void finishIfDone() {
>> stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::
>> FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
>> if ( !ok ) [[unlikely]] {
>> LOG_DEBUG << "The FINISH request-operation failed." << endl;
>> cout << "The FINISH request-operation failed." << endl;
>> }
>> } ) );
>> }
>> };
>>
>> So the idea with the code above is that the request is basically an array 
>> of strings, `ids` (e.g. "1", "2", "3", ...; by the way, it is defined as a 
>> stream in protobuf), and each of those ids points to a small file stored 
>> on the server. Once the request is read by the RPC, it takes the first id, 
>> opens the file it points to, and writes the file to the client in chunks 
>> as a streaming response. When that finishes, it takes the second id from 
>> the array and does the same thing, and so on until there are no ids left 
>> in the request's array.
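
The per-file loop described above can be sketched without gRPC as follows. This is a minimal standard-library sketch, not the server code: `streamInChunks` and `sink` are hypothetical names, and `sink` stands in for the stream write. One difference from the quoted code is deliberate: the sketch stops on an empty read, whereas checking `eof()` only after a write appears to send one extra empty chunk when the file size is an exact multiple of the chunk size.

```cpp
#include <cstddef>
#include <functional>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Reads `in` in pieces of at most `chunk_size` bytes and hands each non-empty
// piece to `sink`. Returns the number of chunks delivered.
// `streamInChunks` and `sink` are hypothetical names, not part of gRPC.
std::size_t streamInChunks(std::istream& in, std::size_t chunk_size,
                           const std::function<void(const std::string&)>& sink) {
    std::vector<char> buffer(chunk_size);
    std::size_t chunks = 0;
    while (in) {
        in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
        const std::streamsize got = in.gcount();
        if (got <= 0) break;  // nothing left: do not emit an empty chunk
        sink(std::string(buffer.data(), static_cast<std::size_t>(got)));
        ++chunks;
    }
    return chunks;
}
```

With a 16 KiB chunk size, a 40000-byte input yields chunks of 16384, 16384 and 7232 bytes, and a 32768-byte input yields exactly two chunks. Any `std::istream` works as input, for example a `std::ifstream` opened with `std::ios::binary` or a `std::istringstream` in a test.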
>>
>> From the client's perspective, the behavior is: a single client calls the 
>> RPC above, passing an ids array of probably 500 elements (different ids), 
>> and the rate of calls to this RPC should be around 1000 calls per 
>> second.
>>
>> We are using a shared completion queue for all RPCs and we do not have a 
>> multithreading approach.
>>
>> With all of the above as background, we would like to know if we can 
>> implement a more efficient approach for the RPC method above, probably 
>> based on multi-threading; that's our ultimate goal.
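
One shape the multi-threaded variant could take is several worker threads blocking on the same queue of ready handlers. The sketch below models that with the standard library only. `EventQueue` and `runOnWorkers` are hypothetical names and are not gRPC classes; the assumption (to be confirmed by the gRPC team) is that a real completion queue can be polled from several threads in the same way.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a completion queue: a thread-safe queue of
// ready-to-run handlers. NOT a gRPC class.
class EventQueue {
public:
    void push(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            handlers_.push(std::move(handler));
        }
        ready_.notify_one();
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        ready_.notify_all();
    }

    // Blocks until a handler is available. Returns false once the queue has
    // been shut down and fully drained.
    bool next(std::function<void()>& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return shutdown_ || !handlers_.empty(); });
        if (handlers_.empty()) return false;
        out = std::move(handlers_.front());
        handlers_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> handlers_;
    bool shutdown_ = false;
};

// Runs `events` trivial handlers on `workers` threads that all poll the same
// queue, and returns how many handlers actually ran.
std::size_t runOnWorkers(std::size_t workers, std::size_t events) {
    EventQueue queue;
    std::atomic<std::size_t> handled{0};
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < workers; ++i) {
        pool.emplace_back([&queue] {
            std::function<void()> handler;
            while (queue.next(handler)) handler();
        });
    }
    for (std::size_t i = 0; i < events; ++i) {
        queue.push([&handled] { ++handled; });
    }
    queue.shutdown();
    for (auto& t : pool) t.join();
    return handled.load();
}
```

Once handlers run on more than one thread, any state they share needs protection. In the quoted code, `ownerClass.assetNodeMapping` is written per request on the shared owner object, so it would have to become per-request state or be guarded by a lock.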
>>
>> Questions:
>>
>> 1.- Can the gRPC team tell us in detail how to approach this problem, 
>> in a way that lets us use a multi-threading strategy?
>>
>> 2.- How can we use completion queues more efficiently along with these 
>> new threads?
>>
>> The reason we are asking these questions is that we feel we are not 
>> leveraging the real power of gRPC for this specific use case.
>>
>> Please let us know if you need more details from us.
>>
>> Thanks in advance.
>>
>> Pedro Alfonso
>>
>
