From a source I am receiving streaming data whose size is not known until 
the final processing, but it is at least 10 GB. I have to send this large 
amount of data using `gRPC`.

Note that this large amount of data will be passed through `gRPC` once the 
processing of the stream is done. For this step, I have thought of storing 
all the values in a `vector`.
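
If the values can be forwarded as they arrive, holding all 10 GB in one `vector` may not be necessary. The following is only a minimal sketch under that assumption: `ChunkAccumulator` and its `Sink` callback are hypothetical names, and in a real client the sink would presumably fill `large_data_collection` and call `stream->Write(...)`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper: buffers incoming values and hands off a chunk
// whenever `chunk_len` values have accumulated, so memory use stays
// bounded by one chunk rather than by the whole data set.
class ChunkAccumulator {
public:
    using Sink = std::function<void(std::vector<int32_t>&&, int32_t)>;

    ChunkAccumulator(std::size_t chunk_len, Sink sink)
        : chunk_len_(chunk_len), sink_(std::move(sink)) {
        assert(chunk_len_ > 0);
        buffer_.reserve(chunk_len_);
    }

    // Called for every value arriving from the source.
    void push(int32_t value) {
        buffer_.push_back(value);
        if (buffer_.size() == chunk_len_) flush();
    }

    // Called once at end of input to emit the final, possibly shorter chunk.
    void flush() {
        if (buffer_.empty()) return;
        std::vector<int32_t> out;
        out.swap(buffer_);
        buffer_.reserve(chunk_len_);
        sink_(std::move(out), next_chunk_number_++);
    }

private:
    std::size_t chunk_len_;
    Sink sink_;
    std::vector<int32_t> buffer_;
    int32_t next_chunk_number_ = 0;
};
```

The caller would invoke `push` per value and `flush` once the source is exhausted; the second sink argument plays the role of `data_chunk_number`.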

##### Regarding sending a large amount of data, I looked for ideas and found:
- [This](https://stackoverflow.com/questions/68644134/how-to-split-long-messages-into-short-messages-in-grpc-c), 
where it is advised not to pass large data using `gRPC` and to use some 
other messaging protocol instead. However, I am restricted to `gRPC` (at 
least for now).
- [From this post](https://stackoverflow.com/questions/60538700/how-could-i-send-message-of-many-fields-with-c-grpc-stream) 
I tried to learn how a `chunk message` can be sent, but I am not sure 
whether it is related to my problem.
- [First post](https://jbrandhorst.com/post/grpc-binary-blob-stream/), a 
blog post about streaming data using the `go` language.
- [This one](https://stackoverflow.com/a/54334419/10634362), a `python` 
version of [this post](https://jbrandhorst.com/post/grpc-binary-blob-stream/). 
But it is also incomplete.
- [gRPC example](https://github.com/grpc/grpc/tree/master/examples/cpp/route_guide) 
could be a good start, but I cannot decode it due to my lack of C++ knowledge.

## From here on, I have made a big update to the question, but its main theme has not changed

##### What I have done so far and some points about my project. The [github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).

- A `Unary rpc` is present in the project.
- I know that my new `Bi directional rpc` will take some time. I want the 
`Unary rpc` not to wait for the completion of the `Bi directional rpc`. 
Right now I am thinking in a `synchronous way`, where the `Unary rpc` waits 
for the streaming one to complete before passing its `status`.
- I have omitted the unnecessary lines from the `C++ code`, but I am giving 
the whole `proto` files.

- `big_data.proto`

```sh
syntax = "proto3";

package demo_grpc;

message Large_Data {
    repeated int32 large_data_collection = 1 [packed=true];
    int32 data_chunk_number = 2;
}
```

- `addressbook.proto`

```sh
syntax = "proto3";

package demo_grpc;

import "myproto/big_data.proto";

message S_Response {
    string name     = 1;
    string street   = 2;
    string zip      = 3;
    string city     = 4;
    string country  = 5;

    int32 double_init_val = 6;
}

message C_Request {
    uint32 choose_area = 1;
    string name = 2;
    int32 init_val = 3;
}

service AddressBook {
    rpc GetAddress(C_Request) returns (S_Response) {}

    rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}

```

- `client.cpp`
```cpp
#include <big_data.pb.h>
#include <addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/create_channel.h>

#include <cstdint>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <vector>
using namespace std;

// This function prompts the user to set value for the required area
void Client_Request(demo_grpc::C_Request &request_)
{
// do processing for unary rpc. Intentionally avoided here
}

// According to the client request, this function displays the values of the
// protobuf message
void Server_Response(demo_grpc::C_Request &request_,
                     const demo_grpc::S_Response &response_)
{
// do processing for unary rpc. Intentionally avoided here
}

// The following function builds a large vector and then splits it into chunks
// to send via a stream from client to server
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
        // A dummy vector which in the real case will be the large data set's container
        std::vector<int32_t> large_vector;

        // iterate 1024 * 10 times
        for(int64_t i = 0; i < 1024 * 10; i++)
        {
                large_vector.push_back(1);
        }

        // the number of integers that one chunk will contain is stored here
        uint64_t preferred_chunk_size_in_kibyte_holds_integer_num = 0;

        // the total number of chunks is stored here
        uint32_t total_chunk = total_chunk_counter(large_vector.size(),
                                                   preferred_chunk_size_in_kibyte,
                                                   preferred_chunk_size_in_kibyte_holds_integer_num);

        // A temp counter to trace the index of the large_vector
        int32_t temp_count = 0;

        // The loop runs while the total number of chunks is greater than 0.
        // After each iteration total_chunk is decremented
        while(total_chunk > 0)
        {
                for (int64_t i = temp_count * preferred_chunk_size_in_kibyte_holds_integer_num;
                     i < preferred_chunk_size_in_kibyte_holds_integer_num
                         + temp_count * preferred_chunk_size_in_kibyte_holds_integer_num;
                     i++)
                {
                        // the repeated field large_data_collection takes its values from the large_vector
                        request_.add_large_data_collection(large_vector[i]);
                }
                temp_count++;
                total_chunk--;

                std::string ip_address = "localhost:50051";
                auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
                std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);

                grpc::ClientContext context;
                std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data> >
                        stream(stub->Stream_Chunk_Service(&context));

                // Once the size of each chunk is reached, this repeated field is cleared.
                // I am not sure whether the values can be transferred to the server
                // before this point, but my assumption is that they should be
                request_.clear_large_data_collection();
        }

}

int main(int argc, char* argv[])
{
        std::string client_address = "localhost:50051";
        std::cout << "Address of client: " << client_address << std::endl;

        // The following part for the Unary RPC
        demo_grpc::C_Request query;
        demo_grpc::S_Response result;
        Client_Request(query);

        // This part is for the streaming chunk data (bidirectional streaming RPC)
        demo_grpc::Large_Data stream_chunk_request_;
        demo_grpc::Large_Data stream_chunk_response_;
        uint64_t preferred_chunk_size_in_kibyte = 64;
        Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_,
                                  preferred_chunk_size_in_kibyte);

        // Call
        auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
        std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
        grpc::ClientContext context;
        grpc::Status status = stub->GetAddress(&context, query, &result);

        // The following status is for the unary rpc, as far as I have understood the structure
        if (status.ok())
        {
                Server_Response(query, result);
        }
        else
        {
                std::cout << status.error_message() << std::endl;
        }

        return 0;
}
```
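
The slicing part of the client can be looked at without any gRPC calls. Below is a minimal sketch, not the project's code: `Chunk` is a hypothetical stand-in for `demo_grpc::Large_Data`, and `split_into_chunks` is an assumed helper name. It clamps the last chunk so that a vector whose size is not a multiple of the chunk length is never read out of bounds.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Stand-in for demo_grpc::Large_Data so the sketch compiles without gRPC.
struct Chunk {
    std::vector<int32_t> large_data_collection;
    int32_t data_chunk_number = 0;
};

// Splits `data` into consecutive chunks of at most `ints_per_chunk` values.
std::vector<Chunk> split_into_chunks(const std::vector<int32_t>& data,
                                     std::size_t ints_per_chunk) {
    assert(ints_per_chunk > 0);
    std::vector<Chunk> chunks;
    int32_t number = 0;
    for (std::size_t begin = 0; begin < data.size(); begin += ints_per_chunk) {
        // Clamp the end so the last chunk never reads past the vector.
        const std::size_t end = std::min(begin + ints_per_chunk, data.size());
        Chunk c;
        c.large_data_collection.assign(data.data() + begin, data.data() + end);
        c.data_chunk_number = number++;
        chunks.push_back(std::move(c));
    }
    return chunks;
}
```

Collecting every chunk in a second vector doubles the memory and is done here only to keep the sketch easy to check. With a real stream, each chunk would more likely be written on one long-lived stream as soon as it is built, followed by a single `WritesDone()` and `Finish()` at the end.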

- Helper function `total_chunk_counter`
```cpp
#include <cmath>
#include <cstdint>

// Returns the total number of chunks and stores, in the last argument,
// how many integers one chunk of the preferred size holds
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
        uint64_t container_size_in_kibyte = (32ULL * num_of_container_content) / 1024;
        preferred_chunk_size_in_kibyte_holds_integer_num =
                (num_of_container_content * preferred_chunk_size_in_kibyte) / container_size_in_kibyte;

        float total_chunk = static_cast<float>(num_of_container_content) /
                            preferred_chunk_size_in_kibyte_holds_integer_num;
        return std::ceil(total_chunk);
}

```
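
One thing to check in the helper above: it multiplies the element count by 32 and divides by 1024, which reads like bits rather than bytes. For the 10240 elements and the preferred size of 64 used in the client, it yields 2048 integers per chunk and 5 chunks, i.e. 8 KiB of `int32_t` values per chunk instead of 64 KiB. It also divides by zero when the vector has fewer than 32 elements. A byte-based variant using integer ceiling division might look like the sketch below; `ints_per_chunk` and `chunk_count` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// How many int32_t values fit into an in-memory chunk of `chunk_size_kib` KiB.
inline std::size_t ints_per_chunk(std::size_t chunk_size_kib) {
    return chunk_size_kib * 1024 / sizeof(int32_t);
}

// Number of chunks needed for `element_count` values (ceiling division,
// done in integers to avoid float rounding for very large counts).
inline std::size_t chunk_count(std::size_t element_count, std::size_t per_chunk) {
    assert(per_chunk > 0);
    return (element_count + per_chunk - 1) / per_chunk;
}
```

This sizes chunks by their in-memory footprint. On the wire, protobuf encodes `int32` as a varint, so the serialized size of a chunk depends on the values and can be smaller or larger than 4 bytes per element.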

- `server.cpp` which is totally incomplete
```cpp
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>

#include <iostream>
#include <memory>
#include <string>

class AddressBookService final : public demo_grpc::AddressBook::Service {
        public:
                virtual ::grpc::Status GetAddress(::grpc::ServerContext* context,
                                                  const ::demo_grpc::C_Request* request,
                                                  ::demo_grpc::S_Response* response)
                {
                        switch (request->choose_area())
                        {
                                // do processing for unary rpc. Intentionally avoided here
                        }
                        std::cout << "Information of " << request->choose_area()
                                  << " is sent to Client" << std::endl;
                        return grpc::Status::OK;
                }

                // Bi-directional streaming chunk data
                virtual ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context,
                                ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream)
                {
                        // stream->Large_Data;
                        return grpc::Status::OK;
                }
};

void RunServer()
{
        std::cout << "grpc Version: " << grpc::Version() << std::endl;
        std::string server_address = "localhost:50051";
        std::cout << "Address of server: " << server_address << std::endl;

        grpc::ServerBuilder builder;
        builder.AddListeningPort(server_address, 
grpc::InsecureServerCredentials());

        AddressBookService my_service;
        builder.RegisterService(&my_service);

        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        server->Wait();
}

int main(int argc, char* argv[])
{
        RunServer();
        return 0;
}

```
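
For the server side, the reassembly and processing steps can be sketched independently of gRPC. The following is only an illustration under assumptions: `ChunkCollector` and `double_values` are hypothetical names, and in the real handler the chunks would presumably come from a `while (stream->Read(&msg))` loop. gRPC is documented to preserve message order within a single stream, so keying by `data_chunk_number` mainly acts as a sanity check here.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical collector: stores chunks keyed by data_chunk_number so the
// original order can be restored even if chunks are handed over out of order.
class ChunkCollector {
public:
    void add(int32_t chunk_number, std::vector<int32_t> values) {
        assert(chunk_number >= 0);
        chunks_[chunk_number] = std::move(values);
    }

    // Concatenates all chunks in ascending chunk-number order.
    std::vector<int32_t> assemble() const {
        std::vector<int32_t> all;
        for (const auto& entry : chunks_) {
            all.insert(all.end(), entry.second.begin(), entry.second.end());
        }
        return all;
    }

private:
    std::map<int32_t, std::vector<int32_t>> chunks_;
};

// Example processing step from the question: double every value.
inline void double_values(std::vector<int32_t>& values) {
    for (auto& v : values) v *= 2;
}
```

If the processing works element by element, as doubling does, each chunk could instead be processed and written back right after it is read, which avoids holding the whole data set on the server.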
#### In summary, what I want
- I need to pass the content of `large_vector` through the `repeated field 
large_data_collection` of the message `Large_Data`. I should split the 
`large_vector` into `chunks` and populate the `repeated field 
large_data_collection` with one `chunk` at a time.
- On the server side, all `chunks` will be concatenated while keeping the 
exact order of the `large_vector`. Some processing will be done on them (e.g. 
`double the value at each index`). Then the whole data will be sent back to 
the `client` as a `chunk stream`.
- It would be great if the present `unary rpc` did not wait for the 
completion of the `bi-directional rpc`.
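
For the last point, one possible approach is to start the streaming call on a separate thread and issue the unary call immediately. This is a minimal sketch using only the standard library: `run_streaming_call` and `run_unary_call` are hypothetical stand-ins for the code around `stub->Stream_Chunk_Service(&context)` and `stub->GetAddress(...)`. As far as I know, each call needs its own `grpc::ClientContext`, while the channel and stub can be shared.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Stand-in for the long-running bidirectional streaming call.
inline int run_streaming_call() {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return 5;  // e.g. the number of chunks exchanged
}

// Stand-in for the short unary GetAddress call.
inline std::string run_unary_call() { return "unary done"; }

// Starts the streaming call on another thread, performs the unary call
// right away, and only then waits for the streaming result.
inline std::pair<std::string, int> run_both() {
    std::future<int> streaming =
        std::async(std::launch::async, run_streaming_call);
    assert(streaming.valid());
    std::string unary = run_unary_call();  // does not wait for the stream
    return {unary, streaming.get()};       // blocks until the stream finishes
}
```

On some toolchains this needs the thread library at link time (for example `-pthread` with GCC).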

A solution with an example would be really helpful. Thanks in advance. The 
[github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).
