From a source I am getting stream data whose size will not be known before the final processing, but the minimum is 10 GB. I have to send this large amount of data using `gRPC`.
Note that this large amount of data will be passed through `gRPC` while the stream is still being processed. For this step, I have thought of storing all the values in a `vector`.

##### Regarding sending a large amount of data, I looked for ideas and found:

- [This](https://stackoverflow.com/questions/68644134/how-to-split-long-messages-into-short-messages-in-grpc-c), where it is advised not to pass large data using `gRPC`. It suggests using another messaging protocol, but I am restricted to `gRPC` (at least for now).
- [From this post](https://stackoverflow.com/questions/60538700/how-could-i-send-message-of-many-fields-with-c-grpc-stream) I tried to learn how a `chunk message` can be sent, but I am not sure whether it is related to my problem.
- [First post](https://jbrandhorst.com/post/grpc-binary-blob-stream/), a blog post about streaming data using the `go` language.
- [This one](https://stackoverflow.com/a/54334419/10634362), a `python` version of [this post](https://jbrandhorst.com/post/grpc-binary-blob-stream/). But it is also incomplete.
- [gRPC example](https://github.com/grpc/grpc/tree/master/examples/cpp/route_guide) could be a good start, but I cannot decode it due to my lack of C++ knowledge.

## From here on, I have made a huge update to the question, but its main theme has not changed

##### What I have done so far and some points about my project

The [github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).

- A `Unary rpc` is present in the project.
- I know that my new `Bi directional rpc` will take some time. I want the `Unary rpc` not to wait for the completion of the `Bi directional rpc`. Right now I am thinking in a `synchronous way`, where the `Unary rpc` waits for the streaming one to complete before it passes its `status`.
- I am omitting the unnecessary lines of the `C++ code`.
The `proto` files, however, are given in full.

- `big_data.proto`

```proto
syntax = "proto3";

package demo_grpc;

message Large_Data {
    repeated int32 large_data_collection = 1 [packed=true];
    int32 data_chunk_number = 2;
}
```

- `addressbook.proto`

```proto
syntax = "proto3";

package demo_grpc;

import "myproto/big_data.proto";

message S_Response {
    string name = 1;
    string street = 2;
    string zip = 3;
    string city = 4;
    string country = 5;

    int32 double_init_val = 6;
}

message C_Request {
    uint32 choose_area = 1;
    string name = 2;
    int32 init_val = 3;
}

service AddressBook {
    rpc GetAddress(C_Request) returns (S_Response) {}

    rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}
```

- `client.cpp`

```cpp
#include <big_data.pb.h>
#include <addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/create_channel.h>

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <vector>

using namespace std;

// Helper function, defined in the next snippet
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num);

// This function prompts the user to set the value for the required area
void Client_Request(demo_grpc::C_Request &request_)
{
    // Processing for the unary rpc is intentionally omitted here
}

// According to the client request, this function displays the values of the protobuf message
void Server_Response(demo_grpc::C_Request &request_, const demo_grpc::S_Response &response_)
{
    // Processing for the unary rpc is intentionally omitted here
}

// The following function builds a large vector and then chunks it to send via a stream from client to server
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
    // A dummy vector which in the real case will be the large data set's container
    std::vector<int32_t> large_vector;

    // Fill it with 1024 * 10 elements
    for (int64_t i = 0; i < 1024 * 10; i++)
    {
        large_vector.push_back(1);
    }

    // Number of integers that one chunk holds; set by total_chunk_counter
    uint64_t preferred_chunk_size_in_kibyte_holds_integer_num = 0;

    // Total number of chunks
    uint32_t total_chunk = total_chunk_counter(large_vector.size(), preferred_chunk_size_in_kibyte, preferred_chunk_size_in_kibyte_holds_integer_num);

    // A temporary counter to track the index into large_vector
    int32_t temp_count = 0;

    // The loop runs while chunks remain. total_chunk is decremented after each iteration
    while (total_chunk > 0)
    {
        // Index range of this chunk; the last chunk may be partial, so clamp to the vector size
        const uint64_t begin = temp_count * preferred_chunk_size_in_kibyte_holds_integer_num;
        const uint64_t end = std::min<uint64_t>(begin + preferred_chunk_size_in_kibyte_holds_integer_num, large_vector.size());
        for (uint64_t i = begin; i < end; i++)
        {
            // The repeated field large_data_collection takes its values from large_vector
            request_.add_large_data_collection(large_vector[i]);
        }
        temp_count++;
        total_chunk--;

        std::string ip_address = "localhost:50051";
        auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
        std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);

        grpc::ClientContext context;
        std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data> > stream(stub->Stream_Chunk_Service(&context));

        // Once a chunk has been filled, this repeated field is cleared. I am not sure whether
        // the values can be transferred to the server before this point,
        // but my assumption is that they should be.
        request_.clear_large_data_collection();
    }
}

int main(int argc, char* argv[])
{
    std::string client_address = "localhost:50051";
    std::cout << "Address of client: " << client_address << std::endl;

    // The following part is for the Unary RPC
    demo_grpc::C_Request query;
    demo_grpc::S_Response result;
    Client_Request(query);

    // This part is for streaming the chunked data (Bi directional Stream RPC)
    demo_grpc::Large_Data stream_chunk_request_;
    demo_grpc::Large_Data stream_chunk_response_;
    uint64_t preferred_chunk_size_in_kibyte = 64;
    Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_, preferred_chunk_size_in_kibyte);

    // Call
    auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    grpc::Status status = stub->GetAddress(&context, query, &result);

    // The following status is for the unary rpc, as far as I have understood the structure
    if (status.ok())
    {
        Server_Response(query, result);
    }
    else
    {
        std::cout << status.error_message() << std::endl;
    }

    return 0;
}
```

- helper function `total_chunk_counter`

```cpp
#include <cstdint>

uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
    // Number of int32 values that fit in one chunk (an int32 occupies 4 bytes, i.e. 32 bits)
    preferred_chunk_size_in_kibyte_holds_integer_num = (preferred_chunk_size_in_kibyte * 1024) / sizeof(int32_t);

    // Guard against a chunk size of 0, which would otherwise divide by zero
    if (preferred_chunk_size_in_kibyte_holds_integer_num == 0)
    {
        return 0;
    }

    // Integer ceiling division, so that a partial last chunk is still counted
    return static_cast<uint32_t>((num_of_container_content + preferred_chunk_size_in_kibyte_holds_integer_num - 1) / preferred_chunk_size_in_kibyte_holds_integer_num);
}
```

- `server.cpp`, which is totally incomplete

```cpp
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>

#include <iostream>
#include <memory>
#include <string>

class AddressBookService final : public demo_grpc::AddressBook::Service {
    public:
        virtual ::grpc::Status GetAddress(::grpc::ServerContext* context, const ::demo_grpc::C_Request* request, ::demo_grpc::S_Response* response)
        {
            switch (request->choose_area())
            {
                // Processing for the unary rpc is intentionally omitted here
            }

            std::cout << "Information of " << request->choose_area() << " is sent to Client" << std::endl;
            return grpc::Status::OK;
        }

        // Bi-directional streaming of chunked data
        virtual ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream)
        {
            // stream->Large_Data;
            return grpc::Status::OK;
        }
};

void RunServer()
{
    std::cout << "grpc Version: " << grpc::Version() << std::endl;
    std::string server_address = "localhost:50051";
    std::cout << "Address of server: " << server_address << std::endl;

    grpc::ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

    AddressBookService my_service;
    builder.RegisterService(&my_service);

    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    server->Wait();
}

int main(int argc, char* argv[])
{
    RunServer();
    return 0;
}
```

#### In summary, what I want

- I need to pass the content of `large_vector` through the `repeated field large_data_collection` of the message `Large_Data`. I should split `large_vector` into `chunks` and populate the `repeated field large_data_collection` with one `chunk` at a time.
- On the server side, all `chunks` will be concatenated, keeping the exact order of `large_vector`. Some processing will be done on them (e.g. `double the value at each index`). Then the whole data will again be sent to the `client` as a `chunk stream`.
- It would be great if the existing `unary rpc` did not wait for the completion of the `bi-directional rpc`.

A solution with an example would be really helpful. Thanks in advance. The [github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).
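
To make the first two points of the summary concrete, here is a minimal sketch of only the chunking, doubling and reassembly logic, without any gRPC calls. It assumes a plain `Chunk` struct as a stand-in for the generated `demo_grpc::Large_Data` class. The names `Chunk`, `split_into_chunks`, `double_values` and `reassemble` are hypothetical and are not part of the project or of gRPC.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for the generated demo_grpc::Large_Data message.
struct Chunk {
    std::vector<int32_t> large_data_collection;
    int32_t data_chunk_number = 0;
};

// Split `data` into chunks of at most `ints_per_chunk` values, numbered from 0.
// The last chunk may hold fewer values than the others.
std::vector<Chunk> split_into_chunks(const std::vector<int32_t>& data,
                                     std::size_t ints_per_chunk)
{
    assert(ints_per_chunk > 0);
    std::vector<Chunk> chunks;
    for (std::size_t begin = 0; begin < data.size(); begin += ints_per_chunk) {
        const std::size_t end = std::min(begin + ints_per_chunk, data.size());
        Chunk chunk;
        chunk.data_chunk_number = static_cast<int32_t>(chunks.size());
        chunk.large_data_collection.assign(
            data.begin() + static_cast<std::ptrdiff_t>(begin),
            data.begin() + static_cast<std::ptrdiff_t>(end));
        chunks.push_back(std::move(chunk));
    }
    return chunks;
}

// The example processing from the summary: double every value of one chunk.
Chunk double_values(Chunk chunk)
{
    for (int32_t& value : chunk.large_data_collection) {
        value *= 2;
    }
    return chunk;
}

// Concatenate chunks by ascending data_chunk_number to restore the original order.
std::vector<int32_t> reassemble(std::vector<Chunk> chunks)
{
    std::sort(chunks.begin(), chunks.end(),
              [](const Chunk& a, const Chunk& b) {
                  return a.data_chunk_number < b.data_chunk_number;
              });
    std::vector<int32_t> result;
    for (const Chunk& chunk : chunks) {
        result.insert(result.end(),
                      chunk.large_data_collection.begin(),
                      chunk.large_data_collection.end());
    }
    return result;
}
```

In the real client and server, each `Chunk` would presumably correspond to one `Large_Data` message written to or read from the stream. Messages within a single gRPC stream arrive in the order they were written, so the sort in `reassemble` is only a defensive step that relies on `data_chunk_number`.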