HBASE-15687 Allow decoding more than GetResponse from the server Summary: We'll need more than get's for the client to be usable. So now the Request class contains the protobufs needed to encode and decode rpc's.
I also added some helper methods to create initial requests. Test Plan: It compiles and still gets data from HBase meta Differential Revision: https://reviews.facebook.net/D57327 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14a33d7d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14a33d7d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14a33d7d Branch: refs/heads/HBASE-14850 Commit: 14a33d7dbc7c9118f83be24925d1ff5c603cc352 Parents: a1069eb Author: Elliott Clark <ecl...@apache.org> Authored: Wed Apr 27 15:27:09 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Mon Jul 11 16:47:26 2016 -0700 ---------------------------------------------------------------------- hbase-native-client/connection/BUCK | 1 + .../connection/client-dispatcher.cc | 6 +-- .../connection/client-dispatcher.h | 6 +-- .../connection/client-handler.cc | 30 ++++++++++--- hbase-native-client/connection/client-handler.h | 18 ++++++-- .../connection/connection-factory.cc | 6 +-- .../connection/connection-factory.h | 2 +- hbase-native-client/connection/pipeline.h | 3 +- hbase-native-client/connection/request.cc | 45 ++++++++++++++++++++ hbase-native-client/connection/request.h | 25 +++++++++-- hbase-native-client/core/simple-client.cc | 13 +++--- 11 files changed, 123 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index 5067708..d393885 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -32,6 +32,7 @@ cxx_library(name="connection", "client-handler.cc", "connection-factory.cc", "pipeline.cc", + "request.cc", ], deps=[ "//if:if", http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/client-dispatcher.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 25cff7d..eea0a17 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -35,16 +35,16 @@ void ClientDispatcher::read(Context *ctx, Response in) { p.setValue(in); } -Future<Response> ClientDispatcher::operator()(Request arg) { +Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { auto call_id = ++current_call_id_; - arg.set_call_id(call_id); + arg->set_call_id(call_id); auto &p = requests_[call_id]; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { this->requests_.erase(call_id); }); - this->pipeline_->write(arg); + this->pipeline_->write(std::move(arg)); return f; } http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 89c7119..877e877 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -27,11 +27,11 @@ namespace hbase { class ClientDispatcher - : public wangle::ClientDispatcherBase<SerializePipeline, Request, - Response> { + : public wangle::ClientDispatcherBase<SerializePipeline, + std::unique_ptr<Request>, Response> { public: void read(Context *ctx, Response in) override; - folly::Future<Response> operator()(Request arg) override; + folly::Future<Response> operator()(std::unique_ptr<Request> arg) override; folly::Future<folly::Unit> close(Context *ctx) override; folly::Future<folly::Unit> close() override; http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 205993a7..abcf5c1 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -47,14 +47,30 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception(); + + // Get the response protobuf from the map + auto search = resp_msgs_.find(header.call_id()); + // It's an error if it's not there. + CHECK(search != resp_msgs_.end()); + auto resp_msg = search->second; + CHECK(resp_msg != nullptr); + + // Make sure we don't leak the protobuf + resp_msgs_.erase(search); + + // set the call_id. + // This will be used to by the dispatcher to match up + // the promise with the response. received.set_call_id(header.call_id()); + // If there was an exception then there's no + // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); - // For now assume that everything was a get. - // We'll need to set this up later. - received.set_response(std::make_shared<GetResponse>()); - used_bytes = deser_.parse_delimited(buf.get(), received.response().get()); + used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get()); + // Make sure that bytes were parsed. + CHECK(used_bytes == buf->length()); + received.set_response(resp_msg); } ctx->fireRead(std::move(received)); } @@ -62,7 +78,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { // TODO(eclark): Figure out how to handle the // network errors that are going to come. -Future<Unit> ClientHandler::write(Context *ctx, Request r) { +Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { // Keep track of if we have sent the header. if (UNLIKELY(need_send_header_)) { need_send_header_ = false; @@ -78,5 +94,7 @@ Future<Unit> ClientHandler::write(Context *ctx, Request r) { ctx->fireWrite(std::move(pre)); } - return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg())); + resp_msgs_[r->call_id()] = r->resp_msg(); + return ctx->fireWrite( + ser_.request(r->call_id(), r->method(), r->req_msg().get())); } http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index dbaf5a0..41bb883 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -30,20 +30,30 @@ namespace hbase { class Request; class Response; } +namespace google { +namespace protobuf { +class Message; +} +} namespace hbase { -class ClientHandler - : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request, - std::unique_ptr<folly::IOBuf>> { +class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>, + Response, std::unique_ptr<Request>, + std::unique_ptr<folly::IOBuf>> { public: ClientHandler(std::string user_name); void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override; - folly::Future<folly::Unit> write(Context *ctx, Request r) override; + folly::Future<folly::Unit> write(Context *ctx, + std::unique_ptr<Request> r) override; private: bool need_send_header_ = true; std::string user_name_; ClientSerializer ser_; ClientDeserializer deser_; + + // in flight requests + std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>> + resp_msgs_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 5d1b0da..7073f9d 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -45,14 +45,14 @@ ConnectionFactory::ConnectionFactory() { bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>()); } -std::shared_ptr<Service<Request, Response>> +std::shared_ptr<Service<std::unique_ptr<Request>, Response>> ConnectionFactory::make_connection(std::string host, int port) { // Connect to a given server // Then when connected create a ClientDispactcher. auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get(); auto dispatcher = std::make_shared<ClientDispatcher>(); dispatcher->setPipeline(pipeline); - auto service = - std::make_shared<CloseOnReleaseFilter<Request, Response>>(dispatcher); + auto service = std::make_shared< + CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher); return service; } http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index 73ac032..8d1d2f0 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -31,7 +31,7 @@ namespace hbase { class ConnectionFactory { public: ConnectionFactory(); - std::shared_ptr<wangle::Service<Request, Response>> + std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>> make_connection(std::string host, int port); private: http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/pipeline.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h index 8114fab..6c4f4ff 100644 --- a/hbase-native-client/connection/pipeline.h +++ b/hbase-native-client/connection/pipeline.h @@ -26,7 +26,8 @@ #include "utils/user-util.h" namespace hbase { -using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>; +using SerializePipeline = + wangle::Pipeline<folly::IOBufQueue &, std::unique_ptr<Request>>; class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> { public: http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/request.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc new file mode 100644 index 0000000..50ea029 --- /dev/null +++ b/hbase-native-client/connection/request.cc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection/request.h" + +#include "if/Client.pb.h" + +using namespace hbase; + +Request::Request(std::shared_ptr<google::protobuf::Message> req, + std::shared_ptr<google::protobuf::Message> resp, + std::string method) + : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {} + +std::unique_ptr<Request> Request::get() { + return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(), + std::make_shared<hbase::pb::GetResponse>(), + "Get"); +} +std::unique_ptr<Request> Request::mutate() { + return std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(), + std::make_shared<hbase::pb::MutateResponse>(), + "Mutate"); +} +std::unique_ptr<Request> Request::scan() { + return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(), + std::make_shared<hbase::pb::ScanResponse>(), + "Scan"); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/connection/request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index e9e3e88..743c469 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -21,22 +21,39 @@ #include <google/protobuf/message.h> #include <cstdint> +#include <memory> #include <string> namespace hbase { class Request { public: - Request() : call_id_(0) {} + static std::unique_ptr<Request> get(); + static std::unique_ptr<Request> mutate(); + static std::unique_ptr<Request> scan(); + + Request(std::shared_ptr<google::protobuf::Message> req, + std::shared_ptr<google::protobuf::Message> resp, std::string method); + uint32_t call_id() { return call_id_; } void set_call_id(uint32_t call_id) { call_id_ = call_id; } - google::protobuf::Message *msg() { return msg_.get(); } - void set_msg(std::shared_ptr<google::protobuf::Message> msg) { msg_ = msg; } + + std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; } + std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; } + + void set_req_msg(std::shared_ptr<google::protobuf::Message> msg) { + req_msg_ = msg; + } + void set_resp_msg(std::shared_ptr<google::protobuf::Message> msg) { + resp_msg_ = msg; + } + std::string method() { return method_; } void set_method(std::string method) { method_ = method; } private: uint32_t call_id_; - std::shared_ptr<google::protobuf::Message> msg_ = nullptr; + std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr; + std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr; std::string method_ = "Get"; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/14a33d7d/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 8b2fae5..2cb6200 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -58,24 +58,23 @@ int main(int argc, char *argv[]) { auto conn = cf.make_connection(result.host_name(), result.port()); // Send the request - Request r; + auto r = Request::get(); // This is a get request so make that - auto msg = make_shared<hbase::pb::GetRequest>(); + auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg()); // Set what region - msg->mutable_region()->set_value(FLAGS_region); + req_msg->mutable_region()->set_value(FLAGS_region); // It's always this. - msg->mutable_region()->set_type( + req_msg->mutable_region()->set_type( RegionSpecifier_RegionSpecifierType:: RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); // What row. - msg->mutable_get()->set_row(FLAGS_row); + req_msg->mutable_get()->set_row(FLAGS_row); // Send it. - r.set_msg(msg); - auto resp = (*conn)(r).get(milliseconds(5000)); + auto resp = (*conn)(std::move(r)).get(milliseconds(5000)); auto get_resp = std::static_pointer_cast<GetResponse>(resp.response()); cout << "GetResponse has_result = " << get_resp->has_result() << '\n';