Could I ask you to create a tracking issue for this on github please? Also if you've got a working solution, contributions to source are welcome :)
On Wednesday, March 22, 2023 at 2:52:22 AM UTC-7 Dmitry Gorelov wrote: > Please check the following code, it fixes the crash of the > route_guide_callback_server! > > #include <algorithm> > #include <chrono> > #include <cmath> > #include <iostream> > #include <memory> > > #include <string> > #include <thread> > > #include "helper.h" > > #include <grpc/grpc.h> > #include <grpcpp/security/server_credentials.h> > #include <grpcpp/server.h> > #include <grpcpp/server_builder.h> > #include <grpcpp/server_context.h> > #ifdef BAZEL_BUILD > #include "examples/protos/route_guide.grpc.pb.h" > #else > #include "route_guide.grpc.pb.h" > #endif > > using grpc::CallbackServerContext; > using grpc::Server; > using grpc::ServerBuilder; > using grpc::Status; > using routeguide::Feature; > using routeguide::Point; > using routeguide::Rectangle; > using routeguide::RouteGuide; > using routeguide::RouteNote; > using routeguide::RouteSummary; > using std::chrono::system_clock; > > > class RouteGuideImpl final : public RouteGuide::CallbackService { > public: > explicit RouteGuideImpl(const std::string& db) { > routeguide::ParseDb(db, &feature_list_); > } > > grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat( > CallbackServerContext* context) override { > class Chatter : public grpc::ServerBidiReactor<RouteNote, > RouteNote> { > public: > Chatter(absl::Mutex* mu, std::vector<RouteNote>* > received_notes) > : mu_(mu), received_notes_(received_notes) { > StartRead(¬e_); > } > > void OnDone() override { delete this; } > void OnReadDone(bool ok) override > { > if (ok) > { > // Unlike the other example in this directory that's > not using > // the reactor pattern, we can't grab a local lock to > secure the > // access to the notes vector, because the reactor will > most likely > // make us jump threads, so we'll have to use a > different locking > // strategy. We'll grab the lock locally to build a > copy of the > // list of nodes we're going to send, then we'll grab > the lock > // again to append the received note to the existing > vector. > mu_->Lock(); > std::copy_if(received_notes_->begin(), > received_notes_->end(), > std::back_inserter(to_send_notes_), > [this](const RouteNote& note) { > return note.location().latitude() == > note_.location().latitude() && > note.location().longitude() == > note_.location().longitude(); > }); > notes_iterator_ = to_send_notes_.begin(); > mu_->Unlock(); > NextWrite(); > } else { > //std::cout << "some client finished" << std::endl; > Finish(Status::OK); > } > } > void OnWriteDone(bool ok) override > { > if (ok) > { > NextWrite(); > } > else > { > std::cout << "some client finished write" << std::endl; > Finish(Status::OK); > } > } > > private: > void NextWrite() > { > mu_->Lock(); > > if (notes_iterator_ != to_send_notes_.end()) { > StartWrite(&*notes_iterator_); > notes_iterator_++; > } else { > received_notes_->push_back(note_); > StartRead(¬e_); > } > mu_->Unlock(); > > } > RouteNote note_; > absl::Mutex* mu_; > std::vector<RouteNote>* received_notes_; > std::vector<RouteNote> to_send_notes_; > std::vector<RouteNote>::iterator notes_iterator_; > }; > return new Chatter(&mu_, &received_notes_); > } > > private: > std::vector<Feature> feature_list_; > absl::Mutex mu_; > std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_); > }; > > void RunServer(const std::string& db_path) { > std::string server_address("0.0.0.0:50051"); > RouteGuideImpl service(db_path); > > ServerBuilder builder; > builder.AddListeningPort(server_address, > grpc::InsecureServerCredentials()); > builder.RegisterService(&service); > std::unique_ptr<Server> server(builder.BuildAndStart()); > std::cout << "Server listening on " << server_address << > std::endl; > server->Wait(); > } > > int main(int argc, char** argv) { > // Expect only arg: --db_path=path/to/route_guide_db.json. > std::string db = routeguide::GetDbFileContent(argc, argv); > RunServer(db); > > return 0; > } > > среда, 22 марта 2023 г. в 02:15:55 UTC+3, Dmitry Gorelov: > >> [image: 2023-03-22 02_14_54-Window.png] >> >> среда, 22 марта 2023 г. в 02:00:25 UTC+3, Dmitry Gorelov: >> >>> Both of them , the client and server, are from route_guide example. I >>> left only bidirectional part. >>> after some attemts to run client, either the client or the server crash. >>> >>> Please help to fix them! >>> >>> >>> среда, 22 марта 2023 г. в 01:59:12 UTC+3, Dmitry Gorelov: >>> >>>> //and this is client code >>>> >>>> /* >>>> * >>>> * Copyright 2021 gRPC authors. >>>> * >>>> * Licensed under the Apache License, Version 2.0 (the "License"); >>>> * you may not use this file except in compliance with the License. >>>> * You may obtain a copy of the License at >>>> * >>>> * http://www.apache.org/licenses/LICENSE-2.0 >>>> * >>>> * Unless required by applicable law or agreed to in writing, software >>>> * distributed under the License is distributed on an "AS IS" BASIS, >>>> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >>>> implied. >>>> * See the License for the specific language governing permissions and >>>> * limitations under the License. >>>> * >>>> */ >>>> >>>> #include <chrono> >>>> #include <condition_variable> >>>> #include <iostream> >>>> #include <memory> >>>> #include <mutex> >>>> #include <random> >>>> >>>> #include <string> >>>> #include <thread> >>>> >>>> #include "helper.h" >>>> >>>> #include <grpc/grpc.h> >>>> #include <grpcpp/alarm.h> >>>> #include <grpcpp/channel.h> >>>> #include <grpcpp/client_context.h> >>>> #include <grpcpp/create_channel.h> >>>> #include <grpcpp/security/credentials.h> >>>> >>>> #ifdef BAZEL_BUILD >>>> #include "examples/protos/route_guide.grpc.pb.h" >>>> #else >>>> #include "route_guide.grpc.pb.h" >>>> #endif >>>> >>>> using grpc::Channel; >>>> using grpc::ClientContext; >>>> >>>> using grpc::Status; >>>> using routeguide::Feature; >>>> using routeguide::Point; >>>> using routeguide::Rectangle; >>>> using routeguide::RouteGuide; >>>> using routeguide::RouteNote; >>>> using routeguide::RouteSummary; >>>> >>>> Point MakePoint(long latitude, long longitude) { >>>> Point p; >>>> p.set_latitude(latitude); >>>> p.set_longitude(longitude); >>>> return p; >>>> } >>>> >>>> Feature MakeFeature(const std::string& name, long latitude, long >>>> longitude) { >>>> Feature f; >>>> f.set_name(name); >>>> f.mutable_location()->CopyFrom(MakePoint(latitude, longitude)); >>>> return f; >>>> } >>>> >>>> RouteNote MakeRouteNote(const std::string& message, long latitude, >>>> long longitude) { >>>> RouteNote n; >>>> n.set_message(message); >>>> n.mutable_location()->CopyFrom(MakePoint(latitude, longitude)); >>>> return n; >>>> } >>>> >>>> class RouteGuideClient { >>>> public: >>>> RouteGuideClient(std::shared_ptr<Channel> channel, const std::string& >>>> db) >>>> : stub_(RouteGuide::NewStub(channel)) { >>>> routeguide::ParseDb(db, &feature_list_); >>>> } >>>> >>>> void RouteChat() { >>>> class Chatter : public grpc::ClientBidiReactor<RouteNote, >>>> RouteNote> { >>>> public: >>>> explicit Chatter(RouteGuide::Stub* stub) >>>> : notes_{MakeRouteNote("First message", 0, 0), >>>> MakeRouteNote("Second message", 0, 1), >>>> MakeRouteNote("Third message", 1, 0), >>>> MakeRouteNote("Fourth message", 0, 0)}, >>>> notes_iterator_(notes_.begin()) { >>>> stub->async()->RouteChat(&context_, this); >>>> NextWrite(); >>>> StartRead(&server_note_); >>>> StartCall(); >>>> >>>> } >>>> void OnWriteDone(bool /*ok*/) override { NextWrite(); } >>>> void OnReadDone(bool ok) override { >>>> if (ok) { >>>> std::cout << "Got message " << server_note_.message() << " at >>>> " >>>> << server_note_.location().latitude() << ", " >>>> << server_note_.location().longitude() << std::endl; >>>> StartRead(&server_note_); >>>> } >>>> } >>>> void OnDone(const Status& s) override { >>>> std::unique_lock<std::mutex> l(mu_); >>>> status_ = s; >>>> done_ = true; >>>> cv_.notify_one(); >>>> } >>>> Status Await() { >>>> std::unique_lock<std::mutex> l(mu_); >>>> cv_.wait(l, [this] { return done_; }); >>>> return std::move(status_); >>>> } >>>> >>>> private: >>>> void NextWrite() { >>>> if (notes_iterator_ != notes_.end()) { >>>> const auto& note = *notes_iterator_; >>>> std::cout << "Sending message " << note.message() << " at " >>>> << note.location().latitude() << ", " >>>> << note.location().longitude() << std::endl; >>>> StartWrite(¬e); >>>> notes_iterator_++; >>>> } else { >>>> StartWritesDone(); >>>> } >>>> } >>>> ClientContext context_; >>>> const std::vector<RouteNote> notes_; >>>> std::vector<RouteNote>::const_iterator notes_iterator_; >>>> RouteNote server_note_; >>>> std::mutex mu_; >>>> std::condition_variable cv_; >>>> Status status_; >>>> bool done_ = false; >>>> }; >>>> >>>> Chatter chatter(stub_.get()); >>>> Status status = chatter.Await(); >>>> if (!status.ok()) { >>>> std::cout << "RouteChat rpc failed." << std::endl; >>>> } >>>> } >>>> >>>> private: >>>> >>>> const float kCoordFactor_ = 10000000.0; >>>> std::unique_ptr<RouteGuide::Stub> stub_; >>>> std::vector<Feature> feature_list_; >>>> >>>> }; >>>> >>>> int main(int argc, char** argv) { >>>> // Expect only arg: --db_path=path/to/route_guide_db.json. >>>> std::string db = routeguide::GetDbFileContent(argc, argv); >>>> RouteGuideClient guide( >>>> grpc::CreateChannel("localhost:50051", >>>> grpc::InsecureChannelCredentials()), >>>> db); >>>> >>>> std::cout << "-------------- RouteChat --------------" << std::endl; >>>> guide.RouteChat(); >>>> >>>> return 0; >>>> } >>>> >>>> среда, 22 марта 2023 г. в 01:42:07 UTC+3, Dmitry Gorelov: >>>> >>>>> Oh man, it is not working even with *one *client! same problem in >>>>> proto_utils.h >>>> >>>> >>>>> >>>>> среда, 22 марта 2023 г. в 01:12:55 UTC+3, Dmitry Gorelov: >>>>> >>>>>> Hi All, >>>>>> >>>>>> please help to modify this peace of server code for bidirectional >>>>>> stream in order to make it work correclty with *multiple clients* at >>>>>> one time. Currently it crashes with segmentation fault in the >>>>>> proto_utils.h. >>>>>> >>>>>> class RouteGuideImpl final : public RouteGuide::CallbackService { >>>>>> public: >>>>>> explicit RouteGuideImpl(const std::string& db) { >>>>>> routeguide::ParseDb(db, &feature_list_); >>>>>> } >>>>>> >>>>>> grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat( >>>>>> CallbackServerContext* context) override { >>>>>> class Chatter : public grpc::ServerBidiReactor<RouteNote, >>>>>> RouteNote> { >>>>>> public: >>>>>> Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes) >>>>>> : mu_(mu), received_notes_(received_notes) { >>>>>> StartRead(¬e_); >>>>>> } >>>>>> >>>>>> void OnDone() override { delete this; } >>>>>> void OnReadDone(bool ok) override { >>>>>> if (ok) { >>>>>> // Unlike the other example in this directory that's not >>>>>> using >>>>>> // the reactor pattern, we can't grab a local lock to >>>>>> secure the >>>>>> // access to the notes vector, because the reactor will >>>>>> most likely >>>>>> // make us jump threads, so we'll have to use a different >>>>>> locking >>>>>> // strategy. We'll grab the lock locally to build a copy of >>>>>> the >>>>>> // list of nodes we're going to send, then we'll grab the >>>>>> lock >>>>>> // again to append the received note to the existing vector. >>>>>> mu_->Lock(); >>>>>> std::copy_if(received_notes_->begin(), >>>>>> received_notes_->end(), >>>>>> std::back_inserter(to_send_notes_), >>>>>> [this](const RouteNote& note) { >>>>>> return note.location().latitude() == >>>>>> note_.location().latitude() && >>>>>> note.location().longitude() == >>>>>> note_.location().longitude(); >>>>>> }); >>>>>> mu_->Unlock(); >>>>>> notes_iterator_ = to_send_notes_.begin(); >>>>>> NextWrite(); >>>>>> } else { >>>>>> std::cout << "some client finished" << std::endl; >>>>>> Finish(Status::OK); >>>>>> } >>>>>> } >>>>>> void OnWriteDone(bool /*ok*/) override { NextWrite(); } >>>>>> >>>>>> private: >>>>>> void NextWrite() { >>>>>> if (notes_iterator_ != to_send_notes_.end()) { >>>>>> StartWrite(&*notes_iterator_); >>>>>> notes_iterator_++; >>>>>> } else { >>>>>> mu_->Lock(); >>>>>> received_notes_->push_back(note_); >>>>>> mu_->Unlock(); >>>>>> StartRead(¬e_); >>>>>> } >>>>>> } >>>>>> RouteNote note_; >>>>>> absl::Mutex* mu_; >>>>>> std::vector<RouteNote>* received_notes_; >>>>>> std::vector<RouteNote> to_send_notes_; >>>>>> std::vector<RouteNote>::iterator notes_iterator_; >>>>>> }; >>>>>> return new Chatter(&mu_, &received_notes_); >>>>>> } >>>>>> >>>>>> private: >>>>>> std::vector<Feature> feature_list_; >>>>>> absl::Mutex mu_; >>>>>> std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_); >>>>>> }; >>>>>> >>>>> -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/c7dbedec-e655-47c3-b54b-181cc10db931n%40googlegroups.com.