http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/raw-async-table.cc 
b/hbase-native-client/src/hbase/client/raw-async-table.cc
new file mode 100644
index 0000000..96361e4
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/raw-async-table.cc
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <utility>
+
+#include "hbase/client/async-batch-rpc-retrying-caller.h"
+#include "hbase/client/raw-async-table.h"
+#include "hbase/client/request-converter.h"
+#include "hbase/client/response-converter.h"
+
+using hbase::security::User;
+
+namespace hbase {
+
+// Starts a single-request caller builder for `row` on this table. The builder is pre-populated
+// with the supplied per-RPC timeout plus the operation timeout, pause, retry limit and
+// error-logging threshold read from connection_conf_.
+template <typename RESP>
+std::shared_ptr<SingleRequestCallerBuilder<RESP>> RawAsyncTable::CreateCallerBuilder(
+    std::string row, std::chrono::nanoseconds rpc_timeout) {
+  // Connection-wide settings that every single-row caller shares.
+  const auto operation_timeout = connection_conf_->operation_timeout();
+  const auto pause = connection_conf_->pause();
+  const auto max_retries = connection_conf_->max_retries();
+  const auto start_log_errors_count = connection_conf_->start_log_errors_count();
+
+  return connection_->caller_factory()
+      ->Single<RESP>()
+      ->table(table_name_)
+      ->row(row)
+      ->rpc_timeout(rpc_timeout)
+      ->operation_timeout(operation_timeout)
+      ->pause(pause)
+      ->max_retries(max_retries)
+      ->start_log_errors_count(start_log_errors_count);
+}
+
+// Converts `req` to its wire request for the region described by `loc`, sends it to that
+// region's server through `rpc_client`, and maps the wire response back with `resp_converter`.
+// `controller` is not used in this body.
+template <typename REQ, typename PREQ, typename PRESP, typename RESP>
+folly::Future<RESP> RawAsyncTable::Call(
+    std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
+    std::shared_ptr<RegionLocation> loc, const REQ& req,
+    const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
+    const RespConverter<RESP, PRESP> resp_converter) {
+  // Build the wire request first; its ownership is handed to the RPC client below.
+  std::unique_ptr<PREQ> wire_request = req_converter(req, loc->region_name());
+
+  // Continuation that turns the wire response into the caller-facing result type.
+  auto to_result = [resp_converter](const std::unique_ptr<PRESP>& wire_response) {
+    return resp_converter(*wire_response);
+  };
+
+  // The target service name is fixed for every call made through this helper.
+  return rpc_client
+      ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                  std::move(wire_request), User::defaultUser(), "ClientService")
+      .then(std::move(to_result));
+}
+
+// Issues a single Get for `get.row()` through a retrying caller, using the read RPC timeout.
+// NOTE(review): the action lambda captures `get` by reference, so `get` presumably has to stay
+// alive until the returned future completes -- confirm against the retrying caller.
+folly::Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) {
+  using ResultPtr = std::shared_ptr<hbase::Result>;
+
+  // One attempt: convert the Get, send it to the located region, convert the response.
+  auto do_get = [=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
+                          std::shared_ptr<hbase::RegionLocation> loc,
+                          std::shared_ptr<hbase::RpcClient> rpc_client)
+      -> folly::Future<ResultPtr> {
+    return Call<hbase::Get, hbase::Request, hbase::Response, ResultPtr>(
+        rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest,
+        &hbase::ResponseConverter::FromGetResponse);
+  };
+
+  auto caller = CreateCallerBuilder<ResultPtr>(get.row(), connection_conf_->read_rpc_timeout())
+                    ->action(std::move(do_get))
+                    ->Build();
+
+  // The caller installs closures that refer back to itself, so it must not be destroyed while
+  // they can still run. The identity continuation below holds a copy of the shared pointer for
+  // exactly that purpose: it keeps the caller alive until the future has produced its value.
+  return caller->Call().then([caller](const auto result) { return result; });
+}
+// Sends `incr` as a single mutate request through a retrying caller, using the write RPC
+// timeout; the response is converted with ResponseConverter::FromMutateResponse.
+// NOTE(review): `incr` is captured by reference and presumably must outlive the returned
+// future -- confirm.
+folly::Future<std::shared_ptr<Result>> RawAsyncTable::Increment(const hbase::Increment& incr) {
+  using ResultPtr = std::shared_ptr<Result>;
+
+  auto do_increment = [=, &incr](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                 std::shared_ptr<hbase::RegionLocation> loc,
+                                 std::shared_ptr<hbase::RpcClient> rpc_client)
+      -> folly::Future<ResultPtr> {
+    return Call<hbase::Increment, hbase::Request, hbase::Response, ResultPtr>(
+        rpc_client, controller, loc, incr, &hbase::RequestConverter::IncrementToMutateRequest,
+        &hbase::ResponseConverter::FromMutateResponse);
+  };
+
+  auto caller = CreateCallerBuilder<ResultPtr>(incr.row(), connection_conf_->write_rpc_timeout())
+                    ->action(std::move(do_increment))
+                    ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto result) { return result; });
+}
+
+// Sends `put` as a single mutate request through a retrying caller, using the write RPC
+// timeout. The response body is ignored; the future only signals completion.
+// NOTE(review): `put` is captured by reference and presumably must outlive the returned
+// future -- confirm.
+folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
+  auto do_put = [=, &put](std::shared_ptr<hbase::HBaseRpcController> controller,
+                          std::shared_ptr<hbase::RegionLocation> loc,
+                          std::shared_ptr<hbase::RpcClient> rpc_client)
+      -> folly::Future<folly::Unit> {
+    return Call<hbase::Put, hbase::Request, hbase::Response, folly::Unit>(
+        rpc_client, controller, loc, put, &hbase::RequestConverter::ToMutateRequest,
+        // Nothing in the response is surfaced to the caller.
+        [](const Response& r) -> folly::Unit { return folly::unit; });
+  };
+
+  auto caller = CreateCallerBuilder<folly::Unit>(put.row(), connection_conf_->write_rpc_timeout())
+                    ->action(std::move(do_put))
+                    ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto result) { return result; });
+}
+
+// Sends `put` together with a condition on (row, family, qualifier, value, compare_op) as a
+// single mutate request through a retrying caller; the future yields the bool produced by
+// ResponseConverter::BoolFromMutateResponse.
+// NOTE(review): `put` is captured by reference by the action lambda and presumably must outlive
+// the returned future -- confirm. The string arguments and compare_op are captured by copy.
+folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const std::string& family,
+                                               const std::string& qualifier,
+                                               const std::string& value, const hbase::Put& put,
+                                               const pb::CompareType& compare_op) {
+  auto caller =
+      CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout())
+          ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> {
+            return Call<hbase::Put, hbase::Request, hbase::Response, bool>(
+                rpc_client, controller, loc, put,
+                // Request conversion. The Put arrives as a parameter (Call forwards the request
+                // it was given), so it must not also be captured: a capture that shares its name
+                // with a lambda parameter is ill-formed.
+                [=](const hbase::Put& put_to_send,
+                    const std::string& region_name) -> std::unique_ptr<Request> {
+                  return RequestConverter::CheckAndPutToMutateRequest(
+                      row, family, qualifier, value, compare_op, put_to_send, region_name);
+                },
+                // response conversion
+                &ResponseConverter::BoolFromMutateResponse);
+          })
+          ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto r) { return r; });
+}
+
+// Sends `del` together with a condition on (row, family, qualifier, value, compare_op) as a
+// single mutate request through a retrying caller; the future yields the bool produced by
+// ResponseConverter::BoolFromMutateResponse.
+// NOTE(review): `del` is captured by reference by the action lambda and presumably must outlive
+// the returned future -- confirm. The string arguments and compare_op are captured by copy.
+folly::Future<bool> RawAsyncTable::CheckAndDelete(const std::string& row, const std::string& family,
+                                                  const std::string& qualifier,
+                                                  const std::string& value,
+                                                  const hbase::Delete& del,
+                                                  const pb::CompareType& compare_op) {
+  auto caller =
+      CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout())
+          ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> {
+            return Call<hbase::Delete, hbase::Request, hbase::Response, bool>(
+                rpc_client, controller, loc, del,
+                // Request conversion. The Delete arrives as a parameter (Call forwards the
+                // request it was given), so it must not also be captured: a capture that shares
+                // its name with a lambda parameter is ill-formed.
+                [=](const hbase::Delete& del_to_send,
+                    const std::string& region_name) -> std::unique_ptr<Request> {
+                  return RequestConverter::CheckAndDeleteToMutateRequest(
+                      row, family, qualifier, value, compare_op, del_to_send, region_name);
+                },
+                // response conversion
+                &ResponseConverter::BoolFromMutateResponse);
+          })
+          ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto r) { return r; });
+}
+
+// Sends `del` as a single mutate request through a retrying caller, using the write RPC
+// timeout. The response body is ignored; the future only signals completion.
+// NOTE(review): `del` is captured by reference and presumably must outlive the returned
+// future -- confirm.
+folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
+  auto do_delete = [=, &del](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client)
+      -> folly::Future<folly::Unit> {
+    return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>(
+        rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest,
+        // Nothing in the response is surfaced to the caller.
+        [](const Response& r) -> folly::Unit { return folly::unit; });
+  };
+
+  auto caller = CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
+                    ->action(std::move(do_delete))
+                    ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto result) { return result; });
+}
+
+// Sends `append` as a single mutate request through a retrying caller, using the write RPC
+// timeout; the response is converted with ResponseConverter::FromMutateResponse.
+// NOTE(review): `append` is captured by reference and presumably must outlive the returned
+// future -- confirm.
+folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append& append) {
+  using ResultPtr = std::shared_ptr<Result>;
+
+  auto do_append = [=, &append](std::shared_ptr<hbase::HBaseRpcController> controller,
+                                std::shared_ptr<hbase::RegionLocation> loc,
+                                std::shared_ptr<hbase::RpcClient> rpc_client)
+      -> folly::Future<ResultPtr> {
+    return Call<hbase::Append, hbase::Request, hbase::Response, ResultPtr>(
+        rpc_client, controller, loc, append, &hbase::RequestConverter::AppendToMutateRequest,
+        &hbase::ResponseConverter::FromMutateResponse);
+  };
+
+  auto caller =
+      CreateCallerBuilder<ResultPtr>(append.row(), connection_conf_->write_rpc_timeout())
+          ->action(std::move(do_append))
+          ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the call has finished.
+  return caller->Call().then([caller](const auto result) { return result; });
+}
+
+// Batch Get: wraps a copy of every element of `gets` in a shared Row pointer and submits them
+// as one batch with the read RPC timeout. The future yields one Try per submitted row.
+folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
+    const std::vector<hbase::Get>& gets) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  rows.reserve(gets.size());
+  // Iterate by const reference so each Get is copied exactly once (into the shared object)
+  // rather than once for the loop variable and once more by make_shared.
+  for (const auto& get : gets) {
+    rows.emplace_back(std::make_shared<hbase::Get>(get));
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->read_rpc_timeout());
+}
+
+// Submits `rows` as one batch against this table through the connection's batch caller, using
+// `timeout` as the per-RPC timeout and the remaining settings from connection_conf_.
+// NOTE(review): max_attempts is fed from max_retries() -- confirm that the two are meant to be
+// the same number.
+template <typename REQ, typename RESP>
+folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch(
+    const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) {
+  // The builder receives its own shared copy of the requested actions.
+  auto row_actions = std::make_shared<std::vector<REQ>>(rows);
+
+  auto caller = connection_->caller_factory()
+                    ->Batch<REQ, RESP>()
+                    ->table(table_name_)
+                    ->actions(std::move(row_actions))
+                    ->rpc_timeout(timeout)
+                    ->operation_timeout(connection_conf_->operation_timeout())
+                    ->pause(connection_conf_->pause())
+                    ->max_attempts(connection_conf_->max_retries())
+                    ->start_log_errors_count(connection_conf_->start_log_errors_count())
+                    ->Build();
+
+  // Holding `caller` in the continuation keeps it alive until the batch has finished.
+  return caller->Call().then([caller](auto results) { return results; });
+}
+
+// Starts an asynchronous scan of this table. A defaulted copy of `scan` is handed to a newly
+// created AsyncClientScanner together with `consumer` and the connection's pause, retry and
+// timeout settings; the scanner is then started.
+void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) {
+  // Never scan with the caller's object directly: work on a copy with defaults filled in.
+  auto effective_scan = SetDefaultScanConfig(scan);
+
+  auto scanner = AsyncClientScanner::Create(
+      connection_, std::move(effective_scan), table_name_, consumer, connection_conf_->pause(),
+      connection_conf_->max_retries(), connection_conf_->scan_timeout(),
+      connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count());
+  scanner->Start();
+}
+
+// Returns a fresh copy of `scan` in which a non-positive caching value or max result size has
+// been replaced by this table's defaults. The input scan is never modified.
+std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) {
+  // A new object is always created because the start row may be reset on it later.
+  auto configured = std::make_shared<hbase::Scan>(scan);
+
+  const bool caching_unset = configured->Caching() <= 0;
+  if (caching_unset) {
+    configured->SetCaching(default_scanner_caching_);
+  }
+
+  const bool max_result_size_unset = configured->MaxResultSize() <= 0;
+  if (max_result_size_unset) {
+    configured->SetMaxResultSize(default_scanner_max_result_size_);
+  }
+
+  return configured;
+}
+
+// Batch Put: wraps a copy of every element of `puts` in a shared Row pointer and submits them
+// as one batch with the write RPC timeout. The future yields one Try per submitted row.
+folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put(
+    const std::vector<hbase::Put>& puts) {
+  std::vector<std::shared_ptr<hbase::Row>> rows;
+  rows.reserve(puts.size());
+  // Iterate by const reference so each Put is copied exactly once (into the shared object)
+  // rather than once for the loop variable and once more by make_shared.
+  for (const auto& put : puts) {
+    rows.emplace_back(std::make_shared<hbase::Put>(put));
+  }
+  return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(
+      rows, connection_conf_->write_rpc_timeout());
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/region-result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/region-result.cc 
b/hbase-native-client/src/hbase/client/region-result.cc
new file mode 100644
index 0000000..28c4861
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/region-result.cc
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/region-result.h"
+#include <glog/logging.h>
+#include <stdexcept>
+
+using hbase::pb::RegionLoadStats;
+
+namespace hbase {
+
+// Nothing to set up or tear down explicitly; members manage themselves.
+RegionResult::RegionResult() = default;
+
+RegionResult::~RegionResult() = default;
+
+// Records the (result, exception) pair for action `index`. Either pointer may be null.
+// Throws std::runtime_error if an entry for `index` has already been recorded.
+void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
+                                        std::shared_ptr<folly::exception_wrapper> exc) {
+  // Each index may be filled in only once.
+  const bool already_set = result_or_excption_.find(index) != result_or_excption_.end();
+  if (already_set) {
+    throw std::runtime_error("Index " + std::to_string(index) +
+                             " already set with ResultOrException");
+  }
+
+  result_or_excption_[index] = std::make_tuple(result, exc);
+}
+
+// Replaces the stored region load statistics with `stat`.
+void RegionResult::set_stat(std::shared_ptr<RegionLoadStats> stat) {
+  stat_ = stat;
+}
+
+// Number of (result, exception) entries recorded so far, converted to int.
+int RegionResult::ResultOrExceptionSize() const {
+  return static_cast<int>(result_or_excption_.size());
+}
+
+// Returns a newly allocated copy of the tuple recorded for `index`.
+// NOTE(review): lookup goes through .at(), which presumably throws std::out_of_range for an
+// index that was never added -- confirm against the container type in the header.
+std::shared_ptr<ResultOrExceptionTuple> RegionResult::ResultOrException(int32_t index) const {
+  const auto& entry = result_or_excption_.at(index);
+  return std::make_shared<ResultOrExceptionTuple>(entry);
+}
+
+// Read-only access to the stored region load statistics (null if none were set via set_stat).
+const std::shared_ptr<RegionLoadStats>& RegionResult::stat() const {
+  return stat_;
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/request-converter-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/request-converter-test.cc 
b/hbase-native-client/src/hbase/client/request-converter-test.cc
new file mode 100644
index 0000000..0878519
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/request-converter-test.cc
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/request-converter.h"
+
+#include <gtest/gtest.h>
+#include <limits>
+#include "hbase/connection/request.h"
+#include "hbase/client/get.h"
+#include "hbase/client/scan.h"
+
+using hbase::Get;
+using hbase::Scan;
+
+using hbase::pb::GetRequest;
+using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+using hbase::pb::ScanRequest;
+
+// Checks that RequestConverter::ToGetRequest carries the region name and the Get's row,
+// cache-blocks flag, consistency, max versions and column list into the GetRequest protobuf.
+TEST(RequestConverter, ToGet) {
+  std::string row_key = "row-test";
+  Get get(row_key);
+  get.AddFamily("family-1");
+  get.AddFamily("family-2");
+  get.AddFamily("family-3");
+  get.AddColumn("family-2", "qualifier-1");
+  get.AddColumn("family-2", "qualifier-2");
+  get.AddColumn("family-2", "qualifier-3");
+  get.SetCacheBlocks(false);
+  get.SetConsistency(hbase::pb::Consistency::TIMELINE);
+  get.SetMaxVersions(2);
+  get.SetTimeRange(10000, 20000);
+  std::string region_name("RegionName");
+
+  auto request = hbase::RequestConverter::ToGetRequest(get, region_name);
+  auto get_request = std::static_pointer_cast<GetRequest>(request->req_msg());
+
+  // The region specifier must carry the name that was passed in.
+  ASSERT_TRUE(get_request->has_region());
+  ASSERT_TRUE(get_request->region().has_value());
+  EXPECT_EQ(get_request->region().value(), region_name);
+
+  // Scalar settings of the Get.
+  ASSERT_TRUE(get_request->has_get());
+  const auto& pb_get = get_request->get();
+  EXPECT_EQ(pb_get.row(), row_key);
+  EXPECT_FALSE(pb_get.cache_blocks());
+  EXPECT_EQ(pb_get.consistency(), hbase::pb::Consistency::TIMELINE);
+  EXPECT_EQ(pb_get.max_versions(), 2);
+
+  // Families are expected as family-1..3, qualifiers (where present) as qualifier-1..n.
+  EXPECT_EQ(pb_get.column_size(), 3);
+  for (int col = 0; col < pb_get.column_size(); ++col) {
+    const auto& column = pb_get.column(col);
+    EXPECT_EQ(column.family(), "family-" + std::to_string(col + 1));
+    for (int q = 0; q < column.qualifier_size(); ++q) {
+      EXPECT_EQ(column.qualifier(q), "qualifier-" + std::to_string(q + 1));
+    }
+  }
+}
+
+// Checks that RequestConverter::ToScanRequest carries the region name and the Scan's settings
+// into the ScanRequest protobuf, and that the request-level client flags are all false.
+TEST(RequestConverter, ToScan) {
+  std::string start_row("start-row");
+  std::string stop_row("stop-row");
+  hbase::Scan scan;
+  scan.AddFamily("family-1");
+  scan.AddFamily("family-2");
+  scan.AddFamily("family-3");
+  scan.AddColumn("family-2", "qualifier-1");
+  scan.AddColumn("family-2", "qualifier-2");
+  scan.AddColumn("family-2", "qualifier-3");
+  scan.SetReversed(true);
+  scan.SetStartRow(start_row);
+  scan.SetStopRow(stop_row);
+  scan.SetCaching(3);
+  scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
+  scan.SetCacheBlocks(true);
+  scan.SetAllowPartialResults(true);
+  scan.SetLoadColumnFamiliesOnDemand(true);
+  scan.SetMaxVersions(5);
+  scan.SetTimeRange(10000, 20000);
+  std::string region_name("RegionName");
+
+  auto request = hbase::RequestConverter::ToScanRequest(scan, region_name);
+  auto scan_request = std::static_pointer_cast<ScanRequest>(request->req_msg());
+
+  // The region specifier must carry the name that was passed in.
+  ASSERT_TRUE(scan_request->has_region());
+  ASSERT_TRUE(scan_request->region().has_value());
+  EXPECT_EQ(scan_request->region().value(), region_name);
+
+  // Scalar settings of the Scan.
+  ASSERT_TRUE(scan_request->has_scan());
+  const auto& pb_scan = scan_request->scan();
+  EXPECT_TRUE(pb_scan.reversed());
+  EXPECT_EQ(pb_scan.start_row(), start_row);
+  EXPECT_EQ(pb_scan.stop_row(), stop_row);
+  EXPECT_FALSE(pb_scan.small());
+  EXPECT_EQ(pb_scan.caching(), 3);
+  EXPECT_EQ(pb_scan.consistency(), hbase::pb::Consistency::TIMELINE);
+  EXPECT_TRUE(pb_scan.cache_blocks());
+  EXPECT_TRUE(pb_scan.allow_partial_results());
+  EXPECT_TRUE(pb_scan.load_column_families_on_demand());
+  EXPECT_EQ(pb_scan.max_versions(), 5);
+  EXPECT_EQ(pb_scan.max_result_size(), std::numeric_limits<uint64_t>::max());
+
+  // Families are expected as family-1..3, qualifiers (where present) as qualifier-1..n.
+  EXPECT_EQ(pb_scan.column_size(), 3);
+  for (int col = 0; col < pb_scan.column_size(); ++col) {
+    const auto& column = pb_scan.column(col);
+    EXPECT_EQ(column.family(), "family-" + std::to_string(col + 1));
+    for (int q = 0; q < column.qualifier_size(); ++q) {
+      EXPECT_EQ(column.qualifier(q), "qualifier-" + std::to_string(q + 1));
+    }
+  }
+
+  // Request-level flags.
+  ASSERT_FALSE(scan_request->client_handles_partials());
+  ASSERT_FALSE(scan_request->client_handles_heartbeats());
+  ASSERT_FALSE(scan_request->track_scan_metrics());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/request-converter.cc 
b/hbase-native-client/src/hbase/client/request-converter.cc
new file mode 100644
index 0000000..a57ac31
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/request-converter.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/request-converter.h"
+
+#include <folly/Conv.h>
+
+#include <utility>
+#include "hbase/if/Client.pb.h"
+
+using hbase::pb::GetRequest;
+using hbase::pb::MutationProto;
+using hbase::pb::RegionAction;
+using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+using hbase::pb::ScanRequest;
+
+namespace hbase {
+
+// Nothing to set up or tear down explicitly.
+RequestConverter::~RequestConverter() = default;
+
+RequestConverter::RequestConverter() = default;
+
+// Fills `region_specifier` so that it identifies a region by name, using `region_name`.
+void RequestConverter::SetRegion(const std::string &region_name,
+                                 RegionSpecifier *region_specifier) {
+  const auto by_name =
+      RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME;
+  region_specifier->set_type(by_name);
+  region_specifier->set_value(region_name);
+}
+
+// Builds a Get request addressed to `region_name` whose payload is the protobuf form of `get`.
+std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
+                                                        const std::string &region_name) {
+  auto request = Request::get();
+  auto get_msg = std::static_pointer_cast<GetRequest>(request->req_msg());
+
+  RequestConverter::SetRegion(region_name, get_msg->mutable_region());
+
+  // The message takes ownership of the converted Get.
+  auto pb_get = RequestConverter::ToGet(get);
+  get_msg->set_allocated_get(pb_get.release());
+
+  return request;
+}
+
+// Converts a client-side Scan into its protobuf representation: scalar settings are always
+// copied; the time range, column families/qualifiers and filter are copied only when present.
+std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) {
+  auto pb_scan = std::make_unique<hbase::pb::Scan>();
+  pb_scan->set_max_versions(scan.MaxVersions());
+  pb_scan->set_cache_blocks(scan.CacheBlocks());
+  pb_scan->set_reversed(scan.IsReversed());
+  pb_scan->set_caching(scan.Caching());
+  pb_scan->set_start_row(scan.StartRow());
+  pb_scan->set_stop_row(scan.StopRow());
+  pb_scan->set_consistency(scan.Consistency());
+  pb_scan->set_max_result_size(scan.MaxResultSize());
+  pb_scan->set_allow_partial_results(scan.AllowPartialResults());
+  pb_scan->set_load_column_families_on_demand(scan.LoadColumnFamiliesOnDemand());
+
+  // An all-time range is expressed by leaving the time_range field unset.
+  if (!scan.Timerange().IsAllTime()) {
+    hbase::pb::TimeRange *pb_time_range = pb_scan->mutable_time_range();
+    pb_time_range->set_from(scan.Timerange().MinTimeStamp());
+    pb_time_range->set_to(scan.Timerange().MaxTimeStamp());
+  }
+
+  // One pb column per family, carrying that family's qualifiers (possibly none).
+  if (scan.HasFamilies()) {
+    for (const auto &family : scan.FamilyMap()) {
+      auto column = pb_scan->add_column();
+      column->set_family(family.first);
+      for (const auto &qualifier : family.second) {
+        column->add_qualifier(qualifier);
+      }
+    }
+  }
+
+  if (scan.filter() != nullptr) {
+    // The message takes ownership of the converted filter.
+    pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release());
+  }
+
+  // Return the local directly: wrapping it in std::move would only inhibit copy elision.
+  return pb_scan;
+}
+
+// Builds a scan request addressed to `region_name` carrying the protobuf form of `scan`.
+// The common client flags are set with renew = false.
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name) {
+  auto request = Request::scan();
+  auto scan_msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+
+  RequestConverter::SetRegion(region_name, scan_msg->mutable_region());
+
+  // The message takes ownership of the converted Scan.
+  auto pb_scan = ToScan(scan);
+  scan_msg->set_allocated_scan(pb_scan.release());
+
+  SetCommonScanRequestFields(scan_msg, false);
+
+  return request;
+}
+
+// Builds a scan request addressed to `region_name` carrying the protobuf form of `scan`, the
+// requested number of rows and the close-scanner flag. The common client flags are set with
+// renew = false.
+std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
+                                                         const std::string &region_name,
+                                                         int32_t num_rows, bool close_scanner) {
+  auto request = Request::scan();
+  auto scan_msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+
+  RequestConverter::SetRegion(region_name, scan_msg->mutable_region());
+
+  // The message takes ownership of the converted Scan.
+  auto pb_scan = ToScan(scan);
+  scan_msg->set_allocated_scan(pb_scan.release());
+
+  scan_msg->set_number_of_rows(num_rows);
+  scan_msg->set_close_scanner(close_scanner);
+
+  SetCommonScanRequestFields(scan_msg, false);
+
+  return request;
+}
+
+// Builds a follow-up scan request that refers to an existing scanner by `scanner_id`; no region
+// or scan payload is attached. The common client flags are set with renew = false.
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner) {
+  auto request = Request::scan();
+  auto scan_msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+
+  scan_msg->set_number_of_rows(num_rows);
+  scan_msg->set_close_scanner(close_scanner);
+  scan_msg->set_scanner_id(scanner_id);
+
+  SetCommonScanRequestFields(scan_msg, false);
+
+  return request;
+}
+
+// Builds a follow-up scan request that refers to an existing scanner by `scanner_id` and also
+// carries the next call sequence id. `renew` is forwarded to the common client flags.
+std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows,
+                                                         bool close_scanner,
+                                                         int64_t next_call_seq_id, bool renew) {
+  auto request = Request::scan();
+  auto scan_msg = std::static_pointer_cast<ScanRequest>(request->req_msg());
+
+  scan_msg->set_number_of_rows(num_rows);
+  scan_msg->set_close_scanner(close_scanner);
+  scan_msg->set_scanner_id(scanner_id);
+  scan_msg->set_next_call_seq(next_call_seq_id);
+
+  SetCommonScanRequestFields(scan_msg, renew);
+
+  return request;
+}
+
+// Sets the request-level flags shared by every scan request: partial-result handling,
+// heartbeat handling and scan-metrics tracking are all switched off, and `renew` is copied in.
+void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg,
+                                                  bool renew) {
+  // TODO: revisit these once partial results, heartbeats, etc. are implemented.
+  constexpr bool kNotSupportedYet = false;
+  pb_msg->set_client_handles_partials(kNotSupportedYet);
+  pb_msg->set_client_handles_heartbeats(kNotSupportedYet);
+  pb_msg->set_track_scan_metrics(kNotSupportedYet);
+
+  pb_msg->set_renew(renew);
+  // TODO: set scan limit
+}
+
+// Builds a multi request with one RegionAction per entry of `actions_by_region`. Every action
+// of a region is converted to a pb Get or a pb Put mutation and tagged with its position in
+// that region's action list.
+// Throws std::runtime_error for an action that is neither exactly hbase::Get nor hbase::Put.
+std::unique_ptr<Request> RequestConverter::ToMultiRequest(
+    const ActionsByRegion &actions_by_region) {
+  auto pb_req = Request::multi();
+  auto pb_msg = std::static_pointer_cast<hbase::pb::MultiRequest>(pb_req->req_msg());
+
+  for (const auto &action_by_region : actions_by_region) {
+    auto pb_region_action = pb_msg->add_regionaction();
+    RequestConverter::SetRegion(action_by_region.first, pb_region_action->mutable_region());
+    int action_num = 0;
+    for (const auto &region_action : action_by_region.second->actions()) {
+      auto pb_action = pb_region_action->add_action();
+      auto row_op = region_action->action();
+      // Only hbase::Get and hbase::Put are supported as batched actions for now.
+      CHECK(row_op) << "Unexpected. action can't be null.";
+      // typeid compares the exact dynamic type, so subclasses of Get/Put are rejected below.
+      if (typeid(*row_op) == typeid(hbase::Get)) {
+        auto getp = dynamic_cast<hbase::Get *>(row_op.get());
+        pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release());
+      } else if (typeid(*row_op) == typeid(hbase::Put)) {
+        auto putp = dynamic_cast<hbase::Put *>(row_op.get());
+        pb_action->set_allocated_mutation(
+            RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1)
+                .release());
+      } else {
+        throw std::runtime_error("Unexpected action type encountered.");
+      }
+      // The index is the action's position within this region's list.
+      pb_action->set_index(action_num);
+      action_num++;
+    }
+  }
+  return pb_req;
+}
+
+// Converts a client-side Get into its protobuf representation: max versions, cache-blocks flag,
+// consistency and row are always copied; the time range, column families/qualifiers and filter
+// are copied only when present.
+std::unique_ptr<hbase::pb::Get> RequestConverter::ToGet(const Get &get) {
+  auto result = std::make_unique<hbase::pb::Get>();
+
+  result->set_max_versions(get.MaxVersions());
+  result->set_cache_blocks(get.CacheBlocks());
+  result->set_consistency(get.Consistency());
+
+  // An all-time range is expressed by leaving the time_range field unset.
+  if (!get.Timerange().IsAllTime()) {
+    hbase::pb::TimeRange *range = result->mutable_time_range();
+    range->set_from(get.Timerange().MinTimeStamp());
+    range->set_to(get.Timerange().MaxTimeStamp());
+  }
+
+  result->set_row(get.row());
+
+  // One pb column per family, carrying that family's qualifiers (possibly none).
+  if (get.HasFamilies()) {
+    for (const auto &entry : get.FamilyMap()) {
+      auto pb_column = result->add_column();
+      pb_column->set_family(entry.first);
+      for (const auto &qual : entry.second) {
+        pb_column->add_qualifier(qual);
+      }
+    }
+  }
+
+  if (get.filter() != nullptr) {
+    // The message takes ownership of the converted filter.
+    result->set_allocated_filter(Filter::ToProto(*(get.filter())).release());
+  }
+
+  return result;
+}
+
+// Converts `mutation` into a MutationProto of the given `type`. Row, durability and timestamp
+// are always copied; `nonce` is attached only when it is positive. Every cell becomes its own
+// column_value entry holding a single qualifier_value.
+std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType type,
+                                                            const Mutation &mutation,
+                                                            const int64_t nonce) {
+  auto pb_mut = std::make_unique<MutationProto>();
+  pb_mut->set_row(mutation.row());
+  pb_mut->set_mutate_type(type);
+  pb_mut->set_durability(mutation.Durability());
+  pb_mut->set_timestamp(mutation.TimeStamp());
+  // TODO: set attributes from the mutation (key value pairs).
+
+  if (nonce > 0) {
+    pb_mut->set_nonce(nonce);
+  }
+
+  for (const auto &family : mutation.FamilyMap()) {
+    for (const auto &cell : family.second) {
+      auto column = pb_mut->add_column_value();
+      column->set_family(cell->Family());
+      auto qual = column->add_qualifier_value();
+      qual->set_qualifier(cell->Qualifier());
+      qual->set_timestamp(cell->Timestamp());
+      auto cell_type = cell->Type();
+      // A delete type is recorded for every cell of a DELETE mutation, and for delete-typed
+      // cells that appear inside a PUT mutation.
+      if (type == pb::MutationProto_MutationType_DELETE ||
+          (type == pb::MutationProto_MutationType_PUT && IsDelete(cell_type))) {
+        qual->set_delete_type(ToDeleteType(cell_type));
+      }
+
+      qual->set_value(cell->Value());
+    }
+  }
+  // Return the local directly: wrapping it in std::move would only inhibit copy elision.
+  return pb_mut;
+}
+
+// Maps a delete-flavoured CellType onto the matching protobuf DeleteType.
+// Throws std::runtime_error for any CellType that is not one of the four delete kinds.
+DeleteType RequestConverter::ToDeleteType(const CellType type) {
+  if (type == CellType::DELETE) {
+    return pb::MutationProto_DeleteType_DELETE_ONE_VERSION;
+  }
+  if (type == CellType::DELETE_COLUMN) {
+    return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS;
+  }
+  if (type == CellType::DELETE_FAMILY) {
+    return pb::MutationProto_DeleteType_DELETE_FAMILY;
+  }
+  if (type == CellType::DELETE_FAMILY_VERSION) {
+    return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION;
+  }
+  throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type));
+}
+
+// Reports whether `type` lies in the inclusive range [CellType::DELETE, CellType::DELETE_FAMILY].
+bool RequestConverter::IsDelete(const CellType type) {
+  const bool at_or_above_first = type >= CellType::DELETE;
+  const bool at_or_below_last = type <= CellType::DELETE_FAMILY;
+  return at_or_above_first && at_or_below_last;
+}
+
+// Builds a mutate Request that carries `put` as a PUT mutation for the given region.
+std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
+                                                           const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto put_proto = ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1);
+  mutate_msg->set_allocated_mutation(put_proto.release());
+
+  return request;
+}
+
+// Builds a mutate Request carrying `put` as a PUT mutation together with a Condition on
+// row/family/qualifier that compares against `value` (binary comparator) using `compare_op`.
+std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest(
+    const std::string &row, const std::string &family, const std::string &qualifier,
+    const std::string &value, const pb::CompareType compare_op, const hbase::Put &put,
+    const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto put_proto = ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1);
+  mutate_msg->set_allocated_mutation(put_proto.release());
+
+  auto comparator = ComparatorFactory::BinaryComparator(value);
+  ::hbase::pb::Condition *condition = mutate_msg->mutable_condition();
+  condition->set_row(row);
+  condition->set_family(family);
+  condition->set_qualifier(qualifier);
+  condition->set_allocated_comparator(Comparator::ToProto(*comparator).release());
+  condition->set_compare_type(compare_op);
+
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+  return request;
+}
+
+// Builds a mutate Request carrying `del` as a DELETE mutation together with a Condition on
+// row/family/qualifier that compares against `value` (binary comparator) using `compare_op`.
+std::unique_ptr<Request> RequestConverter::CheckAndDeleteToMutateRequest(
+    const std::string &row, const std::string &family, const std::string &qualifier,
+    const std::string &value, const pb::CompareType compare_op, const hbase::Delete &del,
+    const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto delete_proto = ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1);
+  mutate_msg->set_allocated_mutation(delete_proto.release());
+
+  auto comparator = ComparatorFactory::BinaryComparator(value);
+  ::hbase::pb::Condition *condition = mutate_msg->mutable_condition();
+  condition->set_row(row);
+  condition->set_family(family);
+  condition->set_qualifier(qualifier);
+  condition->set_allocated_comparator(Comparator::ToProto(*comparator).release());
+  condition->set_compare_type(compare_op);
+
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+  return request;
+}
+
+// Builds a mutate Request that carries `del` as a DELETE mutation for the given region.
+std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
+                                                                 const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto delete_proto = ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1);
+  mutate_msg->set_allocated_mutation(delete_proto.release());
+
+  VLOG(3) << "Req is " << request->req_msg()->ShortDebugString();
+  return request;
+}
+// Builds a mutate Request that carries `incr` as an INCREMENT mutation for the given region.
+std::unique_ptr<Request> RequestConverter::IncrementToMutateRequest(
+    const Increment &incr, const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto incr_proto = ToMutation(MutationType::MutationProto_MutationType_INCREMENT, incr, -1);
+  mutate_msg->set_allocated_mutation(incr_proto.release());
+
+  VLOG(3) << "Req is " << request->req_msg()->ShortDebugString();
+  return request;
+}
+
+// Builds a mutate Request that carries `append` as an APPEND mutation for the given region.
+std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &append,
+                                                                 const std::string &region_name) {
+  auto request = Request::mutate();
+  auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+  RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+
+  // The protobuf message takes ownership of the released MutationProto.
+  auto append_proto = ToMutation(MutationType::MutationProto_MutationType_APPEND, append, -1);
+  mutate_msg->set_allocated_mutation(append_proto.release());
+
+  VLOG(3) << "Req is " << request->req_msg()->ShortDebugString();
+  return request;
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/response-converter.cc 
b/hbase-native-client/src/hbase/client/response-converter.cc
new file mode 100644
index 0000000..f3b78fd
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/response-converter.cc
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/response-converter.h"
+#include <glog/logging.h>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+#include "hbase/client/cell.h"
+#include "hbase/client/multi-response.h"
+#include "hbase/exceptions/exception.h"
+
+using hbase::pb::GetResponse;
+using hbase::pb::MutateResponse;
+using hbase::pb::ScanResponse;
+using hbase::pb::RegionLoadStats;
+
+namespace hbase {
+
+// Neither special member has any work to do; let the compiler generate them.
+ResponseConverter::ResponseConverter() = default;
+
+ResponseConverter::~ResponseConverter() = default;
+
+// impl note: we are returning shared_ptr's instead of unique_ptr's because these
+// go inside folly::Future's, making the move semantics extremely tricky.
+//
+// Converts the GetResponse held by `resp` (plus its cell scanner) into a Result.
+std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
+  const auto pb_get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
+  VLOG(3) << "FromGetResponse:" << pb_get_resp->ShortDebugString();
+  const auto& pb_result = pb_get_resp->result();
+  return ToResult(pb_result, resp.cell_scanner());
+}
+
+// Converts the MutateResponse held by `resp` (plus its cell scanner) into a Result.
+std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& resp) {
+  auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg());
+  // Pass the embedded protobuf Result straight through; the former unused local only
+  // deep-copied the message.
+  return ToResult(mutate_resp->result(), resp.cell_scanner());
+}
+
+// Returns the `processed` flag of the MutateResponse held by `resp`.
+bool ResponseConverter::BoolFromMutateResponse(const Response& resp) {
+  const auto pb_mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg());
+  const bool processed = pb_mutate_resp->processed();
+  return processed;
+}
+
+/*
+ * Builds a client Result from a protobuf Result plus, optionally, cells delivered through
+ * the RPC cell scanner.
+ *
+ * Cells embedded in the protobuf message come first, followed by associated_cell_count()
+ * cells pulled from cell_scanner (only when it is non-null).
+ *
+ * Throws std::runtime_error if the scanner is exhausted before the expected number of
+ * cells has been read.
+ */
+std::shared_ptr<Result> ResponseConverter::ToResult(
+    const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
+  std::vector<std::shared_ptr<Cell>> vcells;
+  // Iterate by const reference so each protobuf cell is not deep-copied.
+  for (const auto& cell : result.cell()) {
+    vcells.push_back(std::make_shared<Cell>(
+        cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(),
+        static_cast<hbase::CellType>(cell.cell_type())));
+  }
+
+  // iterate over the cells coming from rpc codec
+  if (cell_scanner != nullptr) {
+    int cells_read = 0;
+    while (cells_read != result.associated_cell_count()) {
+      if (!cell_scanner->Advance()) {
+        LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- "
+                   << cells_read << "; Expected Cell Count:- " << result.associated_cell_count();
+        // Must actually throw: only constructing the exception left this loop spinning forever.
+        throw std::runtime_error("CellScanner::Advance() returned false unexpectedly");
+      }
+      vcells.push_back(cell_scanner->Current());
+      cells_read += 1;
+    }
+  }
+  return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial());
+}
+
+// Convenience overload: unpacks the ScanResponse and cell scanner from `resp` and delegates.
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
+  const auto pb_scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
+  auto scanner = resp.cell_scanner();
+  return FromScanResponse(pb_scan_resp, scanner);
+}
+
+/*
+ * Converts a ScanResponse into a vector of Results.
+ *
+ * With a cell scanner, the i'th Result is assembled from the next cells_per_result(i) cells
+ * read from the scanner; without one, each Result is converted from results(i).
+ *
+ * Throws std::runtime_error if the scanner runs out of cells part-way through.
+ */
+std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
+    const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
+  const bool has_scanner = cell_scanner != nullptr;
+  VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
+          << " cell_scanner:" << has_scanner;
+
+  int num_results;
+  if (has_scanner) {
+    num_results = scan_resp->cells_per_result_size();
+  } else {
+    num_results = scan_resp->results_size();
+  }
+
+  std::vector<std::shared_ptr<Result>> results(static_cast<size_t>(num_results));
+  for (int i = 0; i < num_results; i++) {
+    if (!has_scanner) {
+      results[i] = ToResult(scan_resp->results(i), cell_scanner);
+      continue;
+    }
+
+    // Cells are out in cellblocks.  Group them up again as Results.  cells_per_result(i)
+    // says how many Cells belong to the i'th Result.
+    const int cells_in_result = scan_resp->cells_per_result(i);
+
+    std::vector<std::shared_ptr<Cell>> row_cells;
+    for (int read = 0; read < cells_in_result; read++) {
+      if (!cell_scanner->Advance()) {
+        std::string msg = "Results sent from server=" + std::to_string(num_results) +
+                          ". But only got " + std::to_string(i) +
+                          " results completely at client. Resetting the scanner to scan again.";
+        LOG(ERROR) << msg;
+        throw std::runtime_error(msg);
+      }
+      row_cells.push_back(cell_scanner->Current());
+    }
+    // TODO: handle partial results per Result by checking partial_flag_per_result
+    results[i] = std::make_shared<Result>(row_cells, false, scan_resp->stale(), false);
+  }
+
+  return results;
+}
+
+/*
+ * Converts the protobuf MultiResponse carried by `resp` into a hbase::MultiResponse, pairing
+ * each RegionActionResult with the RegionAction at the same position in the MultiRequest of
+ * `req`.
+ *
+ * Per region: a region-level exception is recorded with AddRegionException; otherwise every
+ * ResultOrException is recorded with AddRegionResult under the original_index() of the
+ * action at the same position in actions_by_region. Region load statistics, when present,
+ * are copied with AddStatistic.
+ *
+ * Throws std::runtime_error when the request/response region counts differ, when a region
+ * specifier is not of type REGION_NAME, or when a region's action and result counts differ.
+ * actions_by_region.at() throws if the region name is not present in the map.
+ */
+std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(
+    std::shared_ptr<Request> req, const Response& resp,
+    const ServerRequest::ActionsByRegion& actions_by_region) {
+  auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
+  auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
+  VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
+  int req_region_action_count = multi_req->regionaction_size();
+  int res_region_action_count = multi_resp->regionactionresult_size();
+  if (req_region_action_count != res_region_action_count) {
+    throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) +
+                             " does not match response mutation result count=" +
+                             std::to_string(res_region_action_count));
+  }
+  auto multi_response = std::make_unique<hbase::MultiResponse>();
+  for (int32_t num = 0; num < res_region_action_count; num++) {
+    // Bind by reference: these messages are owned by multi_req / multi_resp, which outlive
+    // the loop, so there is no need to deep-copy them on every iteration.
+    const hbase::pb::RegionAction& actions = multi_req->regionaction(num);
+    const hbase::pb::RegionActionResult& action_result = multi_resp->regionactionresult(num);
+    const hbase::pb::RegionSpecifier& rs = actions.region();
+    if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) {
+      throw std::runtime_error("We support only encoded types for protobuf multi response.");
+    }
+
+    const auto& region_name = rs.value();
+    if (action_result.has_exception()) {
+      auto ew = ResponseConverter::GetRemoteException(action_result.exception());
+      VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region["
+              << region_name << "];";
+      multi_response->AddRegionException(region_name, ew);
+      continue;
+    }
+
+    if (actions.action_size() != action_result.resultorexception_size()) {
+      throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) +
+                               ", action_result.resultorexception_size=" +
+                               std::to_string(action_result.resultorexception_size()) +
+                               " for region " + actions.region().value());
+    }
+
+    auto multi_actions = actions_by_region.at(region_name)->actions();
+    uint64_t multi_actions_num = 0;
+    for (const auto& roe : action_result.resultorexception()) {
+      std::shared_ptr<Result> result;
+      // NOTE(review): `ew` is never assigned, so AddRegionResult below always receives a null
+      // exception; a per-action exception is recorded only through AddRegionException. This
+      // matches the previous behaviour (the old inner variable shadowed this one) -- confirm
+      // whether the remote exception was meant to be passed to AddRegionResult instead.
+      std::shared_ptr<folly::exception_wrapper> ew;
+      if (roe.has_exception()) {
+        auto remote_ex = ResponseConverter::GetRemoteException(roe.exception());
+        VLOG(8) << "Store Remote Region Exception:- " << remote_ex->what().toStdString()
+                << "; Region[" << region_name << "];";
+        multi_response->AddRegionException(region_name, remote_ex);
+      } else if (roe.has_result()) {
+        result = ToResult(roe.result(), resp.cell_scanner());
+      } else if (roe.has_service_result()) {
+        // TODO Not processing Coprocessor Service Result;
+      } else {
+        // Sometimes, the response is just "it was processed". Generally, this occurs for things
+        // like mutateRows where either we get back 'processed' (or not) and optionally some
+        // statistics about the regions we touched.
+        std::vector<std::shared_ptr<Cell>> empty_cells;
+        result = std::make_shared<Result>(empty_cells, multi_resp->processed(), false, false);
+      }
+      // We add the original index of the multi-action so that when populating the response
+      // back we do it as per the action index
+      multi_response->AddRegionResult(
+          region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew);
+      multi_actions_num++;
+    }
+  }
+
+  if (multi_resp->has_regionstatistics()) {
+    const hbase::pb::MultiRegionLoadStats& stats = multi_resp->regionstatistics();
+    for (int i = 0; i < stats.region_size(); i++) {
+      multi_response->AddStatistic(stats.region(i).value(),
+                                   std::make_shared<RegionLoadStats>(stats.stat(i)));
+    }
+  }
+  return multi_response;
+}
+
+// Wraps a server-side NameBytesPair exception in a RemoteException. The name (exception
+// class) and value (stack trace) default to "" when absent; their concatenation is the
+// message. Hostname and port are set to "" and 0.
+std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException(
+    const hbase::pb::NameBytesPair& exc_resp) {
+  std::string class_name;
+  if (exc_resp.has_name()) {
+    class_name = exc_resp.name();
+  }
+  std::string trace;
+  if (exc_resp.has_value()) {
+    trace = exc_resp.value();
+  }
+
+  const std::string message = class_name + trace;
+  RemoteException remote(message);
+  remote.set_exception_class_name(class_name)
+      ->set_stack_trace(trace)
+      ->set_hostname("")
+      ->set_port(0);
+
+  return std::make_shared<folly::exception_wrapper>(
+      folly::make_exception_wrapper<RemoteException>(remote));
+}
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/result-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/result-test.cc 
b/hbase-native-client/src/hbase/client/result-test.cc
new file mode 100644
index 0000000..684c08d
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/result-test.cc
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "hbase/client/cell.h"
+#include "hbase/client/result.h"
+
+using hbase::Cell;
+using hbase::CellType;
+using hbase::Result;
+using std::experimental::nullopt;
+
+// Appends the fixture used by the Result tests to `cells`: a single row ("row") with ten
+// family-i/column-i columns. Columns 5, 8 and 9 carry 2, 3 and 4 versions respectively
+// (newest first); every other column has one cell stamped with the maximum int64 timestamp.
+void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) {
+  // Version timestamps, newest first, and the values stored in the older versions.
+  const int64_t version_ts[] = {1482113040506, 1482111803856, 1482110969958, 1482110876075};
+  const char *const older_values[] = {"value-X", "value-Y", "value-Z"};
+  const std::string row = "row";
+
+  for (int idx = 0; idx < 10; ++idx) {
+    const std::string suffix = std::to_string(idx);
+    const std::string family = "family-" + suffix;
+    const std::string column = "column-" + suffix;
+    const std::string value = "value-" + suffix;
+
+    int versions = 0;
+    if (idx == 5) {
+      versions = 2;
+    } else if (idx == 8) {
+      versions = 3;
+    } else if (idx == 9) {
+      versions = 4;
+    }
+
+    if (versions == 0) {
+      cells.push_back(std::make_shared<Cell>(row, family, column,
+                                             std::numeric_limits<int64_t>::max(), value,
+                                             CellType::PUT));
+      continue;
+    }
+
+    // The newest version holds the column's own value; older ones hold X, Y, Z in turn.
+    cells.push_back(
+        std::make_shared<Cell>(row, family, column, version_ts[0], value, CellType::PUT));
+    for (int v = 1; v < versions; ++v) {
+      cells.push_back(std::make_shared<Cell>(row, family, column, version_ts[v],
+                                             older_values[v - 1], CellType::PUT));
+    }
+  }
+}
+
+// A Result built from no cells reports itself as empty with size zero.
+TEST(Result, EmptyResult) {
+  const std::vector<std::shared_ptr<Cell> > no_cells;
+  Result empty_result(no_cells, true, false, false);
+  EXPECT_TRUE(empty_result.IsEmpty());
+  EXPECT_EQ(0, empty_result.Size());
+}
+
+TEST(Result, FilledResult) {  // Exercises the Result accessors against the PopulateCells() fixture.
+  std::vector<std::shared_ptr<Cell> > cells;
+  PopulateCells(cells);
+
+  Result result(cells, true, false, false);
+
+  EXPECT_EQ(false, result.IsEmpty());
+  EXPECT_EQ(16, result.Size());  // 7 single-version columns + 2 + 3 + 4 versions
+
+  // Get Latest Cell for the given family and qualifier.
+  auto latest_cell(result.ColumnLatestCell("family", "column"));
+  // Nothing of the above family/qualifier combo is present so it should be
+  // nullptr
+  ASSERT_FALSE(latest_cell.get());
+
+  // Try to get the latest cell for the given family and qualifier.
+  latest_cell = result.ColumnLatestCell("family-4", "column-4");
+  // Now shouldn't be a nullptr
+  ASSERT_TRUE(latest_cell.get());
+  // And Value must match too
+  EXPECT_EQ("value-4", latest_cell->Value());
+
+  // Value will be nullptr as no such family and qualifier is present
+  ASSERT_FALSE(result.Value("family-4", "qualifier"));
+  // Value will be present as family and qualifier is present
+  ASSERT_TRUE(result.Value("family-4", "column-4") != nullopt);
+  // Value should be present and match.
+  EXPECT_EQ(latest_cell->Value(), (*result.ColumnLatestCell("family-4", 
"column-4")).Value());
+  EXPECT_EQ("value-5", (*result.ColumnLatestCell("family-5", 
"column-5")).Value());
+  EXPECT_EQ("value-8", (*result.ColumnLatestCell("family-8", 
"column-8")).Value());
+  EXPECT_EQ("value-7", *result.Value("family-7", "column-7"));
+
+  // Get cells for the given family and qualifier
+  auto column_cells = result.ColumnCells("family", "column");
+  // Size should be 0
+  EXPECT_EQ(0, column_cells.size());
+
+  // Size shouldn't be 0 and Row() and Value() must match
+  column_cells = result.ColumnCells("family-0", "column-0");
+  EXPECT_EQ(1, column_cells.size());
+  EXPECT_EQ("row", column_cells[0]->Row());
+  EXPECT_EQ("row", result.Row());
+
+  // Size shouldn't be 0 and Row() and Value() must match
+  column_cells = result.ColumnCells("family-5", "column-5");
+  EXPECT_EQ(2, column_cells.size());
+  EXPECT_EQ("row", column_cells[0]->Row());
+  EXPECT_EQ("row", column_cells[1]->Row());
+  EXPECT_EQ("value-5", column_cells[0]->Value());
+  EXPECT_EQ("value-X", column_cells[1]->Value());
+  EXPECT_EQ("row", result.Row());
+
+  // Size shouldn't be 0 and Row() and Value() must match
+  column_cells = result.ColumnCells("family-8", "column-8");
+  EXPECT_EQ(3, column_cells.size());
+  EXPECT_EQ("row", column_cells[0]->Row());
+  EXPECT_EQ("row", column_cells[1]->Row());
+  EXPECT_EQ("row", column_cells[2]->Row());
+  EXPECT_EQ("value-8", column_cells[0]->Value());
+  EXPECT_EQ("value-X", column_cells[1]->Value());
+  EXPECT_EQ("value-Y", column_cells[2]->Value());
+  EXPECT_EQ("row", result.Row());
+
+  // Size shouldn't be 0 and Row() and Value() must match
+  column_cells = result.ColumnCells("family-9", "column-9");
+  EXPECT_EQ(4, column_cells.size());
+  EXPECT_EQ("row", column_cells[0]->Row());
+  EXPECT_EQ("row", column_cells[1]->Row());
+  EXPECT_EQ("row", column_cells[2]->Row());
+  EXPECT_EQ("row", column_cells[3]->Row());
+  EXPECT_EQ("value-9", column_cells[0]->Value());
+  EXPECT_EQ("value-X", column_cells[1]->Value());
+  EXPECT_EQ("value-Y", column_cells[2]->Value());
+  EXPECT_EQ("value-Z", column_cells[3]->Value());
+  EXPECT_EQ("row", result.Row());
+
+  // Test all the Cell values
+  const auto &result_cells = result.Cells();
+  int i = 0, j = 0;  // j counts every cell; i advances only on the newest version of a column
+  for (const auto &cell : result_cells) {
+    std::string row = "row";
+    std::string family = "family-" + std::to_string(i);
+    std::string column = "column-" + std::to_string(i);
+    std::string value = "value-" + std::to_string(i);
+    switch (j) {  // positions of the older versions; these skip the ++i below via continue
+      case 6:
+      case 10:
+      case 13: {
+        EXPECT_EQ("value-X", cell->Value());
+        ++j;
+        continue;
+      }
+      case 11:
+      case 14: {
+        EXPECT_EQ("value-Y", cell->Value());
+        ++j;
+        continue;
+      }
+      case 15: {
+        EXPECT_EQ("value-Z", cell->Value());
+        ++j;
+        continue;
+      }
+    }
+    EXPECT_EQ(row, cell->Row());
+    EXPECT_EQ(family, cell->Family());
+    EXPECT_EQ(column, cell->Qualifier());
+    EXPECT_EQ(value, cell->Value());
+    ++i;
+    ++j;
+  }
+
+  auto result_map_tmp = result.Map();
+  result_map_tmp["testf"]["testq"][1] = "value";  // mutate the returned map only
+  EXPECT_EQ(11, result_map_tmp.size());
+
+  auto result_map = result.Map();
+  EXPECT_EQ(10, result_map.size());  // a fresh Map() is unaffected by the edit above
+
+  i = 0;
+  for (auto family_map : result_map) {
+    std::string family = "family-" + std::to_string(i);
+    std::string qualifier = "column-" + std::to_string(i);
+    std::string value = "value-" + std::to_string(i);
+    EXPECT_EQ(family, family_map.first);
+    for (auto qualifier_map : family_map.second) {
+      EXPECT_EQ(qualifier, qualifier_map.first);
+      j = 0;  // j now indexes the versions of the current qualifier
+      for (auto version_map : qualifier_map.second) {
+        switch (i) {
+          case 5: {
+            if (1 == j) {
+              EXPECT_EQ(1482111803856, version_map.first);
+              EXPECT_EQ("value-X", version_map.second);
+            } else if (0 == j) {
+              EXPECT_EQ(1482113040506, version_map.first);
+              EXPECT_EQ("value-5", version_map.second);
+            }
+            break;
+          }
+          case 8: {
+            if (2 == j) {
+              EXPECT_EQ(1482110969958, version_map.first);
+              EXPECT_EQ("value-Y", version_map.second);
+            } else if (1 == j) {
+              EXPECT_EQ(1482111803856, version_map.first);
+              EXPECT_EQ("value-X", version_map.second);
+            } else if (0 == j) {
+              EXPECT_EQ(1482113040506, version_map.first);
+              EXPECT_EQ("value-8", version_map.second);
+            }
+            break;
+          }
+          case 9: {
+            if (3 == j) {
+              EXPECT_EQ(1482110876075, version_map.first);
+              EXPECT_EQ("value-Z", version_map.second);
+            } else if (2 == j) {
+              EXPECT_EQ(1482110969958, version_map.first);
+              EXPECT_EQ("value-Y", version_map.second);
+            } else if (1 == j) {
+              EXPECT_EQ(1482111803856, version_map.first);
+              EXPECT_EQ("value-X", version_map.second);
+            } else if (0 == j) {
+              EXPECT_EQ(1482113040506, version_map.first);
+              EXPECT_EQ("value-9", version_map.second);
+            }
+            break;
+          }
+          default: {
+            EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first);
+            EXPECT_EQ(value, version_map.second);
+          }
+        }
+        ++j;
+      }
+    }
+    ++i;
+  }
+
+  auto family_map = result.FamilyMap("family-0");
+  EXPECT_EQ(1, family_map.size());
+  i = 0;  // NOTE(review): i is never read after this point; this and the later resets are dead
+  for (auto qual_val_map : family_map) {
+    EXPECT_EQ("column-0", qual_val_map.first);
+    EXPECT_EQ("value-0", qual_val_map.second);
+  }
+
+  family_map = result.FamilyMap("family-1");
+  EXPECT_EQ(1, family_map.size());
+  i = 0;
+  for (auto qual_val_map : family_map) {
+    EXPECT_EQ("column-1", qual_val_map.first);
+    EXPECT_EQ("value-1", qual_val_map.second);
+  }
+
+  family_map = result.FamilyMap("family-5");
+  EXPECT_EQ(1, family_map.size());
+  i = 0;
+  for (auto qual_val_map : family_map) {
+    EXPECT_EQ("column-5", qual_val_map.first);
+    EXPECT_EQ("value-5", qual_val_map.second);
+  }
+
+  family_map = result.FamilyMap("family-9");
+  EXPECT_EQ(1, family_map.size());
+  i = 0;
+  for (auto qual_val_map : family_map) {
+    EXPECT_EQ("column-9", qual_val_map.first);
+    EXPECT_EQ("value-9", qual_val_map.second);
+  }
+}
+
+// EstimatedSize() of an empty Result equals sizeof(Result) and grows with every added cell.
+TEST(Result, ResultEstimatedSize) {
+  const CellType put = CellType::PUT;
+  const int64_t max_ts = std::numeric_limits<int64_t>::max();
+  std::vector<std::shared_ptr<Cell> > cells;
+
+  Result empty(cells, true, false, false);
+  EXPECT_EQ(empty.EstimatedSize(), sizeof(Result));
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", max_ts, "", put));
+  Result result1(cells, true, false, false);
+  EXPECT_GT(result1.EstimatedSize(), empty.EstimatedSize());
+
+  cells.push_back(std::make_shared<Cell>("a", "a", "", max_ts, "", put));
+  Result result2(cells, true, false, false);
+  EXPECT_GT(result2.EstimatedSize(), result1.EstimatedSize());
+
+  LOG(INFO) << empty.EstimatedSize();
+  LOG(INFO) << result1.EstimatedSize();
+  LOG(INFO) << result2.EstimatedSize();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/result.cc 
b/hbase-native-client/src/hbase/client/result.cc
new file mode 100644
index 0000000..a2f56aa
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/result.cc
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/result.h"
+
+namespace hbase {
+
+// Nothing to release by hand; the members clean up after themselves.
+Result::~Result() = default;
+
+// Builds a Result over a copy of `cells`. The row is taken from the first cell, or left
+// empty when there are no cells.
+Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale,
+               bool partial)
+    : exists_(exists), stale_(stale), partial_(partial), cells_(cells) {
+  if (cells_.empty()) {
+    row_ = "";
+  } else {
+    row_ = cells_.front()->Row();
+  }
+}
+
+// Copy constructor: copies the flags and the row, and appends the other Result's cell
+// pointers (the Cell objects themselves are shared, not cloned).
+Result::Result(const Result &result) {
+  row_ = result.row_;
+  exists_ = result.exists_;
+  stale_ = result.stale_;
+  partial_ = result.partial_;
+  cells_.insert(cells_.end(), result.cells_.begin(), result.cells_.end());
+}
+
+// Read-only access to the cells held by this Result.
+const std::vector<std::shared_ptr<Cell> > &Result::Cells() const {
+  return cells_;
+}
+
+// Returns, in stored order, every cell whose family and qualifier both match.
+std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family,
+                                                        const std::string &qualifier) const {
+  std::vector<std::shared_ptr<Cell> > matches;
+  // TODO implement a BinarySearch here ?
+  for (size_t pos = 0; pos < cells_.size(); ++pos) {
+    const auto &candidate = cells_[pos];
+    const bool same_column =
+        candidate->Family() == family && candidate->Qualifier() == qualifier;
+    if (!same_column) {
+      continue;
+    }
+    matches.push_back(candidate);
+  }
+  return matches;
+}
+
+// Returns the first stored cell matching the family and qualifier, or nullptr if none does.
+const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family,
+                                                     const std::string &qualifier) const {
+  // TODO implement a BinarySearch here ?
+  for (auto it = cells_.begin(); it != cells_.end(); ++it) {
+    // The first occurrence of the column is treated as the latest one.
+    const bool same_column = (*it)->Family() == family && (*it)->Qualifier() == qualifier;
+    if (same_column) {
+      return *it;
+    }
+  }
+  return nullptr;
+}
+
+// Value of the cell returned by ColumnLatestCell(), or an empty optional when there is none.
+optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const {
+  const std::shared_ptr<Cell> newest = ColumnLatestCell(family, qualifier);
+  if (newest == nullptr) {
+    return optional<std::string>();
+  }
+  return optional<std::string>(newest->Value());
+}
+
+// True when this Result holds no cells.
+bool Result::IsEmpty() const {
+  return cells_.size() == 0;
+}
+
+// Row stored at construction time.
+const std::string &Result::Row() const {
+  return row_;
+}
+
+// Number of cells held, narrowed to int.
+int Result::Size() const {
+  return static_cast<int>(cells_.size());
+}
+
+// Builds a family -> qualifier -> timestamp -> value map from the cells. A later cell with
+// the same family, qualifier and timestamp overwrites an earlier one.
+ResultMap Result::Map() const {
+  ResultMap by_family;
+  for (const auto &cell : cells_) {
+    auto &versions = by_family[cell->Family()][cell->Qualifier()];
+    versions[cell->Timestamp()] = cell->Value();
+  }
+  return by_family;
+}
+
+// Returns qualifier -> value for one family, taking for each qualifier the first entry of
+// its version map. The result is empty when the Result is empty or the family is absent.
+std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const {
+  std::map<std::string, std::string> first_by_qualifier;
+  if (IsEmpty()) {
+    return first_by_qualifier;
+  }
+
+  const auto all_families = Map();
+  const auto family_pos = all_families.find(family);
+  if (family_pos == all_families.end()) {
+    return first_by_qualifier;
+  }
+
+  for (const auto &qualifier_entry : family_pos->second) {
+    const auto &versions = qualifier_entry.second;
+    // Only the first value is kept. Result.java takes only the first value.
+    if (!versions.empty()) {
+      first_by_qualifier[qualifier_entry.first] = versions.begin()->second;
+    }
+  }
+
+  return first_by_qualifier;
+}
+
+// Renders "keyvalues=NONE" for an empty Result, otherwise "keyvalues={c1, c2, ...}" using
+// each cell's DebugString().
+std::string Result::DebugString() const {
+  std::string text{"keyvalues="};
+  if (IsEmpty()) {
+    text += "NONE";
+    return text;
+  }
+
+  text += "{";
+  for (size_t pos = 0; pos < cells_.size(); ++pos) {
+    if (pos != 0) {
+      text += ", ";
+    }
+    text += cells_[pos]->DebugString();
+  }
+  text += "}";
+
+  return text;
+}
+
+// Estimates the memory footprint of this Result in bytes: the object itself, the capacity
+// of the row string, and for every cell the size of its shared_ptr plus the cell's own
+// EstimatedSize().
+size_t Result::EstimatedSize() const {
+  size_t s = sizeof(Result);
+  s += row_.capacity();
+  // Iterate by reference to avoid a shared_ptr copy (atomic refcount traffic) per cell.
+  for (const auto &c : cells_) {
+    s += sizeof(std::shared_ptr<Cell>);
+    // Was `s + c->EstimatedSize();`, which computed the sum and threw it away.
+    s += c->EstimatedSize();
+  }
+  return s;
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-result-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/scan-result-cache-test.cc 
b/hbase-native-client/src/hbase/client/scan-result-cache-test.cc
new file mode 100644
index 0000000..4c10a05
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/scan-result-cache-test.cc
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "hbase/client/cell.h"
+#include "hbase/client/result.h"
+#include "hbase/client/scan-result-cache.h"
+
+using hbase::ScanResultCache;
+using hbase::Result;
+using hbase::Cell;
+using hbase::CellType;
+
+using ResultVector = std::vector<std::shared_ptr<Result>>;
+
+// Builds a PUT cell for the given family/column. The decimal string of `key`
+// is passed both as the first and as the fifth constructor argument, and the
+// timestamp argument is the maximum int64_t.
+std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family,
+                                 const std::string &column) {
+  auto key_text = folly::to<std::string>(key);
+  return std::make_shared<Cell>(key_text, family, column,
+                                std::numeric_limits<int64_t>::max(), key_text,
+                                CellType::PUT);
+}
+
+// Wraps `cell` in a single-cell Result. `partial` is passed as the last
+// constructor flag; the two flags before it are passed as false.
+std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) {
+  using CellVector = std::vector<std::shared_ptr<Cell>>;
+  return std::make_shared<Result>(CellVector{cell}, false, false, partial);
+}
+
+// With no partial results involved, the cache returns exactly what it is given.
+TEST(ScanResultCacheTest, NoPartial) {
+  ScanResultCache cache;
+
+  // Empty input yields empty output, heartbeat or not.
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false));
+  ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true));
+
+  const int32_t num_results = 10;
+  ResultVector expected{};
+  for (int32_t key = 0; key < num_results; ++key) {
+    auto cell = CreateCell(key, "cf", "cq1");
+    expected.push_back(CreateResult(cell, false));
+  }
+  ASSERT_EQ(expected, cache.AddAndGet(expected, false));
+}
+
+// Partials of row 1 arrive over several calls and are released, merged into
+// one Result, only by an empty non-heartbeat response.
+TEST(ScanResultCacheTest, Combine1) {
+  ScanResultCache cache;
+  auto row0_cq1 = CreateResult(CreateCell(0, "cf", "cq1"), true);
+  auto row1_cq1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto row1_cq2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto row1_cq3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  // Only the trailing partial is cached; the leading result comes back as is.
+  auto returned = cache.AddAndGet(ResultVector{row0_cq1, row1_cq1}, false);
+  ASSERT_EQ(1L, returned.size());
+  ASSERT_EQ(row0_cq1, returned[0]);
+
+  // Further partials of the same row, and an empty heartbeat, yield nothing.
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq3}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size());
+
+  // An empty non-heartbeat response flushes the combined row.
+  returned = cache.AddAndGet(ResultVector{}, false);
+  ASSERT_EQ(1, returned.size());
+  const auto &combined = returned[0];
+  ASSERT_EQ(1, folly::to<int32_t>(combined->Row()));
+  ASSERT_EQ(3, combined->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*combined->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*combined->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*combined->Value("cf", "cq3")));
+}
+
+// A partial for a new row flushes the previous row; a following complete
+// result for yet another row flushes the cached partial alongside itself.
+TEST(ScanResultCacheTest, Combine2) {
+  ScanResultCache cache;
+  auto row1_cq1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto row1_cq2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto row1_cq3 = CreateResult(CreateCell(1, "cf", "cq3"), true);
+
+  auto row2_cq1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto row3_cq2 = CreateResult(CreateCell(3, "cf", "cq2"), false);
+
+  // All three partials of row 1 are held back.
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq2}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq3}, false).size());
+
+  // The row change releases row 1 as a single three-cell Result.
+  auto returned = cache.AddAndGet(ResultVector{row2_cq1}, false);
+  ASSERT_EQ(1, returned.size());
+  ASSERT_EQ(1, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(3, returned[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq2")));
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq3")));
+
+  // The complete row 3 result releases cached row 2 and is returned after it.
+  returned = cache.AddAndGet(ResultVector{row3_cq2}, false);
+  ASSERT_EQ(2, returned.size());
+  ASSERT_EQ(2, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(1, returned[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+  ASSERT_EQ(3, folly::to<int32_t>(returned[1]->Row()));
+  ASSERT_EQ(1, returned[1]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*returned[1]->Value("cf", "cq2")));
+}
+
+// A response holding a complete result followed by a trailing partial
+// releases the cached row plus the complete result and caches the partial.
+TEST(ScanResultCacheTest, Combine3) {
+  ScanResultCache cache;
+  auto row1_cq1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto row1_cq2 = CreateResult(CreateCell(1, "cf", "cq2"), true);
+  auto row2_cq1 = CreateResult(CreateCell(2, "cf", "cq1"), false);
+  auto row3_cq1 = CreateResult(CreateCell(3, "cf", "cq1"), true);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq1}, false).size());
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq2}, false).size());
+
+  auto returned = cache.AddAndGet(ResultVector{row2_cq1, row3_cq1}, false);
+
+  // Row 1 (two merged cells) comes first, then the complete row 2.
+  ASSERT_EQ(2, returned.size());
+  ASSERT_EQ(1, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(2, returned[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq2")));
+  ASSERT_EQ(2, folly::to<int32_t>(returned[1]->Row()));
+  ASSERT_EQ(1, returned[1]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*returned[1]->Value("cf", "cq1")));
+
+  // The cached row 3 partial is released by an empty non-heartbeat response.
+  returned = cache.AddAndGet(ResultVector{}, false);
+
+  ASSERT_EQ(1, returned.size());
+  ASSERT_EQ(3, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(1, returned[0]->Cells().size());
+  ASSERT_EQ(3, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+}
+
+// The last piece of a row may arrive without the partial flag; it must still
+// be merged with the cached partials of the same row.
+TEST(ScanResultCacheTest, Combine4) {
+  ScanResultCache cache;
+  auto row1_cq1 = CreateResult(CreateCell(1, "cf", "cq1"), true);
+  auto row1_cq2 = CreateResult(CreateCell(1, "cf", "cq2"), false);
+  auto row2_cq1 = CreateResult(CreateCell(2, "cf", "cq1"), true);
+  auto row2_cq2 = CreateResult(CreateCell(2, "cf", "cq2"), false);
+
+  ASSERT_EQ(0, cache.AddAndGet(ResultVector{row1_cq1}, false).size());
+
+  // Unflagged tail of row 1 plus the first partial of row 2.
+  auto returned = cache.AddAndGet(ResultVector{row1_cq2, row2_cq1}, false);
+
+  ASSERT_EQ(1, returned.size());
+  ASSERT_EQ(1, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(2, returned[0]->Cells().size());
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+  ASSERT_EQ(1, folly::to<int32_t>(*returned[0]->Value("cf", "cq2")));
+
+  // Unflagged tail of row 2 completes the cached row 2 partial.
+  returned = cache.AddAndGet(ResultVector{row2_cq2}, false);
+
+  ASSERT_EQ(1, returned.size());
+  ASSERT_EQ(2, folly::to<int32_t>(returned[0]->Row()));
+  ASSERT_EQ(2, returned[0]->Cells().size());
+  ASSERT_EQ(2, folly::to<int32_t>(*returned[0]->Value("cf", "cq1")));
+  ASSERT_EQ(2, folly::to<int32_t>(*returned[0]->Value("cf", "cq2")));
+}
+
+// Informational only: logs sizeof and capacity for strings of length 0, 1
+// and 3. Contains no assertions.
+TEST(ScanResultCacheTest, SizeOf) {
+  std::string empty_str{""};
+  std::string one_char{"f"};
+  std::string three_chars{"foo"};
+
+  LOG(INFO) << sizeof(empty_str) << " " << empty_str.capacity();
+  LOG(INFO) << sizeof(one_char) << " " << one_char.capacity();
+  LOG(INFO) << sizeof(three_chars) << " " << three_chars.capacity();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-result-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/scan-result-cache.cc 
b/hbase-native-client/src/hbase/client/scan-result-cache.cc
new file mode 100644
index 0000000..e74a7d6
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/scan-result-cache.cc
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/scan-result-cache.h"
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+/**
+ * Add the given results to the cache and get the valid results back.
+ * @param results the results of a scan next; may be empty.
+ * @param is_hearthbeat indicates whether the results came from a heartbeat
+ *        response.
+ * @return the results that can be handed on to the caller; possibly empty.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet(
+    const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) {
+  using ResultList = std::vector<std::shared_ptr<Result>>;
+
+  // An empty response means either that we already hold all the partial
+  // results needed to build the complete result, or that the server sent a
+  // heartbeat message to keep the client-server connection alive.
+  if (results.empty()) {
+    // After an empty heartbeat the region is not exhausted, so more partials
+    // may still have to be added before the complete Result can be formed.
+    if (partial_results_.empty() || is_hearthbeat) {
+      return ResultList{};
+    }
+    return UpdateNumberOfCompleteResultsAndReturn(ResultList{Combine()});
+  }
+
+  // Every RPC response holds at most one partial result, and if there is one
+  // it is guaranteed to be the last element.
+  auto last = results.back();
+  if (!last->Partial()) {
+    if (partial_results_.empty()) {
+      return UpdateNumberOfCompleteResultsAndReturn(results);
+    }
+    return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size()));
+  }
+
+  if (partial_results_.empty()) {
+    // Nothing cached yet: keep the partial, everything before it is returned.
+    partial_results_.push_back(last);
+    ResultList leading(results.begin(), results.end() - 1);
+    return UpdateNumberOfCompleteResultsAndReturn(leading);
+  }
+
+  if (results.size() == 1) {
+    // Only a partial arrived; check whether it continues the cached row.
+    if (partial_results_.at(0)->Row() == last->Row()) {
+      partial_results_.push_back(last);
+      return ResultList{};
+    }
+    // Row change: flush the cached row and start caching the new one.
+    auto complete_result = Combine();
+    partial_results_.push_back(last);
+    return UpdateNumberOfCompleteResultsAndReturn(complete_result);
+  }
+
+  // Some complete results precede the trailing partial.
+  auto results_to_return = PrependCombined(results, results.size() - 1);
+  partial_results_.push_back(last);
+  return UpdateNumberOfCompleteResultsAndReturn(results_to_return);
+}
+
+/** Discards every partial result cached so far. */
+void ScanResultCache::Clear() {
+  partial_results_.clear();
+}
+
+/**
+ * Merges the cells of the given partial results, which must all belong to
+ * the same row, into a single Result.
+ * @param partial_results the partials to merge, in order.
+ * @return a Result holding the cells of all partials, constructed with the
+ *         OR of the partials' Stale() flags and with the last constructor
+ *         flag set to false. For empty input an empty Result with all flags
+ *         false is returned.
+ * @throws std::runtime_error if two partials have different rows, or if any
+ *         partial other than the last one is not flagged as partial.
+ */
+std::shared_ptr<Result> ScanResultCache::CreateCompleteResult(
+    const std::vector<std::shared_ptr<Result>> &partial_results) {
+  if (partial_results.empty()) {
+    return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false);
+  }
+  std::vector<std::shared_ptr<Cell>> cells{};
+  bool stale = false;
+  std::string prev_row = "";
+  std::string current_row = "";
+  size_t i = 0;
+  for (const auto &r : partial_results) {
+    current_row = r->Row();
+    if (i != 0 && prev_row != current_row) {
+      // Throw by value: "throw new ..." would throw a leaked pointer that
+      // catch (const std::exception &) handlers never match.
+      throw std::runtime_error(
+          "Cannot form complete result. Rows of partial results do not match.");
+    }
+    // Ensure that all Results except the last one are marked as partials. The
+    // last result may not be marked as a partial because Results are only
+    // marked as partials when the scan on the server side must be stopped due
+    // to reaching the maxResultSize.
+    // Visualizing it makes it easier to understand:
+    // maxResultSize: 2 cells
+    // (-x-) represents cell number x in a row
+    // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+    // How row1 will be returned by the server as partial Results:
+    // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+    // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+    // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+    if (i != partial_results.size() - 1 && !r->Partial()) {
+      throw std::runtime_error("Cannot form complete result. Result is missing partial flag.");
+    }
+    prev_row = current_row;
+    stale = stale || r->Stale();
+    for (const auto &c : r->Cells()) {
+      cells.push_back(c);
+    }
+    i++;
+  }
+
+  return std::make_shared<Result>(cells, false, stale, false);
+}
+
+/**
+ * Merges all cached partial results into one Result via
+ * CreateCompleteResult() and then empties the cache.
+ */
+std::shared_ptr<Result> ScanResultCache::Combine() {
+  auto combined = CreateCompleteResult(partial_results_);
+  partial_results_.clear();
+  return combined;
+}
+
+/**
+ * Combines the cached partials into one Result and returns it followed by
+ * entries taken from the front of `results`. `length` is the number of
+ * leading entries of `results` to consider. When results[0] has the same row
+ * as the cached partials it is merged into the combined Result rather than
+ * returned separately. With `length` == 0 only the combined Result is
+ * returned.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined(
+    const std::vector<std::shared_ptr<Result>> &results, int length) {
+  if (length == 0) {
+    return std::vector<std::shared_ptr<Result>>{Combine()};
+  }
+
+  // The last part of a partial result may not be marked as partial, so a row
+  // comparison decides whether results[0] still belongs to the cached row.
+  const bool continues_cached_row = partial_results_[0]->Row() == results[0]->Row();
+  size_t first_kept = 0;
+  if (continues_cached_row) {
+    partial_results_.push_back(results[0]);
+    first_kept = 1;
+    --length;
+  }
+
+  std::vector<std::shared_ptr<Result>> combined_first{};
+  combined_first.push_back(Combine());
+  std::copy_n(results.begin() + first_kept, length, std::back_inserter(combined_first));
+  return combined_first;
+}
+
+/**
+ * Single-result overload: wraps `result` in a one-element vector and
+ * delegates to the vector overload.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::shared_ptr<Result> &result) {
+  std::vector<std::shared_ptr<Result>> single{result};
+  return UpdateNumberOfCompleteResultsAndReturn(single);
+}
+
+/**
+ * Adds the number of entries in `results` to num_complete_rows_ and returns
+ * a copy of `results`.
+ */
+std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn(
+    const std::vector<std::shared_ptr<Result>> &results) {
+  const auto returned_count = results.size();
+  num_complete_rows_ += returned_count;
+  return results;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/scan-test.cc 
b/hbase-native-client/src/hbase/client/scan-test.cc
new file mode 100644
index 0000000..ba3a029
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/scan-test.cc
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include <limits>
+
+#include "hbase/client/scan.h"
+
+using hbase::Get;
+using hbase::Scan;
+
+// Exercises AddFamily/AddColumn on a Scan that has no families yet and checks
+// the resulting FamilyMap() contents and ordering.
+void CheckFamilies(Scan &scan) {
+  EXPECT_EQ(false, scan.HasFamilies());
+
+  // A single family with no columns.
+  scan.AddFamily("family-1");
+  EXPECT_EQ(true, scan.HasFamilies());
+  EXPECT_EQ(1, scan.FamilyMap().size());
+  for (const auto &entry : scan.FamilyMap()) {
+    EXPECT_STREQ("family-1", entry.first.c_str());
+    EXPECT_EQ(0, entry.second.size());
+  }
+
+  // Not allowed to add the same CF.
+  scan.AddFamily("family-1");
+  EXPECT_EQ(1, scan.FamilyMap().size());
+  scan.AddFamily("family-2");
+  EXPECT_EQ(2, scan.FamilyMap().size());
+  scan.AddFamily("family-3");
+  EXPECT_EQ(3, scan.FamilyMap().size());
+
+  // Families iterate as family-1, family-2, family-3, all without columns.
+  int suffix = 1;
+  for (const auto &entry : scan.FamilyMap()) {
+    std::string expected_name = "family-" + std::to_string(suffix);
+    EXPECT_STREQ(expected_name.c_str(), entry.first.c_str());
+    EXPECT_EQ(0, entry.second.size());
+    ++suffix;
+  }
+
+  scan.AddColumn("family-1", "column-1");
+  scan.AddColumn("family-1", "column-2");
+  scan.AddColumn("family-1", "");
+  scan.AddColumn("family-1", "column-3");
+  scan.AddColumn("family-2", "column-X");
+
+  // Columns are kept per family in insertion order.
+  EXPECT_EQ(3, scan.FamilyMap().size());
+  auto family_it = scan.FamilyMap().begin();
+  EXPECT_STREQ("family-1", family_it->first.c_str());
+  EXPECT_EQ(4, family_it->second.size());
+  EXPECT_STREQ("column-1", family_it->second[0].c_str());
+  EXPECT_STREQ("column-2", family_it->second[1].c_str());
+  EXPECT_STREQ("", family_it->second[2].c_str());
+  EXPECT_STREQ("column-3", family_it->second[3].c_str());
+  ++family_it;
+  EXPECT_STREQ("family-2", family_it->first.c_str());
+  EXPECT_EQ(1, family_it->second.size());
+  EXPECT_STREQ("column-X", family_it->second[0].c_str());
+  ++family_it;
+  EXPECT_STREQ("family-3", family_it->first.c_str());
+  EXPECT_EQ(0, family_it->second.size());
+  ++family_it;
+  EXPECT_EQ(family_it, scan.FamilyMap().end());
+}
+
+// Verifies, without modifying the Scan, that it holds the three families and
+// the columns that CheckFamilies() adds.
+void CheckFamiliesAfterCopy(const Scan &scan) {
+  EXPECT_EQ(true, scan.HasFamilies());
+  EXPECT_EQ(3, scan.FamilyMap().size());
+
+  // Families iterate as family-1, family-2, family-3.
+  int suffix = 1;
+  for (const auto &entry : scan.FamilyMap()) {
+    std::string expected_name = "family-" + std::to_string(suffix);
+    EXPECT_STREQ(expected_name.c_str(), entry.first.c_str());
+    ++suffix;
+  }
+
+  // Check that the already added CFs and CQs are as expected.
+  auto family_it = scan.FamilyMap().begin();
+  EXPECT_STREQ("family-1", family_it->first.c_str());
+  EXPECT_EQ(4, family_it->second.size());
+  EXPECT_STREQ("column-1", family_it->second[0].c_str());
+  EXPECT_STREQ("column-2", family_it->second[1].c_str());
+  EXPECT_STREQ("", family_it->second[2].c_str());
+  EXPECT_STREQ("column-3", family_it->second[3].c_str());
+  ++family_it;
+  EXPECT_STREQ("family-2", family_it->first.c_str());
+  EXPECT_EQ(1, family_it->second.size());
+  EXPECT_STREQ("column-X", family_it->second[0].c_str());
+  ++family_it;
+  EXPECT_STREQ("family-3", family_it->first.c_str());
+  EXPECT_EQ(0, family_it->second.size());
+  ++family_it;
+  EXPECT_EQ(family_it, scan.FamilyMap().end());
+}
+
+// Round-trips every setter/getter pair on the Scan, then checks the time
+// range defaults, updates and error cases. Expects the Scan's time range to
+// be at its initial values on entry.
+void ScanMethods(Scan &scan) {
+  // Scan direction.
+  scan.SetReversed(true);
+  EXPECT_EQ(true, scan.IsReversed());
+  scan.SetReversed(false);
+  EXPECT_EQ(false, scan.IsReversed());
+
+  // Row boundaries.
+  std::string first_row("start-row");
+  std::string last_row("stop-row");
+  scan.SetStartRow(first_row);
+  EXPECT_EQ(first_row, scan.StartRow());
+
+  scan.SetStopRow(last_row);
+  EXPECT_EQ(last_row, scan.StopRow());
+
+  // Caching.
+  scan.SetCaching(3);
+  EXPECT_EQ(3, scan.Caching());
+
+  // Consistency.
+  scan.SetConsistency(hbase::pb::Consistency::STRONG);
+  EXPECT_EQ(hbase::pb::Consistency::STRONG, scan.Consistency());
+  scan.SetConsistency(hbase::pb::Consistency::TIMELINE);
+  EXPECT_EQ(hbase::pb::Consistency::TIMELINE, scan.Consistency());
+
+  // Boolean switches.
+  scan.SetCacheBlocks(true);
+  EXPECT_EQ(true, scan.CacheBlocks());
+  scan.SetCacheBlocks(false);
+  EXPECT_EQ(false, scan.CacheBlocks());
+
+  scan.SetAllowPartialResults(true);
+  EXPECT_EQ(true, scan.AllowPartialResults());
+  scan.SetAllowPartialResults(false);
+  EXPECT_EQ(false, scan.AllowPartialResults());
+
+  scan.SetLoadColumnFamiliesOnDemand(true);
+  EXPECT_EQ(true, scan.LoadColumnFamiliesOnDemand());
+  scan.SetLoadColumnFamiliesOnDemand(false);
+  EXPECT_EQ(false, scan.LoadColumnFamiliesOnDemand());
+
+  // Versions: the no-argument setter yields 1.
+  scan.SetMaxVersions();
+  EXPECT_EQ(1, scan.MaxVersions());
+  scan.SetMaxVersions(20);
+  EXPECT_EQ(20, scan.MaxVersions());
+
+  // Result size limit.
+  scan.SetMaxResultSize(1024);
+  EXPECT_EQ(1024, scan.MaxResultSize());
+
+  // Test initial values
+  EXPECT_EQ(0, scan.Timerange().MinTimeStamp());
+  EXPECT_EQ(std::numeric_limits<int64_t>::max(), scan.Timerange().MaxTimeStamp());
+
+  // Set & Test new values using TimeRange and TimeStamp
+  scan.SetTimeRange(1000, 2000);
+  EXPECT_EQ(1000, scan.Timerange().MinTimeStamp());
+  EXPECT_EQ(2000, scan.Timerange().MaxTimeStamp());
+  scan.SetTimeStamp(0);
+  EXPECT_EQ(0, scan.Timerange().MinTimeStamp());
+  EXPECT_EQ(1, scan.Timerange().MaxTimeStamp());
+
+  // Test some exceptions
+  ASSERT_THROW(scan.SetTimeRange(-1000, 2000), std::runtime_error);
+  ASSERT_THROW(scan.SetTimeRange(1000, -2000), std::runtime_error);
+  ASSERT_THROW(scan.SetTimeRange(1000, 200), std::runtime_error);
+  ASSERT_THROW(scan.SetTimeStamp(std::numeric_limits<int64_t>::max()), std::runtime_error);
+}
+
+// Default-constructed Scan, plus copy construction and copy assignment.
+TEST(Scan, Object) {
+  Scan original;
+  ScanMethods(original);
+  CheckFamilies(original);
+
+  // Resetting TimeRange values so that the copy construction and assignment
+  // operator tests pass.
+  original.SetTimeRange(0, std::numeric_limits<int64_t>::max());
+
+  Scan copy_constructed(original);
+  ScanMethods(copy_constructed);
+  CheckFamiliesAfterCopy(copy_constructed);
+
+  Scan copy_assigned;
+  copy_assigned = original;
+  ScanMethods(copy_assigned);
+  CheckFamiliesAfterCopy(copy_assigned);
+}
+
+// Scan constructed from a start row only.
+TEST(Scan, WithStartRow) {
+  // Constructing a temporary must work as well as a named object.
+  Scan("row-test");
+
+  Scan from_start_row("row-test");
+  ScanMethods(from_start_row);
+  CheckFamilies(from_start_row);
+}
+
+// Scan constructed from both a start row and a stop row.
+TEST(Scan, WithStartAndStopRow) {
+  // Constructing a temporary must work as well as a named object.
+  Scan("start-row", "stop-row");
+
+  Scan bounded("start-row", "stop-row");
+  ScanMethods(bounded);
+  CheckFamilies(bounded);
+}
+
+// Scan constructed from a Get takes over the Get's families and columns.
+TEST(Scan, FromGet) {
+  std::string row_key = "row-test";
+  Get source_get = Get(row_key);
+
+  source_get.SetCacheBlocks(true);
+  source_get.SetMaxVersions(5);
+
+  // family-1 is added twice on purpose; three families are expected below.
+  source_get.AddFamily("family-1");
+  source_get.AddFamily("family-1");
+  source_get.AddFamily("family-2");
+  source_get.AddFamily("family-3");
+  source_get.AddColumn("family-1", "column-1");
+  source_get.AddColumn("family-1", "column-2");
+  source_get.AddColumn("family-1", "");
+  source_get.AddColumn("family-1", "column-3");
+  source_get.AddColumn("family-2", "column-X");
+
+  EXPECT_EQ(3, source_get.FamilyMap().size());
+
+  Scan from_get(source_get);
+  ScanMethods(from_get);
+  CheckFamiliesAfterCopy(from_get);
+}
+
+// Constructing a Scan from a row one character longer than the int16_t
+// maximum, or from an empty row, must throw std::runtime_error.
+TEST(Scan, Exception) {
+  std::string oversized_row(std::numeric_limits<int16_t>::max() + 1, 'X');
+  ASSERT_THROW(Scan tmp(oversized_row), std::runtime_error);
+  ASSERT_THROW(Scan tmp(""), std::runtime_error);
+}

Reply via email to