http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/raw-async-table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc deleted file mode 100644 index 409883f..0000000 --- a/hbase-native-client/core/raw-async-table.cc +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#include <utility> - -#include "core/raw-async-table.h" -#include "core/request-converter.h" -#include "core/response-converter.h" - -using hbase::security::User; - -namespace hbase { - -template <typename RESP> -std::shared_ptr<SingleRequestCallerBuilder<RESP>> RawAsyncTable::CreateCallerBuilder( - std::string row, std::chrono::nanoseconds rpc_timeout) { - return connection_->caller_factory() - ->Single<RESP>() - ->table(table_name_) - ->row(row) - ->rpc_timeout(rpc_timeout) - ->operation_timeout(connection_conf_->operation_timeout()) - ->pause(connection_conf_->pause()) - ->max_retries(connection_conf_->max_retries()) - ->start_log_errors_count(connection_conf_->start_log_errors_count()); -} - -template <typename REQ, typename PREQ, typename PRESP, typename RESP> -folly::Future<RESP> RawAsyncTable::Call( - std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<RegionLocation> loc, const REQ& req, - const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, - const RespConverter<RESP, PRESP> resp_converter) { - std::unique_ptr<PREQ> preq = req_converter(req, loc->region_name()); - - // No need to make take a callable argument, it is always the same - return rpc_client - ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), - User::defaultUser(), "ClientService") - .then( - [resp_converter](const std::unique_ptr<PRESP>& presp) { return resp_converter(*presp); }); -} - -folly::Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) { - auto caller = - CreateCallerBuilder<std::shared_ptr<Result>>(get.row(), connection_conf_->read_rpc_timeout()) - ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) - -> folly::Future<std::shared_ptr<hbase::Result>> { - return Call<hbase::Get, hbase::Request, hbase::Response, - 
std::shared_ptr<hbase::Result>>( - rpc_client, controller, loc, get, - &hbase::RequestConverter::ToGetRequest, - &hbase::ResponseConverter::FromGetResponse); - }) - ->Build(); - - // Return the Future we obtain from the call(). However, we do not want the Caller to go out of - // context and get deallocated since the caller injects a lot of closures which capture [this, &] - // which is use-after-free. We are just passing an identity closure capturing caller by value to - // ensure that the lifecycle of the Caller object is longer than the retry lambdas. - return caller->Call().then([caller](const auto r) { return r; }); -} -folly::Future<std::shared_ptr<Result>> RawAsyncTable::Increment(const hbase::Increment& incr) { - auto caller = - CreateCallerBuilder<std::shared_ptr<Result>>(incr.row(), - connection_conf_->write_rpc_timeout()) - ->action([=, &incr](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> - rpc_client) -> folly::Future<std::shared_ptr<Result>> { - return Call<hbase::Increment, hbase::Request, hbase::Response, std::shared_ptr<Result>>( - rpc_client, controller, loc, incr, - &hbase::RequestConverter::IncrementToMutateRequest, - &hbase::ResponseConverter::FromMutateResponse); - }) - ->Build(); - - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) { - auto caller = - CreateCallerBuilder<folly::Unit>(put.row(), connection_conf_->write_rpc_timeout()) - ->action([=, &put]( - std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { - return Call<hbase::Put, hbase::Request, hbase::Response, folly::Unit>( - rpc_client, controller, loc, put, &hbase::RequestConverter::ToMutateRequest, - [](const Response& r) -> folly::Unit { return folly::unit; }); - }) - ->Build(); 
- - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const std::string& family, - const std::string& qualifier, - const std::string& value, const hbase::Put& put, - const pb::CompareType& compare_op) { - auto caller = - CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout()) - ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> { - return Call<hbase::Put, hbase::Request, hbase::Response, bool>( - rpc_client, controller, loc, put, - // request conversion - [=, &put](const hbase::Put& put, - const std::string& region_name) -> std::unique_ptr<Request> { - auto checkReq = RequestConverter::CheckAndPutToMutateRequest( - row, family, qualifier, value, compare_op, put, region_name); - return checkReq; - }, - // response conversion - &ResponseConverter::BoolFromMutateResponse); - }) - ->Build(); - - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<bool> RawAsyncTable::CheckAndDelete(const std::string& row, const std::string& family, - const std::string& qualifier, - const std::string& value, - const hbase::Delete& del, - const pb::CompareType& compare_op) { - auto caller = - CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout()) - ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> { - return Call<hbase::Delete, hbase::Request, hbase::Response, bool>( - rpc_client, controller, loc, del, - // request conversion - [=, &del](const hbase::Delete& del, - const std::string& region_name) -> std::unique_ptr<Request> { - auto checkReq = RequestConverter::CheckAndDeleteToMutateRequest( - row, family, qualifier, value, compare_op, del, region_name); - 
return checkReq; - }, - // response conversion - &ResponseConverter::BoolFromMutateResponse); - }) - ->Build(); - - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) { - auto caller = - CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout()) - ->action([=, &del]( - std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { - return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>( - rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest, - [](const Response& r) -> folly::Unit { return folly::unit; }); - }) - ->Build(); - - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append& append) { - auto caller = - CreateCallerBuilder<std::shared_ptr<Result>>(append.row(), - connection_conf_->write_rpc_timeout()) - ->action([=, &append](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> - rpc_client) -> folly::Future<std::shared_ptr<Result>> { - return Call<hbase::Append, hbase::Request, hbase::Response, std::shared_ptr<Result>>( - rpc_client, controller, loc, append, - &hbase::RequestConverter::AppendToMutateRequest, - &hbase::ResponseConverter::FromMutateResponse); - }) - ->Build(); - - return caller->Call().then([caller](const auto r) { return r; }); -} - -folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get( - const std::vector<hbase::Get>& gets) { - std::vector<std::shared_ptr<hbase::Row>> rows; - for (auto get : gets) { - std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Get>(get); - rows.push_back(srow); - } - return this->Batch<std::shared_ptr<hbase::Row>, 
std::shared_ptr<Result>>( - rows, connection_conf_->read_rpc_timeout()); -} - -template <typename REQ, typename RESP> -folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch( - const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) { - auto caller = connection_->caller_factory() - ->Batch<REQ, RESP>() - ->table(table_name_) - ->actions(std::make_shared<std::vector<REQ>>(rows)) - ->rpc_timeout(timeout) - ->operation_timeout(connection_conf_->operation_timeout()) - ->pause(connection_conf_->pause()) - ->max_attempts(connection_conf_->max_retries()) - ->start_log_errors_count(connection_conf_->start_log_errors_count()) - ->Build(); - - return caller->Call().then([caller](auto r) { return r; }); -} - -void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) { - auto scanner = AsyncClientScanner::Create( - connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(), - connection_conf_->max_retries(), connection_conf_->scan_timeout(), - connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count()); - scanner->Start(); -} - -std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) { - // always create a new scan object as we may reset the start row later. 
- auto new_scan = std::make_shared<hbase::Scan>(scan); - if (new_scan->Caching() <= 0) { - new_scan->SetCaching(default_scanner_caching_); - } - if (new_scan->MaxResultSize() <= 0) { - new_scan->SetMaxResultSize(default_scanner_max_result_size_); - } - return new_scan; -} - -folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put( - const std::vector<hbase::Put>& puts) { - std::vector<std::shared_ptr<hbase::Row>> rows; - for (auto put : puts) { - std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Put>(put); - rows.push_back(srow); - } - return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>( - rows, connection_conf_->write_rpc_timeout()); -} -} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/raw-async-table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h deleted file mode 100644 index 97eef7f..0000000 --- a/hbase-native-client/core/raw-async-table.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/Unit.h> -#include <folly/futures/Future.h> -#include <chrono> -#include <memory> -#include <string> -#include <vector> - -#include "core/async-batch-rpc-retrying-caller.h" -#include "core/async-client-scanner.h" -#include "core/async-connection.h" -#include "core/async-rpc-retrying-caller-factory.h" -#include "core/async-rpc-retrying-caller.h" -#include "core/connection-configuration.h" -#include "core/delete.h" -#include "core/get.h" -#include "core/increment.h" -#include "core/put.h" -#include "core/result.h" -#include "core/scan.h" - -namespace hbase { - -/** - * A low level asynchronous table that should not be used by user applications.The implementation - * is required to be thread safe. 
- */ -class RawAsyncTable { - public: - RawAsyncTable(std::shared_ptr<pb::TableName> table_name, - std::shared_ptr<AsyncConnection> connection) - : connection_(connection), - connection_conf_(connection->connection_conf()), - table_name_(table_name), - rpc_client_(connection->rpc_client()) { - default_scanner_caching_ = connection_conf_->scanner_caching(); - default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size(); - } - virtual ~RawAsyncTable() = default; - - folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get); - - folly::Future<folly::Unit> Delete(const hbase::Delete& del); - - folly::Future<std::shared_ptr<hbase::Result>> Append(const hbase::Append& append); - - folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment); - - folly::Future<folly::Unit> Put(const hbase::Put& put); - - folly::Future<bool> CheckAndPut(const std::string& row, const std::string& family, - const std::string& qualifier, const std::string& value, - const hbase::Put& put, - const pb::CompareType& compare_op = pb::CompareType::EQUAL); - - folly::Future<bool> CheckAndDelete(const std::string& row, const std::string& family, - const std::string& qualifier, const std::string& value, - const hbase::Delete& del, - const pb::CompareType& compare_op = pb::CompareType::EQUAL); - - void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer); - - void Close() {} - - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get( - const std::vector<hbase::Get>& gets); - template <typename REQ, typename RESP> - folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ>& rows, - std::chrono::nanoseconds timeout); - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Put( - const std::vector<hbase::Put>& puts); - - private: - /* Data */ - std::shared_ptr<AsyncConnection> connection_; - std::shared_ptr<ConnectionConfiguration> connection_conf_; - std::shared_ptr<pb::TableName> 
table_name_; - std::shared_ptr<RpcClient> rpc_client_; - int32_t default_scanner_caching_; - int64_t default_scanner_max_result_size_; - - /* Methods */ - template <typename REQ, typename PREQ, typename PRESP, typename RESP> - folly::Future<RESP> Call( - std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<RegionLocation> loc, const REQ& req, - const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, - const RespConverter<RESP, PRESP> resp_converter); - - template <typename RESP> - std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder( - std::string row, std::chrono::nanoseconds rpc_timeout); - - std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan); -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/raw-scan-result-consumer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h deleted file mode 100644 index b7c3c48..0000000 --- a/hbase-native-client/core/raw-scan-result-consumer.h +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/ExceptionWrapper.h> -#include <folly/Logging.h> -#include <chrono> -#include <memory> -#include <string> -#include <thread> -#include <vector> - -#include "core/result.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" - -namespace hbase { - -enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed }; - -enum class ScanResumerState { kInitialized, kSuspended, kResumed }; - -/** - * Used to resume a scan. - */ -class ScanResumer { - public: - virtual ~ScanResumer() = default; - - /** - * Resume the scan. You are free to call it multiple time but only the first call will take - * effect. - */ - virtual void Resume() = 0; -}; - -/** - * Used to suspend or stop a scan. - * <p> - * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A - * IllegalStateException will be thrown if you call them at other places. - * <p> - * You can only call one of the methods below, i.e., call suspend or terminate(of course you are - * free to not call them both), and the methods are not reentrant. A IllegalStateException will be - * thrown if you have already called one of the methods. - */ -class ScanController { - public: - virtual ~ScanController() = default; - - /** - * Suspend the scan. - * <p> - * This means we will stop fetching data in background, i.e., will not call onNext any more - * before you resume the scan. - * @return A resumer used to resume the scan later. - */ - virtual std::shared_ptr<ScanResumer> Suspend() = 0; - - /** - * Terminate the scan. - * <p> - * This is useful when you have got enough results and want to stop the scan in onNext method, - * or you want to stop the scan in onHeartbeat method because it has spent too many time. - */ - virtual void Terminate() = 0; -}; - -/** - * Receives {@link Result} for an asynchronous scan. 
- * <p> - * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread - * which we send request to HBase service. So if you want the asynchronous scanner fetch data from - * HBase in background while you process the returned data, you need to move the processing work to - * another thread to make the {@code onNext} call return immediately. And please do NOT do any time - * consuming tasks in all methods below unless you know what you are doing. - */ -class RawScanResultConsumer { - public: - virtual ~RawScanResultConsumer() = default; - - /** - * Indicate that we have receive some data. - * @param results the data fetched from HBase service. - * @param controller used to suspend or terminate the scan. Notice that the {@code controller} - * instance is only valid within scope of onNext method. You can only call its method in - * onNext, do NOT store it and call it later outside onNext. - */ - virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results, - std::shared_ptr<ScanController> controller) {} - - /** - * Indicate that there is an heartbeat message but we have not cumulated enough cells to call - * onNext. - * <p> - * This method give you a chance to terminate a slow scan operation. - * @param controller used to suspend or terminate the scan. Notice that the {@code controller} - * instance is only valid within the scope of onHeartbeat method. You can only call its - * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. - */ - virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {} - - /** - * Indicate that we hit an unrecoverable error and the scan operation is terminated. - * <p> - * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. - */ - virtual void OnError(const folly::exception_wrapper &error) {} - - /** - * Indicate that the scan operation is completed normally. 
- */ - virtual void OnComplete() {} -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h deleted file mode 100644 index f73999f..0000000 --- a/hbase-native-client/core/region-location.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <memory> -#include <string> - -#include "if/HBase.pb.h" - -namespace hbase { - -enum class RegionLocateType { kBefore, kCurrent, kAfter }; - -/** - * @brief class to hold where a region is located. - * - * This class holds where a region is located, the information about it, the - * region name. - */ -class RegionLocation { - public: - /** - * Constructor. - * @param region_name The region name of this region. - * @param ri The decoded RegionInfo of this region. - * @param sn The server name of the HBase regionserver thought to be hosting - * this region. 
- */ - RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn) - : region_name_(region_name), ri_(ri), sn_(sn) {} - - /** - * Get a reference to the regio info - */ - const hbase::pb::RegionInfo &region_info() const { return ri_; } - - /** - * Get a reference to the server name - */ - const hbase::pb::ServerName &server_name() const { return sn_; } - - /** - * Get a reference to the region name. - */ - const std::string &region_name() const { return region_name_; } - - /** - * Set the servername if the region has moved. - */ - void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; } - - const std::string DebugString() const { - return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString(); - } - - private: - std::string region_name_; - hbase::pb::RegionInfo ri_; - hbase::pb::ServerName sn_; -}; - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/region-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h deleted file mode 100644 index aded3a9..0000000 --- a/hbase-native-client/core/region-request.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <memory> -#include <queue> -#include <vector> -#include "core/action.h" -#include "core/region-location.h" - -namespace hbase { - -class RegionRequest { - public: - // Concurrent - using ActionList = std::vector<std::shared_ptr<Action>>; - explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> &region_loc) - : region_loc_(region_loc) {} - ~RegionRequest() {} - void AddAction(std::shared_ptr<Action> action) { actions_.push_back(action); } - std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; } - const ActionList &actions() const { return actions_; } - - private: - std::shared_ptr<hbase::RegionLocation> region_loc_; - ActionList actions_; -}; - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/region-result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc deleted file mode 100644 index 206c876..0000000 --- a/hbase-native-client/core/region-result.cc +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/region-result.h" -#include <glog/logging.h> -#include <stdexcept> - -using hbase::pb::RegionLoadStats; - -namespace hbase { - -RegionResult::RegionResult() {} - -RegionResult::~RegionResult() {} - -void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, - std::shared_ptr<folly::exception_wrapper> exc) { - auto index_found = result_or_excption_.find(index); - if (index_found == result_or_excption_.end()) { - result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr); - } else { - throw std::runtime_error("Index " + std::to_string(index) + - " already set with ResultOrException"); - } -} - -void RegionResult::set_stat(std::shared_ptr<RegionLoadStats> stat) { stat_ = stat; } - -int RegionResult::ResultOrExceptionSize() const { return result_or_excption_.size(); } - -std::shared_ptr<ResultOrExceptionTuple> RegionResult::ResultOrException(int32_t index) const { - return std::make_shared<ResultOrExceptionTuple>(result_or_excption_.at(index)); -} - -const std::shared_ptr<RegionLoadStats>& RegionResult::stat() const { return stat_; } - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/region-result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h deleted file mode 100644 index b961634..0000000 --- a/hbase-native-client/core/region-result.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <folly/ExceptionWrapper.h> -#include <map> -#include <memory> -#include <string> -#include <tuple> -#include "core/result.h" -#include "if/Client.pb.h" - -namespace hbase { - -using ResultOrExceptionTuple = - std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<folly::exception_wrapper>>; - -class RegionResult { - public: - RegionResult(); - void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, - std::shared_ptr<folly::exception_wrapper> exc); - - void set_stat(std::shared_ptr<pb::RegionLoadStats> stat); - - int ResultOrExceptionSize() const; - - std::shared_ptr<ResultOrExceptionTuple> ResultOrException(int32_t index) const; - - const std::shared_ptr<pb::RegionLoadStats>& stat() const; - - ~RegionResult(); - - private: - std::map<int, ResultOrExceptionTuple> result_or_excption_; - std::shared_ptr<pb::RegionLoadStats> stat_; -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/request-converter-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc deleted file mode 100644 index 6c07a19..0000000 --- a/hbase-native-client/core/request-converter-test.cc +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/request-converter.h" - -#include <gtest/gtest.h> -#include <limits> -#include "connection/request.h" -#include "core/get.h" -#include "core/scan.h" - -using hbase::Get; -using hbase::Scan; - -using hbase::pb::GetRequest; -using hbase::pb::RegionSpecifier; -using hbase::pb::RegionSpecifier_RegionSpecifierType; -using hbase::pb::ScanRequest; - -TEST(RequestConverter, ToGet) { - std::string row_str = "row-test"; - Get get(row_str); - get.AddFamily("family-1"); - get.AddFamily("family-2"); - get.AddFamily("family-3"); - get.AddColumn("family-2", "qualifier-1"); - get.AddColumn("family-2", "qualifier-2"); - get.AddColumn("family-2", "qualifier-3"); - get.SetCacheBlocks(false); - get.SetConsistency(hbase::pb::Consistency::TIMELINE); - get.SetMaxVersions(2); - get.SetTimeRange(10000, 20000); - std::string region_name("RegionName"); - - auto req = hbase::RequestConverter::ToGetRequest(get, region_name); - auto msg = std::static_pointer_cast<GetRequest>(req->req_msg()); - - // Tests whether the PB object is properly set or not. 
- ASSERT_TRUE(msg->has_region()); - ASSERT_TRUE(msg->region().has_value()); - EXPECT_EQ(msg->region().value(), region_name); - - ASSERT_TRUE(msg->has_get()); - EXPECT_EQ(msg->get().row(), row_str); - EXPECT_FALSE(msg->get().cache_blocks()); - EXPECT_EQ(msg->get().consistency(), hbase::pb::Consistency::TIMELINE); - EXPECT_EQ(msg->get().max_versions(), 2); - EXPECT_EQ(msg->get().column_size(), 3); - for (int i = 0; i < msg->get().column_size(); ++i) { - EXPECT_EQ(msg->get().column(i).family(), "family-" + std::to_string(i + 1)); - for (int j = 0; j < msg->get().column(i).qualifier_size(); ++j) { - EXPECT_EQ(msg->get().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); - } - } -} - -TEST(RequestConverter, ToScan) { - std::string start_row("start-row"); - std::string stop_row("stop-row"); - hbase::Scan scan; - scan.AddFamily("family-1"); - scan.AddFamily("family-2"); - scan.AddFamily("family-3"); - scan.AddColumn("family-2", "qualifier-1"); - scan.AddColumn("family-2", "qualifier-2"); - scan.AddColumn("family-2", "qualifier-3"); - scan.SetReversed(true); - scan.SetStartRow(start_row); - scan.SetStopRow(stop_row); - scan.SetCaching(3); - scan.SetConsistency(hbase::pb::Consistency::TIMELINE); - scan.SetCacheBlocks(true); - scan.SetAllowPartialResults(true); - scan.SetLoadColumnFamiliesOnDemand(true); - scan.SetMaxVersions(5); - scan.SetTimeRange(10000, 20000); - std::string region_name("RegionName"); - - auto req = hbase::RequestConverter::ToScanRequest(scan, region_name); - auto msg = std::static_pointer_cast<ScanRequest>(req->req_msg()); - - // Tests whether the PB object is properly set or not. 
- ASSERT_TRUE(msg->has_region()); - ASSERT_TRUE(msg->region().has_value()); - EXPECT_EQ(msg->region().value(), region_name); - - ASSERT_TRUE(msg->has_scan()); - EXPECT_TRUE(msg->scan().reversed()); - EXPECT_EQ(msg->scan().start_row(), start_row); - EXPECT_EQ(msg->scan().stop_row(), stop_row); - EXPECT_FALSE(msg->scan().small()); - EXPECT_EQ(msg->scan().caching(), 3); - EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE); - EXPECT_TRUE(msg->scan().cache_blocks()); - EXPECT_TRUE(msg->scan().allow_partial_results()); - EXPECT_TRUE(msg->scan().load_column_families_on_demand()); - EXPECT_EQ(msg->scan().max_versions(), 5); - EXPECT_EQ(msg->scan().max_result_size(), std::numeric_limits<uint64_t>::max()); - - EXPECT_EQ(msg->scan().column_size(), 3); - for (int i = 0; i < msg->scan().column_size(); ++i) { - EXPECT_EQ(msg->scan().column(i).family(), "family-" + std::to_string(i + 1)); - for (int j = 0; j < msg->scan().column(i).qualifier_size(); ++j) { - EXPECT_EQ(msg->scan().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); - } - } - ASSERT_FALSE(msg->client_handles_partials()); - ASSERT_FALSE(msg->client_handles_heartbeats()); - ASSERT_FALSE(msg->track_scan_metrics()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/request-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc deleted file mode 100644 index f48f228..0000000 --- a/hbase-native-client/core/request-converter.cc +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/request-converter.h" - -#include <folly/Conv.h> - -#include <utility> -#include "if/Client.pb.h" - -using hbase::pb::GetRequest; -using hbase::pb::MutationProto; -using hbase::pb::RegionAction; -using hbase::pb::RegionSpecifier; -using hbase::pb::RegionSpecifier_RegionSpecifierType; -using hbase::pb::ScanRequest; - -namespace hbase { - -RequestConverter::~RequestConverter() {} - -RequestConverter::RequestConverter() {} - -void RequestConverter::SetRegion(const std::string ®ion_name, - RegionSpecifier *region_specifier) { - region_specifier->set_type( - RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); - region_specifier->set_value(region_name); -} - -std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get, - const std::string ®ion_name) { - auto pb_req = Request::get(); - auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); - return pb_req; -} - -std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) { - auto pb_scan = std::make_unique<hbase::pb::Scan>(); - pb_scan->set_max_versions(scan.MaxVersions()); - pb_scan->set_cache_blocks(scan.CacheBlocks()); - pb_scan->set_reversed(scan.IsReversed()); - pb_scan->set_caching(scan.Caching()); - 
pb_scan->set_start_row(scan.StartRow()); - pb_scan->set_stop_row(scan.StopRow()); - pb_scan->set_consistency(scan.Consistency()); - pb_scan->set_max_result_size(scan.MaxResultSize()); - pb_scan->set_allow_partial_results(scan.AllowPartialResults()); - pb_scan->set_load_column_families_on_demand(scan.LoadColumnFamiliesOnDemand()); - - if (!scan.Timerange().IsAllTime()) { - hbase::pb::TimeRange *pb_time_range = pb_scan->mutable_time_range(); - pb_time_range->set_from(scan.Timerange().MinTimeStamp()); - pb_time_range->set_to(scan.Timerange().MaxTimeStamp()); - } - - if (scan.HasFamilies()) { - for (const auto &family : scan.FamilyMap()) { - auto column = pb_scan->add_column(); - column->set_family(family.first); - for (const auto &qualifier : family.second) { - column->add_qualifier(qualifier); - } - } - } - - if (scan.filter() != nullptr) { - pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release()); - } - - return std::move(pb_scan); -} - -std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, - const std::string ®ion_name) { - auto pb_req = Request::scan(); - auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_scan(ToScan(scan).release()); - - SetCommonScanRequestFields(pb_msg, false); - - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, - const std::string ®ion_name, - int32_t num_rows, bool close_scanner) { - auto pb_req = Request::scan(); - auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_scan(ToScan(scan).release()); - - pb_msg->set_number_of_rows(num_rows); - pb_msg->set_close_scanner(close_scanner); - - SetCommonScanRequestFields(pb_msg, false); - - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t 
scanner_id, int32_t num_rows, - bool close_scanner) { - auto pb_req = Request::scan(); - auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); - - pb_msg->set_number_of_rows(num_rows); - pb_msg->set_close_scanner(close_scanner); - pb_msg->set_scanner_id(scanner_id); - - SetCommonScanRequestFields(pb_msg, false); - - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, - bool close_scanner, - int64_t next_call_seq_id, bool renew) { - auto pb_req = Request::scan(); - auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); - - pb_msg->set_number_of_rows(num_rows); - pb_msg->set_close_scanner(close_scanner); - pb_msg->set_scanner_id(scanner_id); - pb_msg->set_next_call_seq(next_call_seq_id); - - SetCommonScanRequestFields(pb_msg, renew); - return pb_req; -} - -void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg, - bool renew) { - // TODO We will change these later when we implement partial results and heartbeats, etc - pb_msg->set_client_handles_partials(false); - pb_msg->set_client_handles_heartbeats(false); - pb_msg->set_track_scan_metrics(false); - pb_msg->set_renew(renew); - // TODO: set scan limit -} - -std::unique_ptr<Request> RequestConverter::ToMultiRequest( - const ActionsByRegion &actions_by_region) { - auto pb_req = Request::multi(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MultiRequest>(pb_req->req_msg()); - - for (const auto &action_by_region : actions_by_region) { - auto pb_region_action = pb_msg->add_regionaction(); - RequestConverter::SetRegion(action_by_region.first, pb_region_action->mutable_region()); - int action_num = 0; - for (const auto ®ion_action : action_by_region.second->actions()) { - auto pb_action = pb_region_action->add_action(); - auto pget = region_action->action(); - // We store only hbase::Get in hbase::Action as of now. It will be changed later on. - CHECK(pget) << "Unexpected. 
action can't be null."; - std::string error_msg(""); - if (typeid(*pget) == typeid(hbase::Get)) { - auto getp = dynamic_cast<hbase::Get *>(pget.get()); - pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release()); - } else if (typeid(*pget) == typeid(hbase::Put)) { - auto putp = dynamic_cast<hbase::Put *>(pget.get()); - pb_action->set_allocated_mutation( - RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1) - .release()); - } else { - throw std::runtime_error("Unexpected action type encountered."); - } - pb_action->set_index(action_num); - action_num++; - } - } - return pb_req; -} - -std::unique_ptr<hbase::pb::Get> RequestConverter::ToGet(const Get &get) { - auto pb_get = std::make_unique<hbase::pb::Get>(); - pb_get->set_max_versions(get.MaxVersions()); - pb_get->set_cache_blocks(get.CacheBlocks()); - pb_get->set_consistency(get.Consistency()); - - if (!get.Timerange().IsAllTime()) { - hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); - pb_time_range->set_from(get.Timerange().MinTimeStamp()); - pb_time_range->set_to(get.Timerange().MaxTimeStamp()); - } - pb_get->set_row(get.row()); - if (get.HasFamilies()) { - for (const auto &family : get.FamilyMap()) { - auto column = pb_get->add_column(); - column->set_family(family.first); - for (const auto &qualifier : family.second) { - column->add_qualifier(qualifier); - } - } - } - - if (get.filter() != nullptr) { - pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); - } - return pb_get; -} - -std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType type, - const Mutation &mutation, - const int64_t nonce) { - auto pb_mut = std::make_unique<MutationProto>(); - pb_mut->set_row(mutation.row()); - pb_mut->set_mutate_type(type); - pb_mut->set_durability(mutation.Durability()); - pb_mut->set_timestamp(mutation.TimeStamp()); - // TODO: set attributes from the mutation (key value pairs). 
- - if (nonce > 0) { - pb_mut->set_nonce(nonce); - } - - for (const auto &family : mutation.FamilyMap()) { - for (const auto &cell : family.second) { - auto column = pb_mut->add_column_value(); - column->set_family(cell->Family()); - auto qual = column->add_qualifier_value(); - qual->set_qualifier(cell->Qualifier()); - qual->set_timestamp(cell->Timestamp()); - auto cell_type = cell->Type(); - if (type == pb::MutationProto_MutationType_DELETE || - (type == pb::MutationProto_MutationType_PUT && IsDelete(cell_type))) { - qual->set_delete_type(ToDeleteType(cell_type)); - } - - qual->set_value(cell->Value()); - } - } - return std::move(pb_mut); -} - -DeleteType RequestConverter::ToDeleteType(const CellType type) { - switch (type) { - case CellType::DELETE: - return pb::MutationProto_DeleteType_DELETE_ONE_VERSION; - case CellType::DELETE_COLUMN: - return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS; - case CellType::DELETE_FAMILY: - return pb::MutationProto_DeleteType_DELETE_FAMILY; - case CellType::DELETE_FAMILY_VERSION: - return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION; - default: - throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type)); - } -} - -bool RequestConverter::IsDelete(const CellType type) { - return CellType::DELETE <= type && type <= CellType::DELETE_FAMILY; -} - -std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put, - const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_mutation( - ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); - - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest( - const std::string &row, const std::string &family, const std::string &qualifier, - const std::string &value, const pb::CompareType compare_op, 
const hbase::Put &put, - const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - - pb_msg->set_allocated_mutation( - ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); - ::hbase::pb::Condition *cond = pb_msg->mutable_condition(); - cond->set_row(row); - cond->set_family(family); - cond->set_qualifier(qualifier); - cond->set_allocated_comparator( - Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release()); - cond->set_compare_type(compare_op); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::CheckAndDeleteToMutateRequest( - const std::string &row, const std::string &family, const std::string &qualifier, - const std::string &value, const pb::CompareType compare_op, const hbase::Delete &del, - const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - - pb_msg->set_allocated_mutation( - ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1).release()); - ::hbase::pb::Condition *cond = pb_msg->mutable_condition(); - cond->set_row(row); - cond->set_family(family); - cond->set_qualifier(qualifier); - cond->set_allocated_comparator( - Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release()); - cond->set_compare_type(compare_op); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del, - const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_mutation( - 
ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1).release()); - - VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); - return pb_req; -} -std::unique_ptr<Request> RequestConverter::IncrementToMutateRequest( - const Increment &incr, const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_mutation( - ToMutation(MutationType::MutationProto_MutationType_INCREMENT, incr, -1).release()); - - VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); - return pb_req; -} - -std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &append, - const std::string ®ion_name) { - auto pb_req = Request::mutate(); - auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - pb_msg->set_allocated_mutation( - ToMutation(MutationType::MutationProto_MutationType_APPEND, append, -1).release()); - - VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); - return pb_req; -} - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h deleted file mode 100644 index bcea278..0000000 --- a/hbase-native-client/core/request-converter.h +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <memory> -#include <string> -#include <vector> -#include "connection/request.h" -#include "core/action.h" -#include "core/append.h" -#include "core/cell.h" -#include "core/delete.h" -#include "core/get.h" -#include "core/increment.h" -#include "core/mutation.h" -#include "core/put.h" -#include "core/region-request.h" -#include "core/scan.h" -#include "core/server-request.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" - -using MutationType = hbase::pb::MutationProto_MutationType; -using DeleteType = hbase::pb::MutationProto_DeleteType; - -namespace hbase { - -using ActionsByRegion = ServerRequest::ActionsByRegion; -/** - * RequestConverter class - * This class converts a Client side Get, Scan, Mutate operation to corresponding PB message. 
- */ -class RequestConverter { - public: - ~RequestConverter(); - - /** - * @brief Returns a Request object comprising of PB GetRequest created using - * passed 'get' - * @param get - Get object used for creating GetRequest - * @param region_name - table region - */ - static std::unique_ptr<Request> ToGetRequest(const Get &get, const std::string ®ion_name); - - /** - * @brief Returns a Request object comprising of PB ScanRequest created using - * passed 'scan' - * @param scan - Scan object used for creating ScanRequest - * @param region_name - table region - */ - static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name); - - static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string ®ion_name, - int32_t num_rows, bool close_scanner); - - static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, - bool close_scanner); - - static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, - bool close_scanner, int64_t next_call_seq_id, - bool renew); - - static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion ®ion_requests); - - static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del, - const std::string ®ion_name); - - static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string ®ion_name); - - static std::unique_ptr<Request> CheckAndPutToMutateRequest( - const std::string &row, const std::string &family, const std::string &qualifier, - const std::string &value, const pb::CompareType compare_op, const hbase::Put &put, - const std::string ®ion_name); - - static std::unique_ptr<Request> CheckAndDeleteToMutateRequest( - const std::string &row, const std::string &family, const std::string &qualifier, - const std::string &value, const pb::CompareType compare_op, const hbase::Delete &del, - const std::string ®ion_name); - - static std::unique_ptr<Request> IncrementToMutateRequest(const Increment &incr, - const std::string 
®ion_name); - - static std::unique_ptr<pb::MutationProto> ToMutation(const MutationType type, - const Mutation &mutation, - const int64_t nonce); - - static std::unique_ptr<Request> AppendToMutateRequest(const Append &append, - const std::string ®ion_name); - - private: - // Constructor not required. We have all static methods to create PB requests. - RequestConverter(); - - /** - * @brief fills region_specifier with region values. - * @param region_name - table region - * @param region_specifier - RegionSpecifier to be filled and passed in PB - * Request. - */ - static void SetRegion(const std::string ®ion_name, pb::RegionSpecifier *region_specifier); - static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get); - static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan); - static DeleteType ToDeleteType(const CellType type); - static bool IsDelete(const CellType type); - static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew); -}; - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc deleted file mode 100644 index 960c487..0000000 --- a/hbase-native-client/core/response-converter.cc +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/response-converter.h" -#include <glog/logging.h> -#include <stdexcept> -#include <string> -#include <utility> -#include <vector> -#include "core/cell.h" -#include "core/multi-response.h" -#include "exceptions/exception.h" - -using hbase::pb::GetResponse; -using hbase::pb::MutateResponse; -using hbase::pb::ScanResponse; -using hbase::pb::RegionLoadStats; - -namespace hbase { - -ResponseConverter::ResponseConverter() {} - -ResponseConverter::~ResponseConverter() {} - -// impl note: we are returning shared_ptr's instead of unique_ptr's because these -// go inside folly::Future's, making the move semantics extremely tricky. 
-std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) { - auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg()); - VLOG(3) << "FromGetResponse:" << get_resp->ShortDebugString(); - return ToResult(get_resp->result(), resp.cell_scanner()); -} - -std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& resp) { - auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg()); - hbase::pb::Result result = mutate_resp->result(); - return ToResult(mutate_resp->result(), resp.cell_scanner()); -} - -bool ResponseConverter::BoolFromMutateResponse(const Response& resp) { - auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg()); - return mutate_resp->processed(); -} - -std::shared_ptr<Result> ResponseConverter::ToResult( - const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) { - std::vector<std::shared_ptr<Cell>> vcells; - for (auto cell : result.cell()) { - std::shared_ptr<Cell> pcell = - std::make_shared<Cell>(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), - cell.value(), static_cast<hbase::CellType>(cell.cell_type())); - vcells.push_back(pcell); - } - - // iterate over the cells coming from rpc codec - if (cell_scanner != nullptr) { - int cells_read = 0; - while (cells_read != result.associated_cell_count()) { - if (cell_scanner->Advance()) { - vcells.push_back(cell_scanner->Current()); - cells_read += 1; - } else { - LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. 
Cells Read:- " - << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); - std::runtime_error("CellScanner::Advance() returned false unexpectedly"); - } - } - } - return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial()); -} - -std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) { - auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg()); - return FromScanResponse(scan_resp, resp.cell_scanner()); -} - -std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse( - const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) { - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString() - << " cell_scanner:" << (cell_scanner != nullptr); - int num_results = - cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); - - std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)}; - for (int i = 0; i < num_results; i++) { - if (cell_scanner != nullptr) { - // Cells are out in cellblocks. Group them up again as Results. How many to read at a - // time will be found in getCellsLength -- length here is how many Cells in the i'th Result - int num_cells = scan_resp->cells_per_result(i); - - std::vector<std::shared_ptr<Cell>> vcells; - for (int j = 0; j < num_cells; j++) { - if (!cell_scanner->Advance()) { - std::string msg = "Results sent from server=" + std::to_string(num_results) + - ". But only got " + std::to_string(i) + - " results completely at client. 
Resetting the scanner to scan again."; - LOG(ERROR) << msg; - throw std::runtime_error(msg); - } - vcells.push_back(cell_scanner->Current()); - } - // TODO: handle partial results per Result by checking partial_flag_per_result - results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false); - } else { - results[i] = ToResult(scan_resp->results(i), cell_scanner); - } - } - - return results; -} - -std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults( - std::shared_ptr<Request> req, const Response& resp, - const ServerRequest::ActionsByRegion& actions_by_region) { - auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg()); - auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg()); - VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); - int req_region_action_count = multi_req->regionaction_size(); - int res_region_action_count = multi_resp->regionactionresult_size(); - if (req_region_action_count != res_region_action_count) { - throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + - " does not match response mutation result count=" + - std::to_string(res_region_action_count)); - } - auto multi_response = std::make_unique<hbase::MultiResponse>(); - for (int32_t num = 0; num < res_region_action_count; num++) { - hbase::pb::RegionAction actions = multi_req->regionaction(num); - hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num); - hbase::pb::RegionSpecifier rs = actions.region(); - if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) { - throw std::runtime_error("We support only encoded types for protobuf multi response."); - } - - auto region_name = rs.value(); - if (action_result.has_exception()) { - auto ew = ResponseConverter::GetRemoteException(action_result.exception()); - VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" - << region_name 
<< "];"; - multi_response->AddRegionException(region_name, ew); - continue; - } - - if (actions.action_size() != action_result.resultorexception_size()) { - throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + - ", action_result.resultorexception_size=" + - std::to_string(action_result.resultorexception_size()) + - " for region " + actions.region().value()); - } - - auto multi_actions = actions_by_region.at(region_name)->actions(); - uint64_t multi_actions_num = 0; - for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { - std::shared_ptr<Result> result; - std::shared_ptr<folly::exception_wrapper> ew; - if (roe.has_exception()) { - auto ew = ResponseConverter::GetRemoteException(roe.exception()); - VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" - << region_name << "];"; - multi_response->AddRegionException(region_name, ew); - } else if (roe.has_result()) { - result = ToResult(roe.result(), resp.cell_scanner()); - } else if (roe.has_service_result()) { - // TODO Not processing Coprocessor Service Result; - } else { - // Sometimes, the response is just "it was processed". Generally, this occurs for things - // like mutateRows where either we get back 'processed' (or not) and optionally some - // statistics about the regions we touched. - std::vector<std::shared_ptr<Cell>> empty_cells; - result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? 
true : false, - false, false); - } - // We add the original index of the multi-action so that when populating the response back we - // do it as per the action index - multi_response->AddRegionResult( - region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew); - multi_actions_num++; - } - } - - if (multi_resp->has_regionstatistics()) { - hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); - for (int i = 0; i < stats.region_size(); i++) { - multi_response->AddStatistic(stats.region(i).value(), - std::make_shared<RegionLoadStats>(stats.stat(i))); - } - } - return multi_response; -} - -std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException( - const hbase::pb::NameBytesPair& exc_resp) { - std::string what; - std::string exception_class_name = exc_resp.has_name() ? exc_resp.name() : ""; - std::string stack_trace = exc_resp.has_value() ? exc_resp.value() : ""; - - what.append(exception_class_name).append(stack_trace); - auto remote_exception = std::make_unique<RemoteException>(what); - remote_exception->set_exception_class_name(exception_class_name) - ->set_stack_trace(stack_trace) - ->set_hostname("") - ->set_port(0); - - return std::make_shared<folly::exception_wrapper>( - folly::make_exception_wrapper<RemoteException>(*remote_exception)); -} -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/response-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h deleted file mode 100644 index edd4165..0000000 --- a/hbase-native-client/core/response-converter.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <memory> -#include <vector> -#include "connection/request.h" -#include "connection/response.h" -#include "core/multi-response.h" -#include "core/result.h" -#include "core/server-request.h" -#include "if/Client.pb.h" -#include "serde/cell-scanner.h" - -namespace hbase { - -/** - * ResponseConverter class - * This class converts a PB Response to corresponding Result or other objects. - */ -class ResponseConverter { - public: - ~ResponseConverter(); - - static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result, - const std::shared_ptr<CellScanner> cell_scanner); - - /** - * @brief Returns a Result object created by PB Message in passed Response object. - * @param resp - Response object having the PB message. 
- */ - static std::shared_ptr<hbase::Result> FromGetResponse(const Response& resp); - - static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& resp); - - static bool BoolFromMutateResponse(const Response& resp); - - static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp); - - static std::vector<std::shared_ptr<Result>> FromScanResponse( - const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner); - - static std::unique_ptr<hbase::MultiResponse> GetResults( - std::shared_ptr<Request> req, const Response& resp, - const ServerRequest::ActionsByRegion& actions_by_region); - - private: - // Constructor not required. We have all static methods to extract response from PB messages. - ResponseConverter(); - static std::shared_ptr<folly::exception_wrapper> GetRemoteException( - const hbase::pb::NameBytesPair& exc_resp); -}; - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/result-scanner.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result-scanner.h b/hbase-native-client/core/result-scanner.h deleted file mode 100644 index 9460521..0000000 --- a/hbase-native-client/core/result-scanner.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <functional> -#include <iterator> -#include <map> -#include <memory> -#include <string> -#include <vector> - -#include "core/cell.h" -#include "core/result.h" - -namespace hbase { - -/** - * Interface for client-side scanning. Use Table to obtain instances. - */ -class ResultScanner { - // TODO: should we implement forward iterators? - - public: - virtual ~ResultScanner() {} - - virtual void Close() = 0; - - virtual std::shared_ptr<Result> Next() = 0; -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/result-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc deleted file mode 100644 index dd60aeb..0000000 --- a/hbase-native-client/core/result-test.cc +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <glog/logging.h> -#include <gtest/gtest.h> -#include <limits> -#include <memory> -#include <string> -#include <vector> - -#include "core/cell.h" -#include "core/result.h" - -using hbase::Cell; -using hbase::CellType; -using hbase::Result; -using std::experimental::nullopt; - -void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) { - // Populate some Results - // We assume that for a single Cell, the corresponding row, families and - // qualifiers are present. - // We have also considered different versions in the test for the same row. - std::string row = "row"; - for (int i = 0; i < 10; i++) { - std::string family = "family-" + std::to_string(i); - std::string column = "column-" + std::to_string(i); - std::string value = "value-" + std::to_string(i); - - switch (i) { - case 5: { - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482113040506, "value-5", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482111803856, "value-X", CellType::PUT)); - break; - } - case 8: { - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482113040506, "value-8", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482111803856, "value-X", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482110969958, "value-Y", CellType::PUT)); - break; - } - case 9: { - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482113040506, "value-9", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, 
column, 1482111803856, "value-X", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482110969958, "value-Y", CellType::PUT)); - cells.push_back( - std::make_shared<Cell>(row, family, column, 1482110876075, "value-Z", CellType::PUT)); - break; - } - default: { - cells.push_back(std::make_shared<Cell>( - row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT)); - } - } - } - return; -} - -TEST(Result, EmptyResult) { - std::vector<std::shared_ptr<Cell> > cells; - Result result(cells, true, false, false); - EXPECT_EQ(true, result.IsEmpty()); - EXPECT_EQ(0, result.Size()); -} - -TEST(Result, FilledResult) { - std::vector<std::shared_ptr<Cell> > cells; - PopulateCells(cells); - - Result result(cells, true, false, false); - - EXPECT_EQ(false, result.IsEmpty()); - EXPECT_EQ(16, result.Size()); - - // Get Latest Cell for the given family and qualifier. - auto latest_cell(result.ColumnLatestCell("family", "column")); - // Nothing of the above family/qualifier combo is present so it should be - // nullptr - ASSERT_FALSE(latest_cell.get()); - - // Try to get the latest cell for the given family and qualifier. - latest_cell = result.ColumnLatestCell("family-4", "column-4"); - // Now shouldn't be a nullptr - ASSERT_TRUE(latest_cell.get()); - // And Value must match too - EXPECT_EQ("value-4", latest_cell->Value()); - - // Value will be nullptr as no such family and qualifier is present - ASSERT_FALSE(result.Value("family-4", "qualifier")); - // Value will be present as family and qualifier is present - ASSERT_TRUE(result.Value("family-4", "column-4") != nullopt); - // Value should be present and match. 
- EXPECT_EQ(latest_cell->Value(), (*result.ColumnLatestCell("family-4", "column-4")).Value()); - EXPECT_EQ("value-5", (*result.ColumnLatestCell("family-5", "column-5")).Value()); - EXPECT_EQ("value-8", (*result.ColumnLatestCell("family-8", "column-8")).Value()); - EXPECT_EQ("value-7", *result.Value("family-7", "column-7")); - - // Get cells for the given family and qualifier - auto column_cells = result.ColumnCells("family", "column"); - // Size should be 0 - EXPECT_EQ(0, column_cells.size()); - - // Size shouldn't be 0 and Row() and Value() must match - column_cells = result.ColumnCells("family-0", "column-0"); - EXPECT_EQ(1, column_cells.size()); - EXPECT_EQ("row", column_cells[0]->Row()); - EXPECT_EQ("row", result.Row()); - - // Size shouldn't be 0 and Row() and Value() must match - column_cells = result.ColumnCells("family-5", "column-5"); - EXPECT_EQ(2, column_cells.size()); - EXPECT_EQ("row", column_cells[0]->Row()); - EXPECT_EQ("row", column_cells[1]->Row()); - EXPECT_EQ("value-5", column_cells[0]->Value()); - EXPECT_EQ("value-X", column_cells[1]->Value()); - EXPECT_EQ("row", result.Row()); - - // Size shouldn't be 0 and Row() and Value() must match - column_cells = result.ColumnCells("family-8", "column-8"); - EXPECT_EQ(3, column_cells.size()); - EXPECT_EQ("row", column_cells[0]->Row()); - EXPECT_EQ("row", column_cells[1]->Row()); - EXPECT_EQ("row", column_cells[2]->Row()); - EXPECT_EQ("value-8", column_cells[0]->Value()); - EXPECT_EQ("value-X", column_cells[1]->Value()); - EXPECT_EQ("value-Y", column_cells[2]->Value()); - EXPECT_EQ("row", result.Row()); - - // Size shouldn't be 0 and Row() and Value() must match - column_cells = result.ColumnCells("family-9", "column-9"); - EXPECT_EQ(4, column_cells.size()); - EXPECT_EQ("row", column_cells[0]->Row()); - EXPECT_EQ("row", column_cells[1]->Row()); - EXPECT_EQ("row", column_cells[2]->Row()); - EXPECT_EQ("row", column_cells[3]->Row()); - EXPECT_EQ("value-9", column_cells[0]->Value()); - EXPECT_EQ("value-X", 
column_cells[1]->Value()); - EXPECT_EQ("value-Y", column_cells[2]->Value()); - EXPECT_EQ("value-Z", column_cells[3]->Value()); - EXPECT_EQ("row", result.Row()); - - // Test all the Cell values - const auto &result_cells = result.Cells(); - int i = 0, j = 0; - for (const auto &cell : result_cells) { - std::string row = "row"; - std::string family = "family-" + std::to_string(i); - std::string column = "column-" + std::to_string(i); - std::string value = "value-" + std::to_string(i); - switch (j) { - case 6: - case 10: - case 13: { - EXPECT_EQ("value-X", cell->Value()); - ++j; - continue; - } - case 11: - case 14: { - EXPECT_EQ("value-Y", cell->Value()); - ++j; - continue; - } - case 15: { - EXPECT_EQ("value-Z", cell->Value()); - ++j; - continue; - } - } - EXPECT_EQ(row, cell->Row()); - EXPECT_EQ(family, cell->Family()); - EXPECT_EQ(column, cell->Qualifier()); - EXPECT_EQ(value, cell->Value()); - ++i; - ++j; - } - - auto result_map_tmp = result.Map(); - result_map_tmp["testf"]["testq"][1] = "value"; - EXPECT_EQ(11, result_map_tmp.size()); - - auto result_map = result.Map(); - EXPECT_EQ(10, result_map.size()); - - i = 0; - for (auto family_map : result_map) { - std::string family = "family-" + std::to_string(i); - std::string qualifier = "column-" + std::to_string(i); - std::string value = "value-" + std::to_string(i); - EXPECT_EQ(family, family_map.first); - for (auto qualifier_map : family_map.second) { - EXPECT_EQ(qualifier, qualifier_map.first); - j = 0; - for (auto version_map : qualifier_map.second) { - switch (i) { - case 5: { - if (1 == j) { - EXPECT_EQ(1482111803856, version_map.first); - EXPECT_EQ("value-X", version_map.second); - } else if (0 == j) { - EXPECT_EQ(1482113040506, version_map.first); - EXPECT_EQ("value-5", version_map.second); - } - break; - } - case 8: { - if (2 == j) { - EXPECT_EQ(1482110969958, version_map.first); - EXPECT_EQ("value-Y", version_map.second); - } else if (1 == j) { - EXPECT_EQ(1482111803856, version_map.first); - 
EXPECT_EQ("value-X", version_map.second); - } else if (0 == j) { - EXPECT_EQ(1482113040506, version_map.first); - EXPECT_EQ("value-8", version_map.second); - } - break; - } - case 9: { - if (3 == j) { - EXPECT_EQ(1482110876075, version_map.first); - EXPECT_EQ("value-Z", version_map.second); - } else if (2 == j) { - EXPECT_EQ(1482110969958, version_map.first); - EXPECT_EQ("value-Y", version_map.second); - } else if (1 == j) { - EXPECT_EQ(1482111803856, version_map.first); - EXPECT_EQ("value-X", version_map.second); - } else if (0 == j) { - EXPECT_EQ(1482113040506, version_map.first); - EXPECT_EQ("value-9", version_map.second); - } - break; - } - default: { - EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first); - EXPECT_EQ(value, version_map.second); - } - } - ++j; - } - } - ++i; - } - - auto family_map = result.FamilyMap("family-0"); - EXPECT_EQ(1, family_map.size()); - i = 0; - for (auto qual_val_map : family_map) { - EXPECT_EQ("column-0", qual_val_map.first); - EXPECT_EQ("value-0", qual_val_map.second); - } - - family_map = result.FamilyMap("family-1"); - EXPECT_EQ(1, family_map.size()); - i = 0; - for (auto qual_val_map : family_map) { - EXPECT_EQ("column-1", qual_val_map.first); - EXPECT_EQ("value-1", qual_val_map.second); - } - - family_map = result.FamilyMap("family-5"); - EXPECT_EQ(1, family_map.size()); - i = 0; - for (auto qual_val_map : family_map) { - EXPECT_EQ("column-5", qual_val_map.first); - EXPECT_EQ("value-5", qual_val_map.second); - } - - family_map = result.FamilyMap("family-9"); - EXPECT_EQ(1, family_map.size()); - i = 0; - for (auto qual_val_map : family_map) { - EXPECT_EQ("column-9", qual_val_map.first); - EXPECT_EQ("value-9", qual_val_map.second); - } -} - -TEST(Result, ResultEstimatedSize) { - CellType cell_type = CellType::PUT; - int64_t timestamp = std::numeric_limits<int64_t>::max(); - std::vector<std::shared_ptr<Cell> > cells; - Result empty(cells, true, false, false); - - EXPECT_EQ(empty.EstimatedSize(), sizeof(Result)); - 
- cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); - Result result1(cells, true, false, false); - EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize()); - - cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); - Result result2(cells, true, false, false); - EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize()); - - LOG(INFO) << empty.EstimatedSize(); - LOG(INFO) << result1.EstimatedSize(); - LOG(INFO) << result2.EstimatedSize(); -}