Repository: hbase Updated Branches: refs/heads/HBASE-14850 915d89f51 -> 8c7a8b9da
HBASE-17771 [C++] Classes required for implementation of BatchCallerBuilder Signed-off-by: Enis Soztutar <e...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8c7a8b9d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8c7a8b9d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8c7a8b9d Branch: refs/heads/HBASE-14850 Commit: 8c7a8b9da24a69810f2e18fdaae551b9d017c8cc Parents: 915d89f Author: Sudeep Sunthankar <sudeep.sunthan...@hashmapinc.com> Authored: Wed Mar 29 16:55:21 2017 +1100 Committer: Enis Soztutar <e...@apache.org> Committed: Wed Mar 29 16:08:55 2017 -0700 ---------------------------------------------------------------------- hbase-native-client/connection/request.cc | 4 + hbase-native-client/connection/request.h | 2 + hbase-native-client/core/BUCK | 8 ++ hbase-native-client/core/action.h | 45 ++++++++++ hbase-native-client/core/get-test.cc | 5 +- hbase-native-client/core/get.cc | 15 +--- hbase-native-client/core/get.h | 18 +--- hbase-native-client/core/multi-response.cc | 80 ++++++++++++++++++ hbase-native-client/core/multi-response.h | 81 ++++++++++++++++++ hbase-native-client/core/raw-async-table.cc | 2 +- hbase-native-client/core/region-request.h | 48 +++++++++++ hbase-native-client/core/region-result.cc | 54 ++++++++++++ hbase-native-client/core/region-result.h | 55 ++++++++++++ hbase-native-client/core/request-converter.cc | 82 ++++++++++++------ hbase-native-client/core/request-converter.h | 12 +++ hbase-native-client/core/response-converter.cc | 94 ++++++++++++++++++++- hbase-native-client/core/response-converter.h | 7 ++ hbase-native-client/core/row.h | 62 ++++++++++++++ hbase-native-client/core/server-request.h | 59 +++++++++++++ 19 files changed, 671 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/connection/request.cc 
---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc index 189130e..80883cc 100644 --- a/hbase-native-client/connection/request.cc +++ b/hbase-native-client/connection/request.cc @@ -39,3 +39,7 @@ std::unique_ptr<Request> Request::scan() { return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(), std::make_shared<hbase::pb::ScanResponse>(), "Scan"); } +std::unique_ptr<Request> Request::multi() { + return std::make_unique<Request>(std::make_shared<hbase::pb::MultiRequest>(), + std::make_shared<hbase::pb::MultiResponse>(), "Multi"); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/connection/request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h index 91c684d..520b380 100644 --- a/hbase-native-client/connection/request.h +++ b/hbase-native-client/connection/request.h @@ -39,6 +39,8 @@ class Request { static std::unique_ptr<Request> mutate(); /** Create a request object for a scan */ static std::unique_ptr<Request> scan(); + /** Create a request object for a multi */ + static std::unique_ptr<Request> multi(); /** * This should be private. Do not use this. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 2d77f2d..7483980 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -45,6 +45,12 @@ cxx_library( "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "zk-util.h", + "action.h", + "multi-response.h", + "region-request.h", + "region-result.h", + "row.h", + "server-request.h", ], srcs=[ "async-connection.cc", @@ -62,6 +68,8 @@ cxx_library( "response-converter.cc", "table.cc", "zk-util.cc", + "multi-response.cc", + "region-result.cc", ], deps=[ "//exceptions:exceptions", http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/action.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h new file mode 100644 index 0000000..3511683 --- /dev/null +++ b/hbase-native-client/core/action.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#pragma once + +#include <memory> +#include "core/row.h" + +using hbase::Row; +namespace hbase { + +class Action { + public: + Action(std::shared_ptr<Row> action, int original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int64_t original_index() const { return original_index_; } + + std::shared_ptr<Row> action() const { return action_; } + + private: + std::shared_ptr<Row> action_; + int64_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc index 07d0003..6ee2715 100644 --- a/hbase-native-client/core/get-test.cc +++ b/hbase-native-client/core/get-test.cc @@ -21,7 +21,8 @@ #include <glog/logging.h> #include <gtest/gtest.h> -using namespace hbase; + +using hbase::Get; const int NUMBER_OF_GETS = 5; void CheckFamilies(Get &get) { @@ -102,7 +103,7 @@ void CheckFamiliesAfterCopy(Get &get) { } void GetMethods(Get &get, const std::string &row) { - EXPECT_EQ(row, get.Row()); + EXPECT_EQ(row, get.row()); CheckFamilies(get); EXPECT_EQ(true, get.CacheBlocks()); http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc index 5c5f446..afeb429 100644 --- a/hbase-native-client/core/get.cc +++ b/hbase-native-client/core/get.cc @@ -26,7 +26,7 @@ namespace hbase { Get::~Get() {} -Get::Get(const std::string &row) : row_(row) { CheckRow(row_); } +Get::Get(const std::string &row) : Row(row) {} Get::Get(const Get &get) { row_ = get.row_; @@ -78,8 +78,6 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) { return *this; } -const 
std::string &Get::Row() const { return row_; } - hbase::pb::Consistency Get::Consistency() const { return consistency_; } Get &Get::SetConsistency(hbase::pb::Consistency consistency) { @@ -119,15 +117,4 @@ Get &Get::SetTimeStamp(int64_t timestamp) { const TimeRange &Get::Timerange() const { return *tr_; } -void Get::CheckRow(const std::string &row) { - const int kMaxRowLength = std::numeric_limits<int16_t>::max(); - int row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } -} } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h index 5492f21..e0be4e7 100644 --- a/hbase-native-client/core/get.h +++ b/hbase-native-client/core/get.h @@ -25,9 +25,11 @@ #include <string> #include <vector> #include "core/query.h" +#include "core/row.h" #include "core/time-range.h" #include "if/Client.pb.h" +using hbase::Row; namespace hbase { /** @@ -36,7 +38,7 @@ namespace hbase { */ using FamilyMap = std::map<std::string, std::vector<std::string>>; -class Get : public Query { +class Get : public Row, public Query { public: /** * Constructors @@ -110,11 +112,6 @@ class Get : public Query { Get& AddColumn(const std::string& family, const std::string& qualifier); /** - * @brief Returns the row for this Get operation - */ - const std::string& Row() const; - - /** * @brief Returns true if family map (FamilyMap) is non empty false otherwise */ bool HasFamilies() const; @@ -131,21 +128,12 @@ class Get : public Query { Get& SetConsistency(hbase::pb::Consistency consistency); private: - std::string row_ = ""; int32_t max_versions_ = 1; bool cache_blocks_ = true; bool 
check_existence_only_ = false; FamilyMap family_map_; hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>(); - - /** - * @brief Checks if the row for this Get operation is proper or not - * @param row Row to check - * @throws std::runtime_error if row is empty or greater than - * MAX_ROW_LENGTH(i.e. std::numeric_limits<short>::max()) - */ - void CheckRow(const std::string& row); }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/multi-response.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc new file mode 100644 index 0000000..562f3b6 --- /dev/null +++ b/hbase-native-client/core/multi-response.cc @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "core/multi-response.h" +#include "core/region-result.h" + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->ResultOrExceptionSize(); + } + return size; +} + +void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index, + std::shared_ptr<Result> result, + std::shared_ptr<std::exception> exc) { + bool region_found = false; + for (auto itr = results_.begin(); itr != results_.end(); ++itr) { + if (itr->first == region_name) { + region_found = true; + itr->second->AddResultOrException(original_index, result, exc); + break; + } + } + if (!region_found) { + auto region_result = std::make_shared<RegionResult>(); + region_result->AddResultOrException(original_index, result, exc); + results_[region_name] = region_result; + } +} + +void MultiResponse::AddRegionException(const std::string& region_name, + std::shared_ptr<std::exception> exception) { + exceptions_[region_name] = exception; +} + +std::shared_ptr<std::exception> MultiResponse::RegionException( + const std::string& region_name) const { + auto find = exceptions_.at(region_name); + return find; +} + +const std::map<std::string, std::shared_ptr<std::exception> >& MultiResponse::RegionExceptions() + const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + std::shared_ptr<RegionLoadStats> stat) { + results_[region_name]->set_stat(stat); +} + +const std::map<std::string, std::shared_ptr<RegionResult> >& MultiResponse::RegionResults() const { + return results_; +} + +MultiResponse::~MultiResponse() {} + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/multi-response.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h new file 
mode 100644 index 0000000..cebd2b7 --- /dev/null +++ b/hbase-native-client/core/multi-response.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <core/region-result.h> +#include <exception> +#include <map> +#include <memory> +#include <string> + +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::RegionResult; +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +class MultiResponse { + public: + MultiResponse(); + /** + * @brief Returns Number of pairs in this container + */ + int Size() const; + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param originalIndex the original index of the Action (request). + * @param resOrEx the result or error; will be empty for successful Put and Delete actions. + */ + void AddRegionResult(const std::string& region_name, int32_t original_index, + std::shared_ptr<Result> result, std::shared_ptr<std::exception> exc); + + void AddRegionException(const std::string& region_name, + std::shared_ptr<std::exception> exception); + + /** + * @return the exception for the region, if any. Null otherwise. 
+ */ + std::shared_ptr<std::exception> RegionException(const std::string& region_name) const; + + const std::map<std::string, std::shared_ptr<std::exception>>& RegionExceptions() const; + + void AddStatistic(const std::string& region_name, std::shared_ptr<RegionLoadStats> stat); + + const std::map<std::string, std::shared_ptr<RegionResult>>& RegionResults() const; + + ~MultiResponse(); + + private: + // map of regionName to map of Results by the original index for that Result + std::map<std::string, std::shared_ptr<hbase::RegionResult>> results_; + /** + * The server can send us a failure for the region itself, instead of individual failure. + * It's a part of the protobuf definition. + */ + std::map<std::string, std::shared_ptr<std::exception>> exceptions_; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/raw-async-table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 88a3382..9a680ed 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -56,7 +56,7 @@ folly::Future<RESP> RawAsyncTable::Call( Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) { auto caller = - CreateCallerBuilder<std::shared_ptr<Result>>(get.Row(), connection_conf_->read_rpc_timeout()) + CreateCallerBuilder<std::shared_ptr<Result>>(get.row(), connection_conf_->read_rpc_timeout()) ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, std::shared_ptr<hbase::RegionLocation> loc, std::shared_ptr<hbase::RpcClient> rpc_client) http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h new file mode 
100644 index 0000000..6f29d44 --- /dev/null +++ b/hbase-native-client/core/region-request.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once +#include <memory> +#include <queue> +#include <vector> +#include "core/action.h" +#include "core/region-location.h" + +using hbase::Action; +namespace hbase { + +class RegionRequest { + public: + // Concurrent + using ActionList = std::vector<std::shared_ptr<Action>>; + explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> &region_loc) + : region_loc_(region_loc) {} + ~RegionRequest() {} + void AddAction(std::shared_ptr<Action> action) { + actions_.push_back(action); + } + std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; } + const ActionList &actions() const { return actions_; } + + private: + std::shared_ptr<hbase::RegionLocation> region_loc_; + ActionList actions_; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc new file mode 100644 index 
0000000..d9ab942 --- /dev/null +++ b/hbase-native-client/core/region-result.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/region-result.h" +#include <glog/logging.h> +#include <stdexcept> + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +RegionResult::RegionResult() {} + +RegionResult::~RegionResult() {} + +void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, + std::shared_ptr<std::exception> exc) { + auto index_found = result_or_excption_.find(index); + if (index_found == result_or_excption_.end()) { + result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? 
exc : nullptr); + } else { + throw std::runtime_error("Index " + std::to_string(index) + + " already set with ResultOrException"); + } +} + +void RegionResult::set_stat(std::shared_ptr<RegionLoadStats> stat) { stat_ = stat; } + +int RegionResult::ResultOrExceptionSize() const { return result_or_excption_.size(); } + +std::shared_ptr<ResultOrExceptionTuple> RegionResult::ResultOrException(int32_t index) const { + return std::make_shared<ResultOrExceptionTuple>(result_or_excption_.at(index)); +} + +const std::shared_ptr<RegionLoadStats>& RegionResult::stat() const { return stat_; } + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h new file mode 100644 index 0000000..9b7ca03 --- /dev/null +++ b/hbase-native-client/core/region-result.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#pragma once + +#include <map> +#include <memory> +#include <string> +#include <tuple> +#include "core/result.h" +#include "if/Client.pb.h" + +using hbase::Result; +using hbase::pb::RegionLoadStats; + +namespace hbase { +using ResultOrExceptionTuple = + std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<std::exception>>; +class RegionResult { + public: + RegionResult(); + void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, + std::shared_ptr<std::exception> exc); + + void set_stat(std::shared_ptr<RegionLoadStats> stat); + + int ResultOrExceptionSize() const; + + std::shared_ptr<ResultOrExceptionTuple> ResultOrException(int32_t index) const; + + const std::shared_ptr<RegionLoadStats>& stat() const; + + ~RegionResult(); + + private: + std::map<int, ResultOrExceptionTuple> result_or_excption_; + std::shared_ptr<RegionLoadStats> stat_; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/request-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 227e04a..ff92b5c 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -23,6 +23,7 @@ using hbase::Request; using hbase::pb::GetRequest; +using hbase::pb::RegionAction; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::ScanRequest; @@ -43,35 +44,9 @@ void RequestConverter::SetRegion(const std::string &region_name, std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get, const std::string &region_name) { auto pb_req = Request::get(); - auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg()); RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_get = pb_msg->mutable_get(); - pb_get->set_max_versions(get.MaxVersions()); 
- pb_get->set_cache_blocks(get.CacheBlocks()); - pb_get->set_consistency(get.Consistency()); - - if (!get.Timerange().IsAllTime()) { - hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); - pb_time_range->set_from(get.Timerange().MinTimeStamp()); - pb_time_range->set_to(get.Timerange().MaxTimeStamp()); - } - pb_get->set_row(get.Row()); - if (get.HasFamilies()) { - for (const auto &family : get.Family()) { - auto column = pb_get->add_column(); - column->set_family(family.first); - for (const auto &qualifier : family.second) { - column->add_qualifier(qualifier); - } - } - } - - if (get.filter() != nullptr) { - pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); - } - + pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); return pb_req; } @@ -123,4 +98,57 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, return pb_req; } + +std::unique_ptr<Request> RequestConverter::ToMultiRequest( + const ActionsByRegion &actions_by_region) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MultiRequest>(pb_req->req_msg()); + + for (const auto &action_by_region : actions_by_region) { + auto pb_region_action = pb_msg->add_regionaction(); + RequestConverter::SetRegion(action_by_region.first, pb_region_action->mutable_region()); + int action_num = 0; + for (const auto &region_action : action_by_region.second->actions()) { + auto pb_action = pb_region_action->add_action(); + auto action = region_action->action(); + if (auto pget = std::dynamic_pointer_cast<Get>(action)) { + auto pb_get = RequestConverter::ToGet(*pget.get()); + pb_action->set_allocated_get(pb_get.release()); + pb_action->set_index(action_num); + } + action_num++; + } + } + + VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} + +std::unique_ptr<hbase::pb::Get> RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique<hbase::pb::Get>(); + 
pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.Family()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + if (get.filter() != nullptr) { + pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); + } + return pb_get; +} } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 57f08cc..003afaa 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -21,14 +21,23 @@ #include <memory> #include <string> +#include <vector> #include "connection/request.h" +#include "core/action.h" #include "core/get.h" +#include "core/region-request.h" #include "core/scan.h" +#include "core/server-request.h" #include "if/HBase.pb.h" using hbase::pb::RegionSpecifier; +using hbase::pb::RegionAction; +using hbase::pb::ServerName; +using hbase::ServerRequest; + namespace hbase { +using ActionsByRegion = ServerRequest::ActionsByRegion; /** * RequestConverter class * This class converts a Client side Get, Scan, Mutate operation to corresponding PB message. 
@@ -53,6 +62,12 @@ class RequestConverter {
    */
   static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name);

