Repository: hbase Updated Branches: refs/heads/HBASE-14850 58ec20cac -> 8335e1529
HBASE-18174 Implement Table#checkAndPut() Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8335e152 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8335e152 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8335e152 Branch: refs/heads/HBASE-14850 Commit: 8335e1529c7626279f58fb283975b5b45c430335 Parents: 58ec20c Author: tedyu <yuzhih...@gmail.com> Authored: Fri Jun 9 17:19:57 2017 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Fri Jun 9 17:19:57 2017 -0700 ---------------------------------------------------------------------- .../core/async-rpc-retrying-caller.cc | 1 + hbase-native-client/core/client-test.cc | 29 ++++++++++++++++++++ hbase-native-client/core/raw-async-table.cc | 26 ++++++++++++++++++ hbase-native-client/core/raw-async-table.h | 5 ++++ hbase-native-client/core/request-converter.cc | 21 ++++++++++++++ hbase-native-client/core/request-converter.h | 5 ++++ hbase-native-client/core/response-converter.cc | 5 ++++ hbase-native-client/core/response-converter.h | 2 ++ hbase-native-client/core/table.cc | 7 +++++ hbase-native-client/core/table.h | 17 ++++++++++++ 10 files changed, 118 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/async-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc index b9d01bb..0302ad3 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.cc +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -216,4 +216,5 @@ class OpenScannerResponse; template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>; template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>; template class 
AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>; +template class AsyncSingleRequestRpcRetryingCaller<bool>; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/client-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 4972e05..ed413e8 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -247,6 +247,35 @@ TEST_F(ClientTest, Increment) { EXPECT_EQ(incr1 + incr2, hbase::BytesUtil::ToInt64(*(result->Value("d", "1")))); } +TEST_F(ClientTest, CheckAndPut) { + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable("check", "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>("check"); + auto row = "test1"; + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + // Perform Puts + table->Put(Put{row}.AddColumn("d", "1", "value1")); + auto result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value2")); + ASSERT_TRUE(result) << "CheckAndPut didn't replace value"; + + result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value3")); + + // Perform the Get + hbase::Get get(row); + auto result1 = table->Get(get); + EXPECT_EQ("value2", *(result1->Value("d", "1"))); + ASSERT_FALSE(result) << "CheckAndPut shouldn't replace value"; +} + TEST_F(ClientTest, PutGet) { // Using TestUtil to populate test data ClientTest::test_util->CreateTable("t", "d"); http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/raw-async-table.cc ---------------------------------------------------------------------- diff --git 
a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 413dc6c..46d9dfd 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -111,6 +111,32 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) { return caller->Call().then([caller](const auto r) { return r; }); } +folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const std::string& family, + const std::string& qualifier, + const std::string& value, const hbase::Put& put, + const pb::CompareType& compare_op) { + auto caller = + CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout()) + ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> { + return Call<hbase::Put, hbase::Request, hbase::Response, bool>( + rpc_client, controller, loc, put, + // request conversion + [=, &put](const hbase::Put& put, + const std::string& region_name) -> std::unique_ptr<Request> { + auto checkReq = RequestConverter::CheckAndPutToMutateRequest( + row, family, qualifier, value, compare_op, put, region_name); + return checkReq; + }, + // response conversion + &ResponseConverter::BoolFromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) { auto caller = CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout()) http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/raw-async-table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index 6088d1b..068f230 100644 --- a/hbase-native-client/core/raw-async-table.h +++ 
b/hbase-native-client/core/raw-async-table.h @@ -67,6 +67,11 @@ class RawAsyncTable { folly::Future<folly::Unit> Put(const hbase::Put& put); + folly::Future<bool> CheckAndPut(const std::string& row, const std::string& family, + const std::string& qualifier, const std::string& value, + const hbase::Put& put, + const pb::CompareType& compare_op = pb::CompareType::EQUAL); + void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer); void Close() {} http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/request-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 042d5b1..0cd9c7c 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -276,6 +276,27 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put, return pb_req; } +std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Put &put, + const std::string &region_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + ::hbase::pb::Condition *cond = pb_msg->mutable_condition(); + cond->set_row(row); + cond->set_family(family); + cond->set_qualifier(qualifier); + cond->set_allocated_comparator( + Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release()); + cond->set_compare_type(compare_op); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + return pb_req; +} + std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del, 
const std::string &region_name) { auto pb_req = Request::mutate(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 51440d8..0755f42 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -84,6 +84,11 @@ class RequestConverter { static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name); + static std::unique_ptr<Request> CheckAndPutToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Put &put, + const std::string &region_name); + static std::unique_ptr<Request> IncrementToMutateRequest(const Increment &incr, const std::string &region_name); http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index d2d719b..9bc4892 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -53,6 +53,11 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re return ToResult(mutate_resp->result(), resp.cell_scanner()); } +bool ResponseConverter::BoolFromMutateResponse(const Response& resp) { + auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg()); + return mutate_resp->processed(); +} + std::shared_ptr<Result> ResponseConverter::ToResult( const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) { std::vector<std::shared_ptr<Cell>> vcells; 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/response-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index b518d1c..2f8f279 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -49,6 +49,8 @@ class ResponseConverter { static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& resp); + static bool BoolFromMutateResponse(const Response& resp); + static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp); static std::vector<std::shared_ptr<Result>> FromScanResponse( http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index bf22169..f20078d 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -73,6 +73,13 @@ void Table::Put(const hbase::Put &put) { future.get(operation_timeout()); } +bool Table::CheckAndPut(const std::string &row, const std::string &family, + const std::string &qualifier, const std::string &value, + const hbase::Put &put, const pb::CompareType &compare_op) { + auto context = async_table_->CheckAndPut(row, family, qualifier, value, put, compare_op); + return context.get(operation_timeout()); +} + void Table::Delete(const hbase::Delete &del) { auto future = async_table_->Delete(del); future.get(operation_timeout()); http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 781c6f1..3c486af 100644 --- a/hbase-native-client/core/table.h +++ 
b/hbase-native-client/core/table.h @@ -63,6 +63,23 @@ class Table { void Put(const hbase::Put &put); /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. If the passed value is null, the check + * is for the lack of column (i.e., non-existence) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @param compare_op comparison operator to use + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const hbase::Put &put, + const pb::CompareType &compare_op = pb::CompareType::EQUAL); + /** * @brief - Deletes some data in the table. * @param - del Delete object to perform HBase Delete operation. */