HBASE-15750 Add on meta deserialization Summary: Add on meta region info deserialization
Test Plan: Unit tests. Simple client connects. Differential Revision: https://reviews.facebook.net/D57555 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b4d7599 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b4d7599 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b4d7599 Branch: refs/heads/HBASE-14850 Commit: 6b4d75995a52f079a87334951f8cfe20418db25c Parents: 3915478 Author: Elliott Clark <ecl...@apache.org> Authored: Tue May 3 12:17:07 2016 -0700 Committer: Elliott Clark <elli...@fb.com> Committed: Wed May 18 15:48:52 2016 -0700 ---------------------------------------------------------------------- .../connection/client-dispatcher.cc | 1 + .../connection/client-handler.cc | 13 +- hbase-native-client/connection/client-handler.h | 6 +- .../connection/connection-pool.cc | 36 +--- .../connection/connection-pool.h | 8 +- hbase-native-client/core/BUCK | 4 - hbase-native-client/core/location-cache-test.cc | 1 + hbase-native-client/core/location-cache.cc | 73 +++++++- hbase-native-client/core/location-cache.h | 7 +- hbase-native-client/core/meta-utils.cc | 9 +- hbase-native-client/core/meta-utils.h | 7 +- hbase-native-client/core/region-location.h | 5 +- hbase-native-client/core/simple-client.cc | 6 +- hbase-native-client/core/table-name-test.cc | 54 ------ hbase-native-client/core/table-name.h | 50 ----- hbase-native-client/serde/BUCK | 56 +++--- .../serde/client-deserializer-test.cc | 25 ++- .../serde/client-deserializer.cc | 68 ------- hbase-native-client/serde/client-deserializer.h | 36 ---- .../serde/client-serializer-test.cc | 26 +-- hbase-native-client/serde/client-serializer.cc | 139 -------------- hbase-native-client/serde/client-serializer.h | 55 ------ .../serde/region-info-deserializer-test.cc | 54 ++++++ hbase-native-client/serde/region-info.h | 41 +++++ hbase-native-client/serde/rpc.cc | 181 +++++++++++++++++++ hbase-native-client/serde/rpc.h | 58 ++++++ hbase-native-client/serde/server-name-test.cc | 32 ++++ hbase-native-client/serde/server-name.h | 21 +++ hbase-native-client/serde/table-name-test.cc | 54 ++++++ hbase-native-client/serde/table-name.h | 54 ++++++ .../serde/zk-deserializer-test.cc | 8 +- hbase-native-client/serde/zk-deserializer.cc | 78 -------- hbase-native-client/serde/zk-deserializer.h | 35 ---- hbase-native-client/serde/zk.cc | 78 ++++++++ hbase-native-client/serde/zk.h | 35 ++++ 35 files changed, 761 insertions(+), 653 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/connection/client-dispatcher.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 817adc1..6e2dc54 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -44,6 +44,7 @@ Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { auto &p = requests_[call_id]; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { + LOG(ERROR) << "e = " << call_id; this->requests_.erase(call_id); }); this->pipeline_->write(std::move(arg)); http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 3180f4e..496e4f2 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,8 +37,7 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), ser_(), deser_(), - resp_msgs_() {} + : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {} void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { if (LIKELY(buf != nullptr)) { @@ -46,7 +45,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { Response received; ResponseHeader header; - int used_bytes = deser_.parse_delimited(buf.get(), &header); + int used_bytes = serde_.ParseDelimited(buf.get(), &header); LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception(); @@ -70,7 +69,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); - used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get()); + used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); // Make sure that bytes were parsed. CHECK(used_bytes == buf->length()); received.set_response(resp_msg); @@ -91,13 +90,13 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { // and one for the request. // // That doesn't seem like too bad, but who knows. - auto pre = ser_.preamble(); - auto header = ser_.header(user_name_); + auto pre = serde_.Preamble(); + auto header = serde_.Header(user_name_); pre->appendChain(std::move(header)); ctx->fireWrite(std::move(pre)); } resp_msgs_[r->call_id()] = r->resp_msg(); return ctx->fireWrite( - ser_.request(r->call_id(), r->method(), r->req_msg().get())); + serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 68513de..ce99c9e 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -22,8 +22,7 @@ #include <string> -#include "serde/client-deserializer.h" -#include "serde/client-serializer.h" +#include "serde/rpc.h" // Forward decs. namespace hbase { @@ -49,8 +48,7 @@ public: private: bool need_send_header_; std::string user_name_; - ClientSerializer ser_; - ClientDeserializer deser_; + RpcSerde serde_; // in flight requests std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>> http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 72c1306..eafe60a 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -25,41 +25,10 @@ using std::mutex; using std::unique_ptr; using std::shared_ptr; using hbase::pb::ServerName; -using wangle::ServiceFilter; using folly::SharedMutexWritePriority; namespace hbase { -class RemoveServiceFilter - : public ServiceFilter<unique_ptr<Request>, Response> { - -public: - RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, - ConnectionPool &cp) - : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn), - cp_(cp) {} - - folly::Future<folly::Unit> close() override { - if (!released.exchange(true)) { - return this->service_->close().then( - [this]() { this->cp_.close(this->sn_); }); - } else { - return folly::makeFuture(); - } - } - - virtual bool isAvailable() override { return service_->isAvailable(); } - - folly::Future<Response> operator()(unique_ptr<Request> req) override { - return (*this->service_)(std::move(req)); - } - -private: - std::atomic<bool> released{false}; - hbase::pb::ServerName sn_; - ConnectionPool &cp_; -}; - ConnectionPool::ConnectionPool() : cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() { } @@ -72,13 +41,12 @@ std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) { if (found == connections_.end() || found->second == nullptr) { SharedMutexWritePriority::WriteHolder holder(std::move(holder)); auto new_con = cf_->make_connection(sn.host_name(), sn.port()); - auto wrapped = std::make_shared<RemoveServiceFilter>(new_con, sn, *this); - connections_[sn] = wrapped; + connections_[sn] = new_con; return new_con; } return found->second; } -void ConnectionPool::close(ServerName sn) { +void ConnectionPool::close(const ServerName &sn) { SharedMutexWritePriority::WriteHolder holder(map_mutex_); auto found = connections_.find(sn); http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 394cd71..b8330e3 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -27,13 +27,13 @@ #include "if/HBase.pb.h" namespace hbase { -struct MyServerNameEquals { +struct ServerNameEquals { bool operator()(const hbase::pb::ServerName &lhs, const hbase::pb::ServerName &rhs) const { return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); } }; -struct MyServerNameHash { +struct ServerNameHash { std::size_t operator()(hbase::pb::ServerName const &s) const { std::size_t h1 = std::hash<std::string>()(s.host_name()); std::size_t h2 = std::hash<uint32_t>()(s.port()); @@ -46,12 +46,12 @@ public: ConnectionPool(); explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf); std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn); - void close(hbase::pb::ServerName sn); + void close(const hbase::pb::ServerName &sn); private: std::shared_ptr<ConnectionFactory> cf_; std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>, - MyServerNameHash, MyServerNameEquals> + ServerNameHash, ServerNameEquals> connections_; folly::SharedMutexWritePriority map_mutex_; }; http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 447248b..ef8c2f8 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -24,7 +24,6 @@ cxx_library( "hbase_macros.h", "region-location.h", "location-cache.h", - "table-name.h", # TODO: move this out of exported # Once meta lookup works "meta-utils.h", @@ -53,9 +52,6 @@ cxx_test(name="location-cache-test", ], deps=[":core", ], run_test_separately=True, ) -cxx_test(name="table-name-test", - srcs=["table-name-test.cc", ], - deps=[":core", ], ) cxx_binary(name="simple-client", srcs=["simple-client.cc", ], deps=[":core", "//connection:connection"], ) http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/location-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc index f3166fb..172799d 100644 --- a/hbase-native-client/core/location-cache-test.cc +++ b/hbase-native-client/core/location-cache-test.cc @@ -30,4 +30,5 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) { auto result = f.get(); ASSERT_FALSE(f.hasException()); ASSERT_TRUE(result.has_port()); + ASSERT_TRUE(result.has_host_name()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 539051a..2667f11 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -20,19 +20,25 @@ #include <folly/Logging.h> #include <folly/io/IOBuf.h> +#include <wangle/concurrent/GlobalExecutor.h> #include "connection/response.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" -#include "serde/zk-deserializer.h" +#include "serde/server-name.h" +#include "serde/region-info.h" +#include "serde/zk.h" using namespace std; using namespace folly; +using wangle::ServiceFilter; +using hbase::Request; using hbase::Response; using hbase::LocationCache; using hbase::RegionLocation; using hbase::HBaseService; +using hbase::ConnectionPool; using hbase::pb::ScanResponse; using hbase::pb::TableName; using hbase::pb::ServerName; @@ -45,7 +51,7 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server"; LocationCache::LocationCache(string quorum_spec, shared_ptr<folly::Executor> executor) : quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr), - meta_lock_(), cp_(), meta_util_() { + meta_lock_(), cp_(), meta_util_(), zk_(nullptr) { zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0); } @@ -95,28 +101,77 @@ ServerName LocationCache::ReadMetaLocation() { buf->append(len); MetaRegionServer mrs; - if (derser.parse(buf.get(), &mrs) == false) { + if (derser.Parse(buf.get(), &mrs) == false) { LOG(ERROR) << "Unable to decode"; } return mrs.server(); } -Future<RegionLocation> LocationCache::locateFromMeta(const TableName &tn, - const string &row) { +Future<std::shared_ptr<RegionLocation>> +LocationCache::LocateFromMeta(const TableName &tn, const string &row) { + auto exc = wangle::getIOExecutor(); return this->LocateMeta() .then([&](ServerName sn) { return this->cp_.get(sn); }) + .via(exc.get()) // Need to handle all rpc's on the IOExecutor. .then([&](std::shared_ptr<HBaseService> service) { - return (*service)(std::move(meta_util_.make_meta_request(tn, row))); + return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) .then([&](Response resp) { // take the protobuf response and make it into // a region location. - return this->parse_response(std::move(resp)); + return this->CreateLocation(std::move(resp)); }); } -RegionLocation LocationCache::parse_response(const Response &resp) { +class RemoveServiceFilter + : public ServiceFilter<std::unique_ptr<Request>, Response> { + +public: + RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, + ConnectionPool &cp) + : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn), + cp_(cp) {} + + folly::Future<folly::Unit> close() override { + if (!released.exchange(true)) { + return this->service_->close().then([this]() { + // TODO(eclark): remove the service from the meta cache. + this->cp_.close(this->sn_); + }); + } else { + return folly::makeFuture(); + } + } + + virtual bool isAvailable() override { + return !released && service_->isAvailable(); + } + + folly::Future<Response> operator()(unique_ptr<Request> req) override { + // TODO(eclark): add in an on error handler that will + // remove the region location from the cache if needed. + // Also close the connection if this is likely to be an error + // that needs to get a new connection. + return (*this->service_)(std::move(req)); + } + +private: + std::atomic<bool> released{false}; + hbase::pb::ServerName sn_; + ConnectionPool &cp_; +}; + +std::shared_ptr<RegionLocation> +LocationCache::CreateLocation(const Response &resp){ auto resp_msg = static_pointer_cast<ScanResponse>(resp.response()); + auto &results = resp_msg->results().Get(0); + auto &cells = results.cell(); LOG(ERROR) << "resp_msg = " << resp_msg->DebugString(); - return RegionLocation{RegionInfo{}, ServerName{}, nullptr}; + auto ri = folly::to<RegionInfo>(cells.Get(0).value()); + auto sn = folly::to<ServerName>(cells.Get(1).value()); + + LOG(ERROR) << "RegionInfo = " << ri.DebugString(); + LOG(ERROR) << "ServerName = " << sn.DebugString(); + auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_); + return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/location-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index cfd6838..99b5e5e 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -29,8 +29,8 @@ #include "connection/connection-pool.h" #include "core/meta-utils.h" -#include "core/table-name.h" #include "core/region-location.h" +#include "serde/table-name.h" namespace hbase { @@ -48,14 +48,14 @@ public: // Meta Related Methods. // These are only public until testing is complete folly::Future<hbase::pb::ServerName> LocateMeta(); - folly::Future<RegionLocation> locateFromMeta(const hbase::pb::TableName &tn, + folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row); - RegionLocation parse_response(const Response &resp); void InvalidateMeta(); private: void RefreshMetaLocation(); hbase::pb::ServerName ReadMetaLocation(); + std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); std::string quorum_spec_; std::shared_ptr<folly::Executor> executor_; @@ -64,7 +64,6 @@ private: ConnectionPool cp_; MetaUtil meta_util_; - // TODO: migrate this to a smart pointer with a deleter. zhandle_t *zk_; }; http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/meta-utils.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index d2fdd88..1325d83 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -23,25 +23,26 @@ #include "connection/request.h" #include "connection/response.h" -#include "core/table-name.h" #include "if/Client.pb.h" +#include "serde/table-name.h" using hbase::pb::TableName; using hbase::MetaUtil; using hbase::Request; using hbase::Response; using hbase::pb::ScanRequest; +using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; -std::string MetaUtil::region_lookup_rowkey(const TableName &tn, +std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); } std::unique_ptr<Request> -MetaUtil::make_meta_request(const TableName tn, const std::string &row) const { +MetaUtil::MetaRequest(const TableName tn, const std::string &row) const { auto request = Request::scan(); auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg()); @@ -76,6 +77,6 @@ MetaUtil::make_meta_request(const TableName tn, const std::string &row) const { info_col->add_qualifier("server"); info_col->add_qualifier("regioninfo"); - scan->set_start_row(region_lookup_rowkey(tn, row)); + scan->set_start_row(RegionLookupRowkey(tn, row)); return request; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/meta-utils.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h index e007d02..5a659f3 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -21,15 +21,16 @@ #include <string> #include "connection/Request.h" -#include "core/table-name.h" +#include "if/HBase.pb.h" +#include "serde/table-name.h" namespace hbase { class MetaUtil { public: - std::string region_lookup_rowkey(const hbase::pb::TableName &tn, + std::string RegionLookupRowkey(const hbase::pb::TableName &tn, const std::string &row) const; - std::unique_ptr<Request> make_meta_request(const hbase::pb::TableName tn, + std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, const std::string &row) const; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index a46b8e2..7922c95 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -23,7 +23,6 @@ #include "connection/service.h" #include "if/HBase.pb.h" - namespace hbase { class RegionLocation { @@ -32,8 +31,8 @@ public: std::shared_ptr<HBaseService> service) : ri_(ri), sn_(sn), service_(service) {} - const hbase::pb::RegionInfo& region_info() { return ri_; } - const hbase::pb::ServerName& server_name() { return sn_; } + const hbase::pb::RegionInfo ®ion_info() { return ri_; } + const hbase::pb::ServerName &server_name() { return sn_; } std::shared_ptr<HBaseService> service() { return service_; } private: http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index ab614e4..00e3369 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -27,9 +27,9 @@ #include "connection/connection-pool.h" #include "core/client.h" -#include "core/table-name.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" +#include "serde/table-name.h" using namespace folly; using namespace std; @@ -39,7 +39,7 @@ using hbase::Request; using hbase::HBaseService; using hbase::LocationCache; using hbase::ConnectionPool; -using hbase::TableNameUtil; +using hbase::pb::TableName; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::GetRequest; @@ -61,7 +61,7 @@ int main(int argc, char *argv[]) { auto cpu_ex = wangle::getCPUExecutor(); LocationCache cache{FLAGS_zookeeper, cpu_ex}; auto result = - cache.locateFromMeta(TableNameUtil::create(FLAGS_table), FLAGS_row) + cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row) .get(milliseconds(5000)); return 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/table-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table-name-test.cc b/hbase-native-client/core/table-name-test.cc deleted file mode 100644 index 7bad3f1..0000000 --- a/hbase-native-client/core/table-name-test.cc +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Conv.h> -#include <gtest/gtest.h> - -#include <string> - -#include "core/table-name.h" - -using namespace hbase; -using hbase::pb::TableName; - -TEST(TestTableName, TestToStringNoDefault) { - TableName tn; - tn.set_qualifier("TestTableName"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("default"), std::string::npos); - ASSERT_EQ("TestTableName", result); -} - -TEST(TestTableName, TestToStringNoDefaltWhenSet) { - TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier("TestTableName"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("default"), std::string::npos); - ASSERT_EQ("TestTableName", result); -} - -TEST(TestTableName, TestToStringIncludeNS) { - TableName tn; - tn.set_namespace_("hbase"); - tn.set_qualifier("acl"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("hbase"), 0); - ASSERT_EQ("hbase:acl", result); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/core/table-name.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/core/table-name.h deleted file mode 100644 index 1612667..0000000 --- a/hbase-native-client/core/table-name.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <memory> -#include <string> - -#include "if/HBase.pb.h" -#include <folly/Conv.h> - -namespace hbase { -namespace pb { - -// Provide folly::to<std::string>(TableName); -template <class String> void toAppend(const TableName &in, String *result) { - if (!in.has_namespace_() || in.namespace_() == "default") { - folly::toAppend(in.qualifier(), result); - } else { - folly::toAppend(in.namespace_(), ':', in.qualifier(), result); - } -} - -} // namespace pb - -class TableNameUtil { -public: - static ::hbase::pb::TableName create(std::string table_name) { - ::hbase::pb::TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier(table_name); - return tn; - } -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 539a221..db15026 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -17,49 +17,47 @@ cxx_library(name="serde", exported_headers=[ - "client-serializer.h", - "client-deserializer.h", - "zk-deserializer.h", + "region-info.h", + "rpc.h", + "server-name.h", + "table-name.h", + "zk.h", ], srcs=[ - "client-serializer.cc", - "client-deserializer.cc", - "zk-deserializer.cc", + "rpc.cc", + "zk.cc", ], deps=[ "//if:if", "//third-party:folly", ], tests=[ - ":client-serializer-test", ":client-deserializer-test", + ":client-serializer-test", + ":server-name-test", + ":table-name-test", + ":zk-deserializer-test", + ":region-info-deserializer-test", ], compiler_flags=['-Weffc++'], visibility=[ 'PUBLIC', ], ) - +cxx_test(name="table-name-test", + srcs=["table-name-test.cc", ], + deps=[":serde", ], ) +cxx_test(name="server-name-test", + srcs=["server-name-test.cc", ], + deps=[":serde", ], ) cxx_test(name="client-serializer-test", - srcs=[ - "client-serializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["client-serializer-test.cc", ], + deps=[":serde", ], ) cxx_test(name="client-deserializer-test", - srcs=[ - "client-deserializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["client-deserializer-test.cc", ], + deps=[":serde", ], ) cxx_test(name="zk-deserializer-test", - srcs=[ - "zk-deserializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["zk-deserializer-test.cc", ], + deps=[":serde", ], ) +cxx_test(name="region-info-deserializer-test", + srcs=["region-info-deserializer-test.cc", ], + deps=[":serde", ], ) http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index 9fef093..8c571b1 100644 --- a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -16,13 +16,12 @@ * limitations under the License. * */ +#include "serde/rpc.h" #include <folly/io/IOBuf.h> #include <gtest/gtest.h> #include "if/Client.pb.h" -#include "serde/client-deserializer.h" -#include "serde/client-serializer.h" using namespace hbase; using folly::IOBuf; @@ -30,23 +29,23 @@ using hbase::pb::GetRequest; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; -TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) { - ClientDeserializer deser; - ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0); +TEST(TestRpcSerde, TestReturnFalseOnNullPtr) { + RpcSerde deser; + ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0); } -TEST(TestClientDeserializer, TestReturnFalseOnBadInput) { - ClientDeserializer deser; +TEST(TestRpcSerde, TestReturnFalseOnBadInput) { + RpcSerde deser; auto buf = IOBuf::copyBuffer("test"); GetRequest gr; - ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0); + ASSERT_LT(deser.ParseDelimited(buf.get(), &gr), 0); } -TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) { +TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) { GetRequest in; - ClientSerializer ser; - ClientDeserializer deser; + RpcSerde ser; + RpcSerde deser; // fill up the GetRequest. in.mutable_region()->set_value("test_region_id"); @@ -56,11 +55,11 @@ TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) { in.mutable_get()->set_row("test_row"); // Create the buffer - auto buf = ser.serialize_delimited(in); + auto buf = ser.SerializeDelimited(in); GetRequest out; - int used_bytes = deser.parse_delimited(buf.get(), &out); + int used_bytes = deser.ParseDelimited(buf.get(), &out); ASSERT_GT(used_bytes, 0); ASSERT_EQ(used_bytes, buf->length()); http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-deserializer.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc deleted file mode 100644 index acca7ea..0000000 --- a/hbase-native-client/serde/client-deserializer.cc +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "serde/client-deserializer.h" - -#include <folly/Logging.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/message.h> - -using namespace hbase; - -using folly::IOBuf; -using google::protobuf::Message; -using google::protobuf::io::ArrayInputStream; -using google::protobuf::io::CodedInputStream; - -int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) { - if (buf == nullptr || msg == nullptr) { - return -2; - } - - DCHECK(!buf->isChained()); - - ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; - CodedInputStream coded_stream{&ais}; - - uint32_t msg_size; - - // Try and read the varint. - if (coded_stream.ReadVarint32(&msg_size) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; - return -3; - } - - coded_stream.PushLimit(msg_size); - // Parse the message. - if (msg->MergeFromCodedStream(&coded_stream) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) - << "Unable to read a protobuf message from data."; - return -4; - } - - // Make sure all the data was consumed. - if (coded_stream.ConsumedEntireMessage() == false) { - FB_LOG_EVERY_MS(ERROR, 1000) - << "Orphaned data left after reading protobuf message"; - return -5; - } - - return coded_stream.CurrentPosition(); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-deserializer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/client-deserializer.h deleted file mode 100644 index b9664b0..0000000 --- a/hbase-native-client/serde/client-deserializer.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/io/IOBuf.h> - -// Forward -namespace google { -namespace protobuf { -class Message; -} -} - -namespace hbase { -class ClientDeserializer { -public: - int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg); -}; - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-serializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 9bf38af..2bd17fb 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -24,16 +24,16 @@ #include "if/HBase.pb.h" #include "if/RPC.pb.h" -#include "serde/client-serializer.h" +#include "serde/rpc.h" using namespace hbase; using namespace hbase::pb; using namespace folly; using namespace folly::io; -TEST(ClientSerializerTest, PreambleIncludesHBas) { - ClientSerializer ser; - auto buf = ser.preamble(); +TEST(RpcSerdeTest, PreambleIncludesHBas) { + RpcSerde ser; + auto buf = ser.Preamble(); const char *p = reinterpret_cast<const char *>(buf->data()); // Take the first for chars and make sure they are the // magic string @@ -42,16 +42,16 @@ TEST(ClientSerializerTest, PreambleIncludesHBas) { EXPECT_EQ(6, buf->computeChainDataLength()); } -TEST(ClientSerializerTest, PreambleIncludesVersion) { - ClientSerializer ser; - auto buf = ser.preamble(); +TEST(RpcSerdeTest, PreambleIncludesVersion) { + RpcSerde ser; + auto buf = ser.Preamble(); EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]); EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]); } -TEST(ClientSerializerTest, TestHeaderLengthPrefixed) { - ClientSerializer ser; - auto header = ser.header("elliott"); +TEST(RpcSerdeTest, TestHeaderLengthPrefixed) { + RpcSerde ser; + auto header = ser.Header("elliott"); // The header should be prefixed by 4 bytes of length. EXPECT_EQ(4, header->length()); @@ -64,9 +64,9 @@ TEST(ClientSerializerTest, TestHeaderLengthPrefixed) { EXPECT_EQ(prefixed_len, header->next()->length()); } -TEST(ClientSerializerTest, TestHeaderDecode) { - ClientSerializer ser; - auto buf = ser.header("elliott"); +TEST(RpcSerdeTest, TestHeaderDecode) { + RpcSerde ser; + auto buf = ser.Header("elliott"); auto header_buf = buf->next(); ConnectionHeader h; http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-serializer.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc deleted file mode 100644 index 09b81c8..0000000 --- a/hbase-native-client/serde/client-serializer.cc +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "serde/client-serializer.h" - -#include <folly/Logging.h> -#include <folly/io/Cursor.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> - -#include "if/HBase.pb.h" -#include "if/RPC.pb.h" - -using namespace hbase; - -using folly::IOBuf; -using folly::io::RWPrivateCursor; -using google::protobuf::Message; -using google::protobuf::io::ArrayOutputStream; -using google::protobuf::io::CodedOutputStream; -using google::protobuf::io::ZeroCopyOutputStream; -using std::string; -using std::unique_ptr; - -static const std::string PREAMBLE = "HBas"; -static const std::string INTERFACE = "ClientService"; -static const uint8_t RPC_VERSION = 0; -static const uint8_t DEFAULT_AUTH_TYPE = 80; - -ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {} - -unique_ptr<IOBuf> ClientSerializer::preamble() { - auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); - magic->append(2); - RWPrivateCursor c(magic.get()); - c.skip(4); - // Version - c.write(RPC_VERSION); - // Standard security aka Please don't lie to me. - c.write(auth_type_); - return magic; -} - -unique_ptr<IOBuf> ClientSerializer::header(const string &user) { - pb::ConnectionHeader h; - - // TODO(eclark): Make this not a total lie. - h.mutable_user_info()->set_effective_user(user); - // The service name that we want to talk to. - // - // Right now we're completely ignoring the service interface. - // That may or may not be the correct thing to do. - // It worked for a while with the java client; until it - // didn't. - h.set_service_name(INTERFACE); - return prepend_length(serialize_message(h)); -} - -unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id, - const string &method, - const Message *msg) { - pb::RequestHeader rq; - rq.set_method_name(method); - rq.set_call_id(call_id); - rq.set_request_param(msg != nullptr); - auto ser_header = serialize_delimited(rq); - if (msg != nullptr) { - auto ser_req = serialize_delimited(*msg); - ser_header->appendChain(std::move(ser_req)); - } - - return prepend_length(std::move(ser_header)); -} - -unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) { - // Java ints are 4 long. So create a buffer that large - auto len_buf = IOBuf::create(4); - // Then make those bytes visible. - len_buf->append(4); - - RWPrivateCursor c(len_buf.get()); - // Get the size of the data to be pushed out the network. - auto size = msg->computeChainDataLength(); - - // Write the length to this IOBuf. - c.writeBE(static_cast<uint32_t>(size)); - - // Then attach the origional to the back of len_buf - len_buf->appendChain(std::move(msg)); - return len_buf; -} - -unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) { - // Get the buffer size needed for just the message. - int msg_size = msg.ByteSize(); - int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; - - // Create a buffer big enough to hold the varint and the object. - auto buf = IOBuf::create(buf_size); - buf->append(buf_size); - - // Create the array output stream. - ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())}; - // Wrap the ArrayOuputStream in the coded output stream to allow writing - // Varint32 - CodedOutputStream cos{&aos}; - - // Write out the size. - cos.WriteVarint32(msg_size); - - // Now write the rest out. - // We're using the protobuf output streams here to keep track - // of where in the output array we are rather than IOBuf. - msg.SerializeWithCachedSizesToArray( - cos.GetDirectBufferForNBytesAndAdvance(msg_size)); - - // Return the buffer. - return buf; -} -// TODO(eclark): Make this 1 copy. -unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) { - auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); - return buf; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/client-serializer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/client-serializer.h deleted file mode 100644 index 9c819fe..0000000 --- a/hbase-native-client/serde/client-serializer.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <cstdint> -#include <folly/io/IOBuf.h> -#include <string> - -// Forward -namespace google { -namespace protobuf { -class Message; -} -} -namespace hbase { -class Request; -} - -namespace hbase { -class ClientSerializer { -public: - ClientSerializer(); - std::unique_ptr<folly::IOBuf> preamble(); - std::unique_ptr<folly::IOBuf> header(const std::string &user); - std::unique_ptr<folly::IOBuf> request(const uint32_t call_id, - const std::string &method, - const google::protobuf::Message *msg); - std::unique_ptr<folly::IOBuf> - serialize_delimited(const google::protobuf::Message &msg); - - std::unique_ptr<folly::IOBuf> - serialize_message(const google::protobuf::Message &msg); - - std::unique_ptr<folly::IOBuf> - prepend_length(std::unique_ptr<folly::IOBuf> msg); - - uint8_t auth_type_; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/region-info-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc new file mode 100644 index 0000000..ce8dedf --- /dev/null +++ b/hbase-native-client/serde/region-info-deserializer-test.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "serde/region-info.h" + +#include <gtest/gtest.h> + +#include <string> + +#include "if/HBase.pb.h" +#include "serde/table-name.h" + +using std::string; +using hbase::pb::RegionInfo; +using hbase::pb::TableName; + +TEST(TestRegionInfoDesializer, TestDeserialize) { + string ns{"test_ns"}; + string tn{"table_name"}; + string start_row{"AAAAAA"}; + string stop_row{"BBBBBBBBBBBB"}; + uint64_t region_id = 2345678; + + RegionInfo ri_out; + ri_out.set_region_id(region_id); + ri_out.mutable_table_name()->set_namespace_(ns); + ri_out.mutable_table_name()->set_qualifier(tn); + ri_out.set_start_key(start_row); + ri_out.set_end_key(stop_row); + + + string header{"PBUF"}; + string ser = header + ri_out.SerializeAsString(); + + auto out = folly::to<RegionInfo>(ser); + + EXPECT_EQ(region_id, out.region_id()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/region-info.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h new file mode 100644 index 0000000..6af351c --- /dev/null +++ b/hbase-native-client/serde/region-info.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "if/HBase.pb.h" + +#include <folly/Conv.h> +#include <boost/algorithm/string/predicate.hpp> + +namespace hbase { +namespace pb { +template <class String> void parseTo(String in, RegionInfo& out) { + // TODO(eclark): there has to be something better. + std::string s = folly::to<std::string>(in); + + if (!boost::starts_with(s, "PBUF") ) { + throw std::runtime_error("Region Info field doesn't contain preamble"); + } + if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) { + throw std::runtime_error("Bad protobuf for RegionInfo"); + } +} +} // namespace pb +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/rpc.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc new file mode 100644 index 0000000..4c3c999 --- /dev/null +++ b/hbase-native-client/serde/rpc.cc @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "serde/rpc.h" + +#include <folly/Logging.h> +#include <folly/Logging.h> +#include <folly/io/Cursor.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message.h> + +#include "if/HBase.pb.h" +#include "if/RPC.pb.h" + +using namespace hbase; + +using folly::IOBuf; +using folly::io::RWPrivateCursor; +using google::protobuf::Message; +using google::protobuf::Message; +using google::protobuf::io::ArrayInputStream; +using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::CodedInputStream; +using google::protobuf::io::CodedOutputStream; +using google::protobuf::io::ZeroCopyOutputStream; +using std::string; +using std::unique_ptr; + +static const std::string PREAMBLE = "HBas"; +static const std::string INTERFACE = "ClientService"; +static const uint8_t RPC_VERSION = 0; +static const uint8_t DEFAULT_AUTH_TYPE = 80; + +int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { + if (buf == nullptr || msg == nullptr) { + return -2; + } + + DCHECK(!buf->isChained()); + + ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; + CodedInputStream coded_stream{&ais}; + + uint32_t msg_size; + + // Try and read the varint. + if (coded_stream.ReadVarint32(&msg_size) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; + return -3; + } + + coded_stream.PushLimit(msg_size); + // Parse the message. + if (msg->MergeFromCodedStream(&coded_stream) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Unable to read a protobuf message from data."; + return -4; + } + + // Make sure all the data was consumed. + if (coded_stream.ConsumedEntireMessage() == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Orphaned data left after reading protobuf message"; + return -5; + } + + return coded_stream.CurrentPosition(); +} + +RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {} +RpcSerde::~RpcSerde() {} + +unique_ptr<IOBuf> RpcSerde::Preamble() { + auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); + magic->append(2); + RWPrivateCursor c(magic.get()); + c.skip(4); + // Version + c.write(RPC_VERSION); + // Standard security aka Please don't lie to me. + c.write(auth_type_); + return magic; +} + +unique_ptr<IOBuf> RpcSerde::Header(const string &user) { + pb::ConnectionHeader h; + + // TODO(eclark): Make this not a total lie. + h.mutable_user_info()->set_effective_user(user); + // The service name that we want to talk to. + // + // Right now we're completely ignoring the service interface. + // That may or may not be the correct thing to do. + // It worked for a while with the java client; until it + // didn't. + h.set_service_name(INTERFACE); + return PrependLength(SerializeMessage(h)); +} + +unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, + const string &method, const Message *msg) { + pb::RequestHeader rq; + rq.set_method_name(method); + rq.set_call_id(call_id); + rq.set_request_param(msg != nullptr); + auto ser_header = SerializeDelimited(rq); + if (msg != nullptr) { + auto ser_req = SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_req)); + } + + return PrependLength(std::move(ser_header)); +} + +unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) { + // Java ints are 4 long. So create a buffer that large + auto len_buf = IOBuf::create(4); + // Then make those bytes visible. + len_buf->append(4); + + RWPrivateCursor c(len_buf.get()); + // Get the size of the data to be pushed out the network. + auto size = msg->computeChainDataLength(); + + // Write the length to this IOBuf. + c.writeBE(static_cast<uint32_t>(size)); + + // Then attach the origional to the back of len_buf + len_buf->appendChain(std::move(msg)); + return len_buf; +} + +unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) { + // Get the buffer size needed for just the message. + int msg_size = msg.ByteSize(); + int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; + + // Create a buffer big enough to hold the varint and the object. + auto buf = IOBuf::create(buf_size); + buf->append(buf_size); + + // Create the array output stream. + ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())}; + // Wrap the ArrayOuputStream in the coded output stream to allow writing + // Varint32 + CodedOutputStream cos{&aos}; + + // Write out the size. + cos.WriteVarint32(msg_size); + + // Now write the rest out. + // We're using the protobuf output streams here to keep track + // of where in the output array we are rather than IOBuf. + msg.SerializeWithCachedSizesToArray( + cos.GetDirectBufferForNBytesAndAdvance(msg_size)); + + // Return the buffer. + return buf; +} +// TODO(eclark): Make this 1 copy. +unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) { + auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); + return buf; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/rpc.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h new file mode 100644 index 0000000..cefb583 --- /dev/null +++ b/hbase-native-client/serde/rpc.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <memory> +#include <string> + +// Forward +namespace folly { +class IOBuf; +} +namespace google { +namespace protobuf { +class Message; +} +} + +namespace hbase { +class RpcSerde { +public: + RpcSerde(); + virtual ~RpcSerde(); + int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg); + std::unique_ptr<folly::IOBuf> Preamble(); + std::unique_ptr<folly::IOBuf> Header(const std::string &user); + std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, + const std::string &method, + const google::protobuf::Message *msg); + std::unique_ptr<folly::IOBuf> + SerializeDelimited(const google::protobuf::Message &msg); + + std::unique_ptr<folly::IOBuf> + SerializeMessage(const google::protobuf::Message &msg); + + std::unique_ptr<folly::IOBuf> + PrependLength(std::unique_ptr<folly::IOBuf> msg); + +private: + /* data */ + uint8_t auth_type_; +}; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/server-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc new file mode 100644 index 0000000..35dcbc1 --- /dev/null +++ b/hbase-native-client/serde/server-name-test.cc @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "serde/server-name.h" + +#include <gtest/gtest.h> +#include <string> + +using hbase::pb::ServerName; + +TEST(TestServerName, TestMakeServerName) { + auto sn = folly::to<ServerName>("test:123"); + + ASSERT_EQ("test", sn.host_name()); + ASSERT_EQ(123, sn.port()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/server-name.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/server-name.h b/hbase-native-client/serde/server-name.h new file mode 100644 index 0000000..bdba087 --- /dev/null +++ b/hbase-native-client/serde/server-name.h @@ -0,0 +1,21 @@ +#pragma once + +#include "if/HBase.pb.h" +#include <folly/Conv.h> +#include <folly/String.h> + +namespace hbase { +namespace pb { + +template <class String> void parseTo(String in, ServerName &out) { + // TODO see about getting rsplit into folly. + std::string s = folly::to<std::string>(in); + + auto delim = s.rfind(":"); + DCHECK(delim != std::string::npos); + out.set_host_name(s.substr(0, delim)); + // Now keep everything after the : (delim + 1) to the end. + out.set_port(folly::to<int>(s.substr(delim + 1))); +} +} +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/table-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/table-name-test.cc b/hbase-native-client/serde/table-name-test.cc new file mode 100644 index 0000000..877d522 --- /dev/null +++ b/hbase-native-client/serde/table-name-test.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <folly/Conv.h> +#include <gtest/gtest.h> + +#include <string> + +#include "serde/table-name.h" + +using namespace hbase; +using hbase::pb::TableName; + +TEST(TestTableName, TestToStringNoDefault) { + TableName tn; + tn.set_qualifier("TestTableName"); + std::string result = folly::to<std::string>(tn); + ASSERT_EQ(result.find("default"), std::string::npos); + ASSERT_EQ("TestTableName", result); +} + +TEST(TestTableName, TestToStringNoDefaltWhenSet) { + TableName tn; + tn.set_namespace_("default"); + tn.set_qualifier("TestTableName"); + std::string result = folly::to<std::string>(tn); + ASSERT_EQ(result.find("default"), std::string::npos); + ASSERT_EQ("TestTableName", result); +} + +TEST(TestTableName, TestToStringIncludeNS) { + TableName tn; + tn.set_namespace_("hbase"); + tn.set_qualifier("acl"); + std::string result = folly::to<std::string>(tn); + ASSERT_EQ(result.find("hbase"), 0); + ASSERT_EQ("hbase:acl", result); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/table-name.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h new file mode 100644 index 0000000..c81e166 --- /dev/null +++ b/hbase-native-client/serde/table-name.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <memory> +#include <string> + +#include "if/HBase.pb.h" +#include <folly/Conv.h> +#include <folly/String.h> + +namespace hbase { +namespace pb { + +// Provide folly::to<std::string>(TableName); +template <class String> void toAppend(const TableName &in, String *result) { + if (!in.has_namespace_() || in.namespace_() == "default") { + folly::toAppend(in.qualifier(), result); + } else { + folly::toAppend(in.namespace_(), ':', in.qualifier(), result); + } +} + +template <class String> void parseTo(String in, TableName &out) { + std::vector<std::string> v; + folly::split(":", in, v); + + if (v.size() == 1) { + out.set_namespace_("default"); + out.set_qualifier(v[0]); + } else { + out.set_namespace_(v[0]); + out.set_qualifier(v[1]); + } +} + +} // namespace pb +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/zk-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/zk-deserializer-test.cc b/hbase-native-client/serde/zk-deserializer-test.cc index 92d85a0..f07eecf 100644 --- a/hbase-native-client/serde/zk-deserializer-test.cc +++ b/hbase-native-client/serde/zk-deserializer-test.cc @@ -17,7 +17,7 @@ * */ -#include "serde/zk-deserializer.h" +#include "serde/zk.h" #include <folly/Logging.h> #include <folly/io/Cursor.h> @@ -41,7 +41,7 @@ TEST(TestZkDesializer, TestThrowNoMagicNum) { buf->append(100); RWPrivateCursor c{buf.get()}; c.write<uint8_t>(99); - ASSERT_THROW(deser.parse(buf.get(), &mrs), runtime_error); + ASSERT_THROW(deser.Parse(buf.get(), &mrs), runtime_error); } // Test if the protobuf is in a format that we can't decode @@ -78,7 +78,7 @@ TEST(TestZkDesializer, TestBadProtoThrow) { // Create the protobuf MetaRegionServer out; - ASSERT_THROW(deser.parse(buf.get(), &out), runtime_error); + ASSERT_THROW(deser.Parse(buf.get(), &out), runtime_error); } // Test to make sure the whole thing works. @@ -118,6 +118,6 @@ TEST(TestZkDesializer, TestNoThrow) { // Create the protobuf MetaRegionServer out; - ASSERT_TRUE(deser.parse(buf.get(), &out)); + ASSERT_TRUE(deser.Parse(buf.get(), &out)); ASSERT_EQ(mrs.server().host_name(), out.server().host_name()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/zk-deserializer.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/zk-deserializer.cc b/hbase-native-client/serde/zk-deserializer.cc deleted file mode 100644 index 33cf809..0000000 --- a/hbase-native-client/serde/zk-deserializer.cc +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "serde/zk-deserializer.h" - -#include <folly/io/Cursor.h> -#include <folly/io/IOBuf.h> -#include <google/protobuf/message.h> - -using hbase::ZkDeserializer; -using std::runtime_error; -using folly::IOBuf; -using folly::io::Cursor; -using google::protobuf::Message; - -static const std::string MAGIC_STRING = "PBUF"; - -bool ZkDeserializer::parse(IOBuf *buf, Message *out) { - - // The format is like this - // 1 byte of magic number. 255 - // 4 bytes of id length. - // id_length number of bytes for the id of who put up the znode - // 4 bytes of a magic string PBUF - // Then the protobuf serialized without a varint header. - - Cursor c{buf}; - - // There should be a magic number for recoverable zk - uint8_t magic_num = c.read<uint8_t>(); - if (magic_num != 255) { - LOG(ERROR) << "Magic number not in ZK znode data expected 255 got =" - << unsigned(magic_num); - throw runtime_error("Magic number not in znode data"); - } - // How long is the id? - uint32_t id_len = c.readBE<uint32_t>(); - - if (id_len >= c.length()) { - LOG(ERROR) << "After skiping the if from zookeeper data there's not enough " - "left to read anything else"; - throw runtime_error("Not enough bytes to decode from zookeeper"); - } - - // Skip the id - c.skip(id_len); - - // Make sure that the magic string is there. - if (MAGIC_STRING != c.readFixedString(4)) { - LOG(ERROR) << "There was no PBUF magic string."; - throw runtime_error("No PBUF magic string in the zookpeeper data."); - } - - // Try to decode the protobuf. - // If there's an error bail out. - if (out->ParseFromArray(c.data(), c.length()) == false) { - LOG(ERROR) << "Error parsing Protobuf Message"; - throw runtime_error("Error parsing protobuf"); - } - - return true; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/zk-deserializer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/zk-deserializer.h b/hbase-native-client/serde/zk-deserializer.h deleted file mode 100644 index aa91661..0000000 --- a/hbase-native-client/serde/zk-deserializer.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -namespace google { -namespace protobuf { -class Message; -} -} -namespace folly { -class IOBuf; -} - -namespace hbase { -class ZkDeserializer { -public: - bool parse(folly::IOBuf *buf, google::protobuf::Message *out); -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/zk.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/zk.cc b/hbase-native-client/serde/zk.cc new file mode 100644 index 0000000..59871a5 --- /dev/null +++ b/hbase-native-client/serde/zk.cc @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "serde/zk.h" + +#include <folly/io/Cursor.h> +#include <folly/io/IOBuf.h> +#include <google/protobuf/message.h> + +using hbase::ZkDeserializer; +using std::runtime_error; +using folly::IOBuf; +using folly::io::Cursor; +using google::protobuf::Message; + +static const std::string MAGIC_STRING = "PBUF"; + +bool ZkDeserializer::Parse(IOBuf *buf, Message *out) { + + // The format is like this + // 1 byte of magic number. 255 + // 4 bytes of id length. + // id_length number of bytes for the id of who put up the znode + // 4 bytes of a magic string PBUF + // Then the protobuf serialized without a varint header. + + Cursor c{buf}; + + // There should be a magic number for recoverable zk + uint8_t magic_num = c.read<uint8_t>(); + if (magic_num != 255) { + LOG(ERROR) << "Magic number not in ZK znode data expected 255 got =" + << unsigned(magic_num); + throw runtime_error("Magic number not in znode data"); + } + // How long is the id? + uint32_t id_len = c.readBE<uint32_t>(); + + if (id_len >= c.length()) { + LOG(ERROR) << "After skiping the if from zookeeper data there's not enough " + "left to read anything else"; + throw runtime_error("Not enough bytes to decode from zookeeper"); + } + + // Skip the id + c.skip(id_len); + + // Make sure that the magic string is there. + if (MAGIC_STRING != c.readFixedString(4)) { + LOG(ERROR) << "There was no PBUF magic string."; + throw runtime_error("No PBUF magic string in the zookpeeper data."); + } + + // Try to decode the protobuf. + // If there's an error bail out. + if (out->ParseFromArray(c.data(), c.length()) == false) { + LOG(ERROR) << "Error parsing Protobuf Message"; + throw runtime_error("Error parsing protobuf"); + } + + return true; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6b4d7599/hbase-native-client/serde/zk.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/zk.h b/hbase-native-client/serde/zk.h new file mode 100644 index 0000000..b672bf4 --- /dev/null +++ b/hbase-native-client/serde/zk.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +namespace google { +namespace protobuf { +class Message; +} +} +namespace folly { +class IOBuf; +} + +namespace hbase { +class ZkDeserializer { +public: + bool Parse(folly::IOBuf *buf, google::protobuf::Message *out); +}; +} // namespace hbase