Hi folks!
I have a gRPC server application implemented in C++ and a Java client on
the other side. Googling for a few days didn't help much, so I'm hoping for
your advice.
The RPC looks like this:
rpc GetStuff(GetStuffRequest) returns (stream Stuff);
So the client sends me a request, and right after that I start streaming
Stuff messages.
My question is: what can I do about a weak network? For example, the
connection can drop every 5 seconds or every hour, and in the meantime
data keeps accumulating on the server side whether the connection is alive
or dead. I must guarantee that every Stuff is delivered to the client in
real time (I can observe it in the frontend), and if the stream breaks, the
data should be collected on the server and delivered right after
reconnection.
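To make the buffering requirement concrete, here is a minimal sketch of what I have in mind, independent of gRPC. It assumes the server assigns each Stuff a strictly increasing sequence number and that the client can report the last sequence number it received when it reconnects (for example in GetStuffRequest; that field does not exist today). ReplayBuffer, BufferedStuff and all method names are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Stuff message plus a server-assigned sequence number.
struct BufferedStuff {
    std::uint64_t seq;    // strictly increasing, assigned in push()
    std::string payload;  // would be the serialized Stuff in the real server
};

// Keeps every item until the client confirms it, so nothing is lost while
// the stream is down.
class ReplayBuffer {
public:
    // Called by the data producer whether or not a client is connected.
    std::uint64_t push(std::string payload) {
        std::lock_guard<std::mutex> lock(m_mutex);
        const std::uint64_t seq = m_nextSeq++;
        m_items.push_back(BufferedStuff{seq, std::move(payload)});
        return seq;
    }

    // Items with seq > lastSeenSeq, oldest first. After a reconnect these
    // are the ones to stream again (assumes the client reports lastSeenSeq).
    std::vector<BufferedStuff> pendingAfter(std::uint64_t lastSeenSeq) const {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::vector<BufferedStuff> out;
        for (const auto& item : m_items) {
            if (item.seq > lastSeenSeq) {
                out.push_back(item);
            }
        }
        return out;
    }

    // Drops everything up to and including upToSeq once it is confirmed.
    void acknowledge(std::uint64_t upToSeq) {
        std::lock_guard<std::mutex> lock(m_mutex);
        while (!m_items.empty() && m_items.front().seq <= upToSeq) {
            m_items.pop_front();
        }
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.size();
    }

private:
    mutable std::mutex m_mutex;
    std::deque<BufferedStuff> m_items;
    std::uint64_t m_nextSeq = 1;
};
```

With something like this, a reconnecting client that reports "last seen = 1" would get items 2, 3, ... again, and the server would only drop items after they are acknowledged. Whether the acknowledgement should travel in the request or over a separate channel is exactly the kind of thing I'm unsure about.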
I have run into two major problems.
1. What is the exact sequence for re-establishing the connection? Here is
some code:
void start()
{
    grpc::ServerBuilder builder;
    builder.AddListeningPort(m_serverAddress, grpc::InsecureServerCredentials());
    builder.RegisterService(&m_asyncService);
    m_cqGetStuff = builder.AddCompletionQueue();
    m_server = std::move(builder.BuildAndStart());
    m_streamGetStuff.reset(
        new grpc::ServerAsyncWriter<::de::sensorline::slrc::classax::Stuff>(&m_context));
    m_asyncService.RequestGetStuff(&m_context, &m_requestGetStuff,
        m_streamGetStuff.get(), m_cqGetStuff.get(), m_cqGetStuff.get(),
        reinterpret_cast<void*>(TAG::start));
    m_context.AsyncNotifyWhenDone(reinterpret_cast<void*>(TAG::disc));
    m_thread.reset(new std::thread(&loop, this));
}
void loop()
{
    while (true)
    {
        void* got_tag = nullptr;
        bool ok = false;
        if (!m_cqGetStuff->Next(&got_tag, &ok)) {
            LOGERROR("GetStuff Server stream closed. Quitting");
            break;
        }
        if (TAG::disc == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            if (m_context.IsCancelled()) {
                // Client went away: try to accept the next GetStuff call.
                m_asyncService.RequestGetStuff(&m_context, &m_requestGetStuff,
                    m_streamGetStuff.get(), m_cqGetStuff.get(), m_cqGetStuff.get(),
                    reinterpret_cast<void*>(TAG::start));
            }
        }
        if (TAG::start == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            // grab data from database, put it into internal container and
            // make first Write()
        }
        if (TAG::write == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            m_streamGetStuff->Write(stuffFromDatabase,
                reinterpret_cast<void*>(TAG::write));
        }
    }
}
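For reference, the tag handling in loop() boils down to the round trip sketched below. The TAG enum is not shown above, so the definition and values here are an assumption, and toTag, fromTag and describe are hypothetical helper names used only for illustration. The sketch goes through std::uintptr_t instead of size_t and uses a switch instead of the chain of ifs; it does not touch gRPC at all.

```cpp
#include <cstdint>
#include <string>

// Assumed definition; the real TAG enum is not part of the snippet above.
enum class TAG : std::uintptr_t { start = 1, write = 2, disc = 3 };

// Pack an enum value into the void* tag that the completion queue hands back.
inline void* toTag(TAG t) {
    return reinterpret_cast<void*>(static_cast<std::uintptr_t>(t));
}

// Recover the enum value from a tag returned by the completion queue.
inline TAG fromTag(void* p) {
    return static_cast<TAG>(reinterpret_cast<std::uintptr_t>(p));
}

// Stand-in for the real handlers: returns a label for the event so the
// dispatch can be checked without a server.
inline std::string describe(void* got_tag) {
    switch (fromTag(got_tag)) {
        case TAG::start: return "start";  // new call accepted
        case TAG::write: return "write";  // previous Write() finished
        case TAG::disc:  return "disc";   // AsyncNotifyWhenDone fired
    }
    return "unknown";
}
```

So describe(toTag(TAG::disc)) yields "disc", and any tag value outside the enum falls through to "unknown".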
It's simplified a lot, but you can see the idea. With this implementation,
after the context is cancelled I never receive TAG::start again, only
TAG::write. I've tried many different sequences but haven't had any success
with reconnection. Should I shut down the completion queue and add it
again? Or send initial metadata? I've been confused by this topic for quite
a long time, so I would appreciate any advice.
2. Previously I used the sync communication model and it worked almost
perfectly, but it had to be changed to async, and here is what I observe
now. About 9 times out of 10 the client can't connect to the server, even
though nothing was changed in the network or anywhere else, only this code
snippet. Some details:
- The server receives the request from the client with the appropriate data
- The server starts streaming, and I observe that every subsequent
completion with TAG::write is "ok"
- The stream can keep going for hours
- Nothing gets to the client until I kill the server (Ctrl+C). Right after
the process dies, the whole batch of messages that were streamed from the
server appears on the client's frontend
Traffic is not really heavy: I send 1 message (about 200 bytes) every 2
seconds, and the network connection itself is very stable. On the client
side I observe that the channel always stays in state CONNECTING. But in
the roughly 10% of cases where it does connect, everything works fine.
I'm happy to answer any questions, attach traces, or provide whatever else
you ask for.
Thanks a lot in advance!