+  /**
+   * @brief Returns a Request object comprising of PB MultiRequest object
+   * @param region_requests - per-region action lists (ActionsByRegion) to be converted
+   */
+  static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests);
+
  private:
   // Constructor not required. We have all static methods to create PB requests.
   RequestConverter();
@@ -64,6 +79,11 @@ class RequestConverter {
    * Request.
    */
   static void SetRegion(const std::string &region_name, RegionSpecifier *region_specifier);
+  /**
+   * @brief Converts a client side Get into the corresponding PB Get message
+   * @param get - Get object to be converted
+   */
+  static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
 };

 } /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index b2fff34..7729257 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -19,13 +19,18 @@

 #include "core/response-converter.h"

+#include <glog/logging.h>
+#include <stdexcept>
 #include <string>
+#include <utility>
 #include <vector>
-
 #include "core/cell.h"
+#include "core/multi-response.h"
+#include "exceptions/exception.h"

 using hbase::pb::GetResponse;
 using hbase::pb::ScanResponse;
+using hbase::pb::RegionLoadStats;

 namespace hbase {

@@ -37,6 +42,7 @@ ResponseConverter::~ResponseConverter() {}
 // go inside folly::Future's, making the move semantics extremely tricky.
std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
   auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
+  VLOG(3) << "FromGetResponse:" << get_resp->ShortDebugString();
   return ToResult(get_resp->result(), resp.cell_scanner());
 }

@@ -52,16 +58,25 @@ std::shared_ptr<Result> ResponseConverter::ToResult(

   // iterate over the cells coming from rpc codec
   if (cell_scanner != nullptr) {
-    while (cell_scanner->Advance()) {
+    int cells_read = 0;
+    while (cells_read != result.associated_cell_count()) {
+      if (cell_scanner->Advance()) {
       vcells.push_back(cell_scanner->Current());
+        cells_read += 1;
+      } else {
+        LOG(ERROR)<< "CellScanner::Advance() returned false unexpectedly. Cells Read:- "
+            << cells_read << "; Expected Cell Count:- " << result.associated_cell_count();
+        // Must be thrown: a discarded temporary would leave this loop spinning forever.
+        throw std::runtime_error("CellScanner::Advance() returned false unexpectedly");
+      }
     }
-    // TODO: check associated cell count?
   }
   return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial());
 }

 std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
   auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
+  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
   int num_results = resp.cell_scanner() != nullptr ?
scan_resp->cells_per_result_size() : scan_resp->results_size();

@@ -94,4 +108,90 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R

   return results;
 }
+
+/**
+ * Converts the PB MultiResponse of a multi call into a client side MultiResponse.
+ * Region actions of the request and region action results of the response are paired by
+ * position; every per-action result is stored under the region name and the action index
+ * carried by the response.
+ *
+ * @param req   Request that was sent; its message is read as a hbase::pb::MultiRequest.
+ * @param resp  Response received; its message is read as a hbase::pb::MultiResponse.
+ * @return MultiResponse holding region results, region exceptions and region statistics.
+ * @throws std::runtime_error if request and response disagree on the number of region
+ *         actions or of actions within a region, or if a region specifier has a type
+ *         other than REGION_NAME.
+ */
+std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req,
+                                                                    const Response& resp) {
+  auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
+  auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
+  VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
+  int req_region_action_count = multi_req->regionaction_size();
+  int res_region_action_count = multi_resp->regionactionresult_size();
+  if (req_region_action_count != res_region_action_count) {
+    throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) +
+                             " does not match response mutation result count=" +
+                             std::to_string(res_region_action_count));
+  }
+  auto multi_response = std::make_unique<hbase::MultiResponse>();
+  for (int32_t num = 0; num < res_region_action_count; num++) {
+    // Bind by reference: copying whole protobuf messages for every region is pure overhead.
+    const hbase::pb::RegionAction& actions = multi_req->regionaction(num);
+    const hbase::pb::RegionActionResult& action_result = multi_resp->regionactionresult(num);
+    const hbase::pb::RegionSpecifier& rs = actions.region();
+    if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) {
+      throw std::runtime_error("We support only encoded types for protobuf multi response.");
+    }
+
+    const std::string& region_name = rs.value();
+    if (action_result.has_exception()) {
+      // Region-level exception: record it (only when it carries a value), skip per-action results.
+      if (action_result.exception().has_value()) {
+        auto exc = std::make_shared<hbase::IOException>(action_result.exception().value());
+        VLOG(8) << "Store Region Exception:- " << exc->what();
+        multi_response->AddRegionException(region_name, exc);
+      }
+      continue;
+    }
+
+    if (actions.action_size() != action_result.resultorexception_size()) {
+      throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) +
+                               ", action_result.resultorexception_size=" +
+                               std::to_string(action_result.resultorexception_size()) +
+                               " for region " + actions.region().value());
+    }
+
+    for (const hbase::pb::ResultOrException& roe : action_result.resultorexception()) {
+      std::shared_ptr<Result> result;
+      std::shared_ptr<std::exception> exc;
+      if (roe.has_exception()) {
+        if (roe.exception().has_value()) {
+          exc = std::make_shared<hbase::IOException>(roe.exception().value());
+          VLOG(8) << "Store ResultOrException:- " << exc->what();
+        }
+      } else if (roe.has_result()) {
+        result = ToResult(roe.result(), resp.cell_scanner());
+      } else if (roe.has_service_result()) {
+        // TODO Not processing Coprocessor Service Result;
+      } else {
+        // Sometimes, the response is just "it was processed". Generally, this occurs for things
+        // like mutateRows where either we get back 'processed' (or not) and optionally some
+        // statistics about the regions we touched.
+        std::vector<std::shared_ptr<Cell>> empty_cells;
+        result = std::make_shared<Result>(empty_cells, multi_resp->processed(), false, false);
+      }
+      multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc);
+    }
+  }
+
+  if (multi_resp->has_regionstatistics()) {
+    const hbase::pb::MultiRegionLoadStats& stats = multi_resp->regionstatistics();
+    for (int i = 0; i < stats.region_size(); i++) {
+      multi_response->AddStatistic(stats.region(i).value(),
+                                   std::make_shared<RegionLoadStats>(stats.stat(i)));
+    }
+  }
+  return multi_response;
+}
 } /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 743c14b..a5095fd 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -21,11 +21,15 @@

 #include <memory>
 #include <vector>
+#include "connection/request.h"
 #include "connection/response.h"
+#include "core/multi-response.h"
 #include "core/result.h"
 #include "if/Client.pb.h"
 #include "serde/cell-scanner.h"

+using hbase::Request;
+using hbase::Response;
 namespace hbase {

 /**
@@ -47,6 +51,9 @@ class ResponseConverter {

   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);

+  static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
+                                                          const Response& resp);
+
  private:
   // Constructor not required. We have all static methods to extract response from PB messages.
ResponseConverter();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/row.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h
new file mode 100644
index 0000000..2c7bdd1
--- /dev/null
+++ b/hbase-native-client/core/row.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <cstdint>
+#include <limits>
+#include <stdexcept>
+#include <string>
+
+#pragma once
+
+namespace hbase {
+
+/**
+ * @brief Holder of a row key; the key is validated when constructed from a string.
+ */
+class Row {
+ public:
+  Row() {}
+  explicit Row(const std::string &row) : row_(row) { CheckRow(row_); }
+
+  /**
+   * @brief Returns the row for the Row interface.
+   */
+  const std::string &row() const { return row_; }
+  virtual ~Row() {}
+
+ private:
+  /**
+   * @brief Checks if the given row is proper or not
+   * @param row Row to check
+   * @throws std::runtime_error if row is empty or longer than
+   * std::numeric_limits<int16_t>::max() bytes
+   */
+  static void CheckRow(const std::string &row) {
+    // Keep the limit unsigned so it compares against size() without a sign conversion.
+    constexpr size_t kMaxRowLength = std::numeric_limits<int16_t>::max();
+    const size_t row_length = row.size();
+    if (0 == row_length) {
+      throw std::runtime_error("Row length can't be 0");
+    }
+    if (row_length > kMaxRowLength) {
+      throw std::runtime_error("Length of " + row + " is greater than max row size: " +
+                               std::to_string(kMaxRowLength));
+    }
+  }
+
+ protected:
+  std::string row_ = "";
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/server-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h
new file mode 100644
index 0000000..827b2e7
--- /dev/null
+++ b/hbase-native-client/core/server-request.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include "core/action.h"
+#include "core/region-location.h"
+#include "core/region-request.h"
+
+using hbase::Action;
+using hbase::RegionRequest;
+
+namespace hbase {
+
+/**
+ * Groups actions by the name of the region they target, one RegionRequest per region.
+ */
+class ServerRequest {
+ public:
+  // Keyed by region name.
+  // NOTE(review): originally annotated "Concurrent"; std::map itself is not synchronized.
+  using ActionsByRegion = std::map<std::string, std::shared_ptr<RegionRequest>>;
+
+  /**
+   * @brief Creates the request holding a RegionRequest for region_location's region.
+   */
+  explicit ServerRequest(std::shared_ptr<RegionLocation> region_location) {
+    auto region_name = region_location->region_name();
+    auto region_request = std::make_shared<RegionRequest>(region_location);
+    actions_by_region_[region_name] = region_request;
+  }
+  ~ServerRequest() {}
+
+  /**
+   * @brief Adds action to the RegionRequest of region_location's region. The RegionRequest
+   * is created on first use, so regions other than the constructor's one are accepted
+   * instead of failing with std::out_of_range.
+   */
+  void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location,
+                          std::shared_ptr<Action> action) {
+    // operator[] yields a null pointer for a region seen for the first time.
+    auto &region_request = actions_by_region_[region_location->region_name()];
+    if (region_request == nullptr) {
+      region_request = std::make_shared<RegionRequest>(region_location);
+    }
+    region_request->AddAction(action);
+  }
+
+  const ActionsByRegion &actions_by_region() const { return actions_by_region_; }
+
+ private:
+  ActionsByRegion actions_by_region_;
+};
+} /* namespace hbase